Питання 5 · Розділ 15

Що таке Consumer Group

Уявіть команду кур'єрів:

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Що таке Consumer Group?

Consumer Group — це група консьюмерів, які спільно читають дані з топика. Кожен консьюмер у групі обробляє свою частину партицій.

Навіщо: механізм, який дозволяє кільком консьюмерам ДІЛИТИ партиції між собою так, щоб кожне повідомлення оброблялося рівно одним консьюмером з групи. Без групи кожен консьюмер читав би всі повідомлення (дублювання).

Аналогія

Уявіть команду кур’єрів:

  • Топик — це склад із замовленнями (партиції = різні полиці)
  • Consumer Group — це одна команда кур’єрів
  • Кожен кур’єр (консьюмер) бере замовлення зі своїх полиць
  • Два кур’єри не беруть замовлення з однієї полиці (уникають дублювання)

Якщо кур’єр йде (падає), його полиці автоматично перерозподіляються між рештою — це називається ребаланс.

Consumer Group "order-processors":
  Consumer 1 → читає Партицію 0
  Consumer 2 → читає Партицію 1
  Consumer 3 → читає Партицію 2

Основні правила

  • Кожна партиція обробляється тільки одним консьюмером у групі
  • Один консьюмер може читати кілька партицій
  • Якщо консьюмерів більше, ніж партицій — зайві простоюють

Навіщо потрібні Consumer Groups?

  1. Масштабування — більше консьюмерів = більше паралелізм
  2. Відмовостійкість — якщо один консьюмер впаде, інші підхоплять його партиції
  3. Ізоляція — різні групи читають один топик незалежно

Простий приклад

// Consumer 1
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));

// Consumer 2 (в тій самій групі)
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));

// Обидва консьюмери будуть читати різні партиції

🟡 Middle Level

Коли НЕ використовувати Consumer Group

  1. Потрібно щоб кожне повідомлення прочитали ВСІ консьюмери — запустіть різні group.id
  2. Потрібен broadcast-паттерн (кожен консьюмер читає все) — різні group.id

Балансування партицій

3 партиції, 2 консьюмери:
  C1 → P0, P1
  C2 → P2

3 партиції, 3 консьюмери:
  C1 → P0
  C2 → P1
  C3 → P2  ← ідеальний баланс

3 партиції, 5 консьюмерів:
  C1 → P0
  C2 → P1
  C3 → P2
  C4 → idle (простоює)
  C5 → idle (простоює)

Partition Assignment Strategies

Стратегія Як працює Плюси Мінуси
Range Ділить партиції по діапазону Проста Нерівномірність при непарному числі
RoundRobin Чергує партиції Більш рівномірна Переміщує багато при ребалансі
Sticky Мінімізує переміщення Стабільність Складніше в реалізації
CooperativeSticky Покрокове переміщення Мінімальний простой Kafka 2.3+

Range Assignor

Партиції 0,1,2,3,4 → 2 консьюмери:
  C1: 0, 1, 2  (3 партиції)
  C2: 3, 4     (2 партиції)
  → Нерівномірно!

RoundRobin Assignor

Партиції 0,1,2,3,4 → 2 консьюмери:
  C1: 0, 2, 4  (3 партиції)
  C2: 1, 3     (2 партиції)
  → Краще, але все ще не ідеально

Rebalancing

Відбувається при:

  • Додаванні/видаленні консьюмера
  • Додаванні партицій в топик
  • Зміні підписки на топики
  • Таймауті сесії консьюмера
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();  // Коммітити offsets перед ребалансом
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Ініціалізація нових партицій
    }
});

Multiple Consumer Groups

Один топик "orders" читається двома групами:

Group "order-processing":
  C1 → P0, P1
  C2 → P2, P3

Group "analytics":
  C3 → P0, P1, P2, P3  // читає всі дані незалежно

Кожна група отримує повну копію даних!

Типові помилки

Помилка Наслідок Рішення
Більше консьюмерів ніж партицій Зайві простоюють, витрачають ресурси Consumer count == Partition count
Без ConsumerRebalanceListener Втрата offsets при ребалансі Завжди використовуйте listener
Однаковий group.id для різних додатків Ділять партиції → кожен отримує частину даних Унікальний group.id per application
Довга обробка між poll Виключення з групи → ребаланс Зменшіть max.poll.records або оптимізуйте

🔴 Senior Level

Group Coordinator Protocol — внутрішній устрій

Group Coordinator — це один з брокерів кластера, обраний контролером. Відповідає за:

  • Управління членством у групі (Join/Leave)
  • Розподіл партицій (Assignment)
  • Зберігання committed offsets в __consumer_offsets
Протокол групи (Kafka < 3.0, Consumer Protocol v0):

1. JoinGroup Request:
   Consumer → Coordinator: "Я хочу вступити в групу"
   Coordinator → Leader: "Ось список всіх members"

2. Leader обчислює assignment:
   Leader (один з консьюмерів) запускає PartitionAssignor
   Результат: Map<memberId, List<TopicPartition>>

3. SyncGroup Request:
   Leader → Coordinator: assignment для всіх members
   Coordinator → кожен Consumer: його персональний assignment

4. Heartbeat:
   Кожен Consumer → Coordinator: "Я живий" (кожні heartbeat.interval.ms)

5. LeaveGroup:
   Consumer → Coordinator: "Я йду"
   → Trigger rebalance

Kafka 3.0+ Consumer Protocol v1 (KIP-848):

KIP (Kafka Improvement Proposal) — процес пропозицій щодо покращення Kafka.
KIP-848 — пропозиція про server-side assignment.
Group Coordinator — спеціальний брокер, який управляє членством у групі.

Новий протокол на основі heartbeat RPC:
- Assignment обчислюється на Coordinator (не на Leader)
- Усунуто SyncGroup round-trip
- Підтримує Server-Side Assignment
- Швидший ребаланс (на 1-2 RTT менше)

__consumer_offsets — Internal Topic

Тема: __consumer_offsets
Партиції: 50 (за замовчуванням, offsets.topic.num.partitions)
RF: 3 (за замовчуванням)
Cleanup: compact (зберігає тільки останній offset для кожної групи)

Format ключа: [group.id, topic, partition]
Format значення: [offset, metadata, commitTimestamp, expireTimestamp]

Приклад:
  Key: ["order-processors", "orders", 0]
  Value: [offset=42567, metadata="", timestamp=1712345678]

Зміщення комміту:

// Auto commit (enable.auto.commit=true)
// Комміт кожні auto.commit.interval.ms (5 секунд за замовчуванням)
// Ризик: повідомлення можуть бути втрачені при crash між commit і processing

// Manual commit (enable.auto.commit=false)
consumer.commitSync();      // Синхронний — блокує до підтвердження
consumer.commitAsync();     // Асинхронний — не блокує, retry callback

Sticky Assignor — алгоритм

Алгоритм Sticky Assignor (Kafka 2.3+):

1. Поточний assignment: C1={P0,P1}, C2={P2}
2. Новий консьюмер C3 приєднується
3. Sticky зберігає поточні призначення:
   C1: {P0, P1} → зберігає (не переміщує)
   C2: {P2} → зберігає (не переміщує)
4. Решта партиції розподіляються:
   C3: {} → отримує вільні партиції
   Якщо партицій немає → C3 = idle

Переваги перед Range/RoundRobin:

  • Мінімізує partition movement (менше disruption)
  • Зберігає locality даних (page cache warm)
  • Менше reprocessing при ребалансі

Cooperative Rebalancing (KIP-429, Kafka 2.3+)

Eager Rebalancing (старий):

1. Trigger rebalance
2. ALL consumers revoke ALL partitions
3. Stop processing
4. Compute new assignment
5. Resume processing
→ Full stop всій групі на 5-30 секунд

Cooperative Rebalancing (новий):

1. Trigger rebalance
2. Кожен consumer відкликає ТІЛЬКИ деякі партиції
3. Continue processing з рештою партицій
4. Incremental assignment для відкликаних партицій
5. Repeat until stable
→ Мінімальний простой, processing продовжується
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Static Membership (KIP-345)

props.put("group.instance.id", "consumer-1");  // Унікальний ID інстанса

Поведінка:

  • При перезапуску консьюмер з тим самим group.instance.id не викликає ребаланс
  • Coordinator чекає session.timeout.ms перед виключенням
  • При поверненні — отримує ті самі партиції
  • Зменшує churn при rolling deploy

Порівняння: | | Dynamic Membership | Static Membership | | —————– | —————————- | —————————————– | | group.instance.id | Не заданий | Заданий | | Restart | Rebalance | No rebalance (в межах session.timeout) | | Rolling deploy | N ребалансів для N інстансів | 0 ребалансів | | Crash detection | session.timeout.ms | session.timeout.ms |

Edge Cases (3+)

  1. Split-Brain Consumer Group: При network partition консьюмери можуть втратити зв’язок з coordinator. Якщо session.timeout.ms спливає, coordinator виключає їх і запускає ребаланс. При відновленні мережі «старі» консьюмери думають, що вони все ще в групі, а «нові» вже отримали їх партиції. Результат: duplicate processing — два консьюмери читають одну партицію. Рішення: session.timeout.ms має бути достатньо великим, використовуйте static membership.

  2. Offset Commit Race Condition: Consumer A читає offset 100-150, починає обробку. Rebalance → партиція переходить до Consumer B. Consumer A завершує обробку і коммітить offset 151. Consumer B починає з offset 151, але повідомлення 100-150 ще не оброблені Consumer A (обробка була async). Рішення: commitSync в onPartitionsRevoked + blocking processing.

  3. __consumer_offsets Partition Unavailable: Якщо партиція __consumer_offsets, що зберігає offsets групи, стає недоступною (leader down, ISR < min.insync.replicas), консьюмери не можуть коммітити offsets. Вони продовжують читати, але при перезапуску почнуть з останнього збереженого offset → можливе дублювання. При min.insync.replicas=2 і RF=3 це малоймовірно, але можливе при подвійному збої.

  4. Max Poll Records + Slow Processing: max.poll.records=500, обробка одного повідомлення займає 1 секунду. Обробка 500 повідомлень = 500 секунд. max.poll.interval.ms=300000 (5 хвилин). Consumer не встигає викликати poll() протягом 5 хвилин → coordinator виключає → rebalance. Рішення: Зменште max.poll.records або збільште max.poll.interval.ms.

  5. Consumer Group з 0 активних консьюмерів: Група існує в __consumer_offsets, але всі консьюмери offline. При запуску нового консьюмера він отримує всі партиції і починає з останнього committed offset. Якщо offsets старі (retention expired), почне з auto.offset.reset (earliest/latest). Це може викликати massive replay або skip даних.

Performance Numbers

Метрика Значення Умови
Rebalance time (eager) 5-30 секунд Залежить від розміру групи
Rebalance time (cooperative) 1-5 секунд Покрокове переміщення
Heartbeat interval 3-10 секунд 1/3 від session.timeout.ms
Offset commit latency 1-5 ms Синхронний, локальний брокер
__consumer_offsets throughput ~10K commits/sec 50 партицій
Max consumers per group Обмежено партиціями Практично: сотні

Production War Story

Ситуація: Платіжний процесор з 15 консьюмерами в групі payment-processors (15 партицій). Використовували Range assignor і eager rebalancing. При rolling deploy (послідовний перезапуск усіх 15 подів в Kubernetes) кожен под викликав ребаланс.

Проблема: 15 послідовних ребалансів зайняли 15 × 15 секунд = 225 секунд простою. За цей час lag виріс до 200K повідомлень. Після відновлення деякі платежі обробилися двічі (offset commit race condition).

Діагностика:

kafka-consumer-groups.sh --describe --group payment-processors
# Показав: STATE=Dead на 225 секунд, потім STATE=Empty → Rebalancing
# Consumer lag: 0 → 200K → 0 (після catchup з duplicates)

Рішення:

  1. Перейшли на CooperativeStickyAssignor — ребаланс став 2-3 секунди
  2. Ввели group.instance.id=${POD_NAME} — при rolling deploy не було ребалансів
  3. Додали ConsumerRebalanceListener з commitSync в onPartitionsRevoked
  4. Налаштували Pod Disruption Budget в K8s — максимум 1 под вниз одночасно
  5. Збільшили session.timeout.ms до 45 секунд, heartbeat.interval.ms до 15 секунд

Результат: Rolling deploy — 0 ребалансів, 0 downtime, 0 duplicates.

Урок: Eager rebalancing + rolling deploy = cascading failure. Завжди використовуйте cooperative rebalancing + static membership для production.

Моніторинг (JMX + Burrow)

JMX метрики:

kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-metrics,client-id=*,key=commit-latency-avg
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,key=records-lag-max

Burrow:

  • Consumer group status: OK, WARN, ERR, STOP, STALL
  • Per-partition lag with trend analysis
  • HTTP API → Grafana dashboard → PagerDuty alerts
  • Не потребує модифікації консьюмерів (читає __consumer_offsets напряму)

Highload Best Practices

  1. Consumer count == Partition count — ідеальний баланс
  2. CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
  3. Static membership (group.instance.id) — для rolling deploy stability
  4. Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3
  5. ConsumerRebalanceListener обов’язковий — commitSync + state cleanup
  6. Monitor rebalance frequency — alert при > 1 rebalance за 10 хвилин
  7. enable.auto.commit=false — завжди коммітьте після обробки
  8. max.poll.records підбирайте під latency обробки (ціль: poll < max.poll.interval.ms)
  9. Burrow для monitoring — consumer-centric, не потребує instrumentation
  10. KIP-848 (Server-Side Assignment) — використовуйте при міграції на Kafka 3.0+

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Consumer Group — група консьюмерів, спільно читаючих топик; кожна партиція = один консьюмер
  • __consumer_offsets — внутрішній топик для зберігання зміщень (compact)
  • Partition Assignment Strategies: Range, RoundRobin, Sticky, CooperativeSticky
  • Rebalancing відбувається при додаванні/видаленні консьюмера, зміні партицій
  • CooperativeStickyAssignor (Kafka 2.3+) — мінімальний простой при ребалансі
  • Static Membership (group.instance.id) — усуває ребаланс при rolling deploy
  • Eager rebalancing = full stop; Cooperative = incremental, processing продовжується
  • Ideal: consumer count == partition count

Часті уточнюючі запитання:

  • Що якщо консьюмерів більше ніж партицій? — Зайві простоюють (idle), споживають ресурси.
  • Що таке __consumer_offsets? — Внутрішній compact-топик, зберігає offsets для всіх consumer groups.
  • Чим Cooperative відрізняється від Eager? — Eager: всі зупиняються. Cooperative: тільки партиції, що переміщуються.
  • Навіщо потрібен group.instance.id? — Static membership: при перезапуску той самий консьюмер отримує ті самі партиції без ребалансу.

Червоні прапорці (НЕ говорити):

  • «Два консьюмери можуть читати одну партицію в групі» — тільки один
  • «Автокомміт — найкращий вибір для production» — ручний комміт після обробки
  • «Ребаланс — це швидко» — eager: 5-30 секунд простою
  • «Range assignor — найкращий для production» — нерівномірний, використовуйте CooperativeSticky

Пов’язані теми:

  • [[6. Як працює балансування консьюмерів у групі]]
  • [[7. Чи можна мати більше консьюмерів, ніж партицій]]
  • [[8. Що станеться при додаванні нового консьюмера в групі]]
  • [[15. Що таке ребаланс і коли він відбувається]]
  • [[13. Як працює commit offset]]