Що таке Consumer Group
Уявіть команду кур'єрів:
🟢 Junior Level
Що таке Consumer Group?
Consumer Group — це група консьюмерів, які спільно читають дані з топика. Кожен консьюмер у групі обробляє свою частину партицій.
Навіщо: механізм, який дозволяє кільком консьюмерам ДІЛИТИ партиції між собою так, щоб кожне повідомлення оброблялося рівно одним консьюмером з групи. Без групи кожен консьюмер читав би всі повідомлення (дублювання).
Аналогія
Уявіть команду кур’єрів:
- Топик — це склад із замовленнями (партиції = різні полиці)
- Consumer Group — це одна команда кур’єрів
- Кожен кур’єр (консьюмер) бере замовлення зі своїх полиць
- Два кур’єри не беруть замовлення з однієї полиці (уникають дублювання)
Якщо кур’єр йде (падає), його полиці автоматично перерозподіляються між рештою — це називається ребаланс.
Consumer Group "order-processors":
Consumer 1 → читає Партицію 0
Consumer 2 → читає Партицію 1
Consumer 3 → читає Партицію 2
Основні правила
- Кожна партиція обробляється тільки одним консьюмером у групі
- Один консьюмер може читати кілька партицій
- Якщо консьюмерів більше, ніж партицій — зайві простоюють
Навіщо потрібні Consumer Groups?
- Масштабування — більше консьюмерів = більше паралелізм
- Відмовостійкість — якщо один консьюмер впаде, інші підхоплять його партиції
- Ізоляція — різні групи читають один топик незалежно
Простий приклад
// Consumer 1
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));
// Consumer 2 (в тій самій групі)
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));
// Обидва консьюмери будуть читати різні партиції
🟡 Middle Level
Коли НЕ використовувати Consumer Group
- Потрібно щоб кожне повідомлення прочитали ВСІ консьюмери — запустіть різні group.id
- Потрібен broadcast-паттерн (кожен консьюмер читає все) — різні group.id
Балансування партицій
3 партиції, 2 консьюмери:
C1 → P0, P1
C2 → P2
3 партиції, 3 консьюмери:
C1 → P0
C2 → P1
C3 → P2 ← ідеальний баланс
3 партиції, 5 консьюмерів:
C1 → P0
C2 → P1
C3 → P2
C4 → idle (простоює)
C5 → idle (простоює)
Partition Assignment Strategies
| Стратегія | Як працює | Плюси | Мінуси |
|---|---|---|---|
| Range | Ділить партиції по діапазону | Проста | Нерівномірність при непарному числі |
| RoundRobin | Чергує партиції | Більш рівномірна | Переміщує багато при ребалансі |
| Sticky | Мінімізує переміщення | Стабільність | Складніше в реалізації |
| CooperativeSticky | Покрокове переміщення | Мінімальний простой | Kafka 2.3+ |
Range Assignor
Партиції 0,1,2,3,4 → 2 консьюмери:
C1: 0, 1, 2 (3 партиції)
C2: 3, 4 (2 партиції)
→ Нерівномірно!
RoundRobin Assignor
Партиції 0,1,2,3,4 → 2 консьюмери:
C1: 0, 2, 4 (3 партиції)
C2: 1, 3 (2 партиції)
→ Краще, але все ще не ідеально
Rebalancing
Відбувається при:
- Додаванні/видаленні консьюмера
- Додаванні партицій в топик
- Зміні підписки на топики
- Таймауті сесії консьюмера
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(); // Коммітити offsets перед ребалансом
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Ініціалізація нових партицій
}
});
Multiple Consumer Groups
Один топик "orders" читається двома групами:
Group "order-processing":
C1 → P0, P1
C2 → P2, P3
Group "analytics":
C3 → P0, P1, P2, P3 // читає всі дані незалежно
Кожна група отримує повну копію даних!
Типові помилки
| Помилка | Наслідок | Рішення |
|---|---|---|
| Більше консьюмерів ніж партицій | Зайві простоюють, витрачають ресурси | Consumer count == Partition count |
| Без ConsumerRebalanceListener | Втрата offsets при ребалансі | Завжди використовуйте listener |
| Однаковий group.id для різних додатків | Ділять партиції → кожен отримує частину даних | Унікальний group.id per application |
| Довга обробка між poll | Виключення з групи → ребаланс | Зменшіть max.poll.records або оптимізуйте |
🔴 Senior Level
Group Coordinator Protocol — внутрішній устрій
Group Coordinator — це один з брокерів кластера, обраний контролером. Відповідає за:
- Управління членством у групі (Join/Leave)
- Розподіл партицій (Assignment)
- Зберігання committed offsets в
__consumer_offsets
Протокол групи (Kafka < 3.0, Consumer Protocol v0):
1. JoinGroup Request:
Consumer → Coordinator: "Я хочу вступити в групу"
Coordinator → Leader: "Ось список всіх members"
2. Leader обчислює assignment:
Leader (один з консьюмерів) запускає PartitionAssignor
Результат: Map<memberId, List<TopicPartition>>
3. SyncGroup Request:
Leader → Coordinator: assignment для всіх members
Coordinator → кожен Consumer: його персональний assignment
4. Heartbeat:
Кожен Consumer → Coordinator: "Я живий" (кожні heartbeat.interval.ms)
5. LeaveGroup:
Consumer → Coordinator: "Я йду"
→ Trigger rebalance
Kafka 3.0+ Consumer Protocol v1 (KIP-848):
KIP (Kafka Improvement Proposal) — процес пропозицій щодо покращення Kafka.
KIP-848 — пропозиція про server-side assignment.
Group Coordinator — спеціальний брокер, який управляє членством у групі.
Новий протокол на основі heartbeat RPC:
- Assignment обчислюється на Coordinator (не на Leader)
- Усунуто SyncGroup round-trip
- Підтримує Server-Side Assignment
- Швидший ребаланс (на 1-2 RTT менше)
__consumer_offsets — Internal Topic
Тема: __consumer_offsets
Партиції: 50 (за замовчуванням, offsets.topic.num.partitions)
RF: 3 (за замовчуванням)
Cleanup: compact (зберігає тільки останній offset для кожної групи)
Format ключа: [group.id, topic, partition]
Format значення: [offset, metadata, commitTimestamp, expireTimestamp]
Приклад:
Key: ["order-processors", "orders", 0]
Value: [offset=42567, metadata="", timestamp=1712345678]
Зміщення комміту:
// Auto commit (enable.auto.commit=true)
// Комміт кожні auto.commit.interval.ms (5 секунд за замовчуванням)
// Ризик: повідомлення можуть бути втрачені при crash між commit і processing
// Manual commit (enable.auto.commit=false)
consumer.commitSync(); // Синхронний — блокує до підтвердження
consumer.commitAsync(); // Асинхронний — не блокує, retry callback
Sticky Assignor — алгоритм
Алгоритм Sticky Assignor (Kafka 2.3+):
1. Поточний assignment: C1={P0,P1}, C2={P2}
2. Новий консьюмер C3 приєднується
3. Sticky зберігає поточні призначення:
C1: {P0, P1} → зберігає (не переміщує)
C2: {P2} → зберігає (не переміщує)
4. Решта партиції розподіляються:
C3: {} → отримує вільні партиції
Якщо партицій немає → C3 = idle
Переваги перед Range/RoundRobin:
- Мінімізує partition movement (менше disruption)
- Зберігає locality даних (page cache warm)
- Менше reprocessing при ребалансі
Cooperative Rebalancing (KIP-429, Kafka 2.3+)
Eager Rebalancing (старий):
1. Trigger rebalance
2. ALL consumers revoke ALL partitions
3. Stop processing
4. Compute new assignment
5. Resume processing
→ Full stop всій групі на 5-30 секунд
Cooperative Rebalancing (новий):
1. Trigger rebalance
2. Кожен consumer відкликає ТІЛЬКИ деякі партиції
3. Continue processing з рештою партицій
4. Incremental assignment для відкликаних партицій
5. Repeat until stable
→ Мінімальний простой, processing продовжується
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Static Membership (KIP-345)
props.put("group.instance.id", "consumer-1"); // Унікальний ID інстанса
Поведінка:
- При перезапуску консьюмер з тим самим
group.instance.idне викликає ребаланс - Coordinator чекає
session.timeout.msперед виключенням - При поверненні — отримує ті самі партиції
- Зменшує churn при rolling deploy
Порівняння: | | Dynamic Membership | Static Membership | | —————– | —————————- | —————————————– | | group.instance.id | Не заданий | Заданий | | Restart | Rebalance | No rebalance (в межах session.timeout) | | Rolling deploy | N ребалансів для N інстансів | 0 ребалансів | | Crash detection | session.timeout.ms | session.timeout.ms |
Edge Cases (3+)
-
Split-Brain Consumer Group: При network partition консьюмери можуть втратити зв’язок з coordinator. Якщо
session.timeout.msспливає, coordinator виключає їх і запускає ребаланс. При відновленні мережі «старі» консьюмери думають, що вони все ще в групі, а «нові» вже отримали їх партиції. Результат: duplicate processing — два консьюмери читають одну партицію. Рішення:session.timeout.msмає бути достатньо великим, використовуйте static membership. -
Offset Commit Race Condition: Consumer A читає offset 100-150, починає обробку. Rebalance → партиція переходить до Consumer B. Consumer A завершує обробку і коммітить offset 151. Consumer B починає з offset 151, але повідомлення 100-150 ще не оброблені Consumer A (обробка була async). Рішення: commitSync в
onPartitionsRevoked+ blocking processing. -
__consumer_offsets Partition Unavailable: Якщо партиція
__consumer_offsets, що зберігає offsets групи, стає недоступною (leader down, ISR < min.insync.replicas), консьюмери не можуть коммітити offsets. Вони продовжують читати, але при перезапуску почнуть з останнього збереженого offset → можливе дублювання. Приmin.insync.replicas=2і RF=3 це малоймовірно, але можливе при подвійному збої. -
Max Poll Records + Slow Processing:
max.poll.records=500, обробка одного повідомлення займає 1 секунду. Обробка 500 повідомлень = 500 секунд.max.poll.interval.ms=300000(5 хвилин). Consumer не встигає викликатиpoll()протягом 5 хвилин → coordinator виключає → rebalance. Рішення: Зменштеmax.poll.recordsабо збільштеmax.poll.interval.ms. -
Consumer Group з 0 активних консьюмерів: Група існує в
__consumer_offsets, але всі консьюмери offline. При запуску нового консьюмера він отримує всі партиції і починає з останнього committed offset. Якщо offsets старі (retention expired), почне зauto.offset.reset(earliest/latest). Це може викликати massive replay або skip даних.
Performance Numbers
| Метрика | Значення | Умови |
|---|---|---|
| Rebalance time (eager) | 5-30 секунд | Залежить від розміру групи |
| Rebalance time (cooperative) | 1-5 секунд | Покрокове переміщення |
| Heartbeat interval | 3-10 секунд | 1/3 від session.timeout.ms |
| Offset commit latency | 1-5 ms | Синхронний, локальний брокер |
| __consumer_offsets throughput | ~10K commits/sec | 50 партицій |
| Max consumers per group | Обмежено партиціями | Практично: сотні |
Production War Story
Ситуація: Платіжний процесор з 15 консьюмерами в групі
payment-processors(15 партицій). Використовували Range assignor і eager rebalancing. При rolling deploy (послідовний перезапуск усіх 15 подів в Kubernetes) кожен под викликав ребаланс.Проблема: 15 послідовних ребалансів зайняли 15 × 15 секунд = 225 секунд простою. За цей час lag виріс до 200K повідомлень. Після відновлення деякі платежі обробилися двічі (offset commit race condition).
Діагностика:
kafka-consumer-groups.sh --describe --group payment-processors # Показав: STATE=Dead на 225 секунд, потім STATE=Empty → Rebalancing # Consumer lag: 0 → 200K → 0 (після catchup з duplicates)Рішення:
- Перейшли на
CooperativeStickyAssignor— ребаланс став 2-3 секунди- Ввели
group.instance.id=${POD_NAME}— при rolling deploy не було ребалансів- Додали
ConsumerRebalanceListenerз commitSync вonPartitionsRevoked- Налаштували Pod Disruption Budget в K8s — максимум 1 под вниз одночасно
- Збільшили
session.timeout.msдо 45 секунд,heartbeat.interval.msдо 15 секундРезультат: Rolling deploy — 0 ребалансів, 0 downtime, 0 duplicates.
Урок: Eager rebalancing + rolling deploy = cascading failure. Завжди використовуйте cooperative rebalancing + static membership для production.
Моніторинг (JMX + Burrow)
JMX метрики:
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-metrics,client-id=*,key=commit-latency-avg
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,key=records-lag-max
Burrow:
- Consumer group status: OK, WARN, ERR, STOP, STALL
- Per-partition lag with trend analysis
- HTTP API → Grafana dashboard → PagerDuty alerts
- Не потребує модифікації консьюмерів (читає
__consumer_offsetsнапряму)
Highload Best Practices
- Consumer count == Partition count — ідеальний баланс
- CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
- Static membership (
group.instance.id) — для rolling deploy stability - Heartbeat tuning:
heartbeat.interval.ms = session.timeout.ms / 3 - ConsumerRebalanceListener обов’язковий — commitSync + state cleanup
- Monitor rebalance frequency — alert при > 1 rebalance за 10 хвилин
- enable.auto.commit=false — завжди коммітьте після обробки
- max.poll.records підбирайте під latency обробки (ціль: poll < max.poll.interval.ms)
- Burrow для monitoring — consumer-centric, не потребує instrumentation
- KIP-848 (Server-Side Assignment) — використовуйте при міграції на Kafka 3.0+
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
- Consumer Group — група консьюмерів, спільно читаючих топик; кожна партиція = один консьюмер
__consumer_offsets— внутрішній топик для зберігання зміщень (compact)- Partition Assignment Strategies: Range, RoundRobin, Sticky, CooperativeSticky
- Rebalancing відбувається при додаванні/видаленні консьюмера, зміні партицій
- CooperativeStickyAssignor (Kafka 2.3+) — мінімальний простой при ребалансі
- Static Membership (
group.instance.id) — усуває ребаланс при rolling deploy - Eager rebalancing = full stop; Cooperative = incremental, processing продовжується
- Ideal: consumer count == partition count
Часті уточнюючі запитання:
- Що якщо консьюмерів більше ніж партицій? — Зайві простоюють (idle), споживають ресурси.
- Що таке __consumer_offsets? — Внутрішній compact-топик, зберігає offsets для всіх consumer groups.
- Чим Cooperative відрізняється від Eager? — Eager: всі зупиняються. Cooperative: тільки партиції, що переміщуються.
- Навіщо потрібен group.instance.id? — Static membership: при перезапуску той самий консьюмер отримує ті самі партиції без ребалансу.
Червоні прапорці (НЕ говорити):
- «Два консьюмери можуть читати одну партицію в групі» — тільки один
- «Автокомміт — найкращий вибір для production» — ручний комміт після обробки
- «Ребаланс — це швидко» — eager: 5-30 секунд простою
- «Range assignor — найкращий для production» — нерівномірний, використовуйте CooperativeSticky
Пов’язані теми:
- [[6. Як працює балансування консьюмерів у групі]]
- [[7. Чи можна мати більше консьюмерів, ніж партицій]]
- [[8. Що станеться при додаванні нового консьюмера в групі]]
- [[15. Що таке ребаланс і коли він відбувається]]
- [[13. Як працює commit offset]]