Як працює балансування консьюмерів у групі
Уявіть піцерію, де потрібно розрізати піцу (партиції) між друзями (консьюмерами):
🟢 Junior Level
Що таке балансування?
Балансування — це процес автоматичного розподілу партицій між консьюмерами в групі. Kafka сама вирішує, який консьюмер які партиції читатиме.
Навіщо: без неї довелося б вручну призначати партиції кожному консьюмеру. При зміні числа консьюмерів або партицій — все перераховується автоматично.
Аналогія
Уявіть піцерію, де потрібно розрізати піцу (партиції) між друзями (консьюмерами):
- 3 шматки піци, 3 друзі → кожен отримує 1 шматок
- 3 шматки, 2 друзі → один отримує 2 шматки, інший — 1
- 3 шматки, 5 друзів → 3 отримують по шматку, 2 залишаються голодними
Основне правило
Кількість активних консьюмерів <= Кількість партицій
Приклад
3 партиції, 3 консьюмери:
C1 → P0
C2 → P1
C3 → P2 ← ідеальний баланс
Додали C4:
C1 → P0
C2 → P1
C3 → P2
C4 → чекає (немає вільних партицій)
Як це працює в коді
// Усі консьюмери з однаковим group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматично розподілить партиції
🟡 Middle Level
Стратегії балансування
| Стратегія | Алгоритм | Коли використовувати |
|---|---|---|
| Range | Ділить партиції по діапазону | Підходить коли число партицій кратне числу консьюмерів. При непарному діленні — нерівномірність. Для production з динамічним масштабуванням краще CooperativeSticky. |
| RoundRobin | Чергує партиції | Коли потрібна рівномірність |
| Sticky | Мінімізує переміщення | Production (Kafka 2.2+) |
| CooperativeSticky | Покрокове переміщення | Production (Kafka 2.3+, рекомендується) |
Range Assignor — докладно
Партиції 0,1,2,3 → 2 консьюмери:
C1: 0, 1 (2 партиції)
C2: 2, 3 (2 партиції)
→ Рівномірно
Партиції 0,1,2,3,4 → 2 консьюмери:
C1: 0, 1, 2 (3 партиції)
C2: 3, 4 (2 партиції)
→ Нерівномірно! (Range = numPartitions / numConsumers)
RoundRobin Assignor
Партиції 0,1,2,3,4 → 2 консьюмери:
C1: 0, 2, 4 (3 партиції)
C2: 1, 3 (2 партиції)
→ Трохи краще при непарному числі
Процес ребалансу
1. Подія-тригер (новий консьюмер, падіння, таймаут)
2. Group coordinator починає ребаланс
3. Усі консьюмери отримують сповіщення
4. Усі консьюмери зупиняють читання (eager)
або продовжують з частиною партицій (cooperative)
5. Обчислюється новий assignment
6. Assignment розсилається консьюмерам
7. Консьюмери починають читати нові партиції
ConsumerRebalanceListener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 1. Коммітити offsets перед втратою партицій
consumer.commitSync();
// 2. Очистити локальні ресурси
cleanup(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 3. Ініціалізація нових партицій
initialize(partitions);
}
});
Chaos Rebalancing Problem
session.timeout.ms=10000, обробка батча = 15 секунд,
max.poll.interval.ms=300000. Консьюмер не встигає poll() → виключений →
ребаланс → навантаження на решті росте → ще один падає → лавина.
Консьюмери падають → ребаланс → нові консьюмери падають → цикл
Причини:
- Занадто короткий session.timeout.ms
- Довга обробка (max.poll.interval.ms exceeded)
- Проблеми з пам'яттю/CPU
Рішення:
- Збільшити таймаути
- Оптимізувати обробку
- Додати моніторинг
Порівняння стратегій
| Критерій | Range | RoundRobin | Sticky | CooperativeSticky |
|---|---|---|---|---|
| Рівномірність | ⚠️ | ✅ | ✅ | ✅ |
| Мінімізація переміщення | ❌ | ❌ | ✅ | ✅ |
| Простой при ребалансі | Повний | Повний | Повний | Частковий |
| Версія Kafka | Усі | Усі | 2.2+ | 2.3+ |
| Server-Side Assignment (KIP-848) | 3.0+ | Усунуто SyncGroup round-trip, швидший ребаланс |
Типові помилки
| Помилка | Наслідок | Рішення |
|---|---|---|
| Часті ребаланси | Постійний простой системи | Збільште таймаути, static membership |
| Без ConsumerRebalanceListener | Втрата offsets при ребалансі | Завжди використовуйте listener |
| Нерівномірний розподіл | Один консьюмер перевантажений | CooperativeSticky assignor |
| Range assignor в production | Нерівномірність + повний простой | Перейдіть на CooperativeSticky |
🔴 Senior Level
Group Coordinator Protocol — повна діаграма
Phase 1: JoinGroup
┌──────────────┐ ┌──────────────────┐ ┌──────────────┐
│ Consumer A │ │ Group Coordinator│ │ Consumer B │
└──────┬───────┘ └────────┬─────────┘ └──────┬───────┘
│ JoinGroup Request │ │
│ (group.id, memberId) │ │
│───────────────────────────►│ │
│ │ JoinGroup Request │
│ │◄─────────────────────────│
│ │ │
│ Leader вибирається (перший │ │
│ у списку members) │ │
│ │ │
│◄───────────────────────────│ JoinGroup Response │
│ (leaderId, allMembers) │ (memberId, leaderId) │
│ │─────────────────────────►│
Phase 2: Assignment (на Leader)
│ │ │
│ Leader обчислює │ │
│ assignment через │ │
│ PartitionAssignor │ │
│ assignment = { │ │
│ A: [P0, P1], │ │
│ B: [P2] │ │
│ } │ │
Phase 3: SyncGroup
│ │ │
│ SyncGroup Request │ │
│ (assignment для всіх) │ │
│───────────────────────────►│ │
│ │ │
│ │◄─────────────────────────│ SyncGroup Response
│ │ │ (свій assignment)
│◄───────────────────────────│ │
│ SyncGroup Response │ │
│ (assignment: P0, P1) │ │
Kafka 3.0+ (KIP-848): Assignment обчислюється на Coordinator, а не на Leader. Усунуто SyncGroup round-trip.
Eager vs Cooperative Rebalancing — internals
Eager Rebalancing:
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator
// onJoinComplete:
// 1. onPartitionsRevoked(ALL partitions) // Зупиняємо ВСЕ
// 2. newAssignment = computeAssignment()
// 3. onPartitionsAssigned(newAssignment) // Запускаємо НОВЕ
// Повна зупинка = full stop
Cooperative Rebalancing:
// CooperativeStickyAssignor:
// 1. onPartitionsRevoked(SUBSET partitions) // Тільки ті, що йдуть
// 2. onPartitionsAssigned(SUBSET partitions) // Тільки нові
// 3. Продовжуємо працювати з рештою партицій
// Алгоритм:
// currentAssignment = {P0, P1, P2}
// newAssignment = {P0, P3} // P1 і P2 пішли, P3 додано
// revoked = {P1, P2} // Тільки ці
// assigned = {P3} // Тільки ці
// continue processing P0 // Не зупиняємось!
Incremental Cooperative Rebalancing:
При великій кількості змін:
Round 1: revoke P1, assign P3
Round 2: revoke P2, assign P4
...
Кожен round — короткий (зазвичай < 1 секунди)
Обробка продовжується на решті партицій
Static Membership — як це працює всередині
props.put("group.instance.id", "consumer-1");
Protocol:
Dynamic consumer (без group.instance.id):
Join → assigned memberId = UUID → restart = new memberId → rebalance
Static consumer (з group.instance.id):
Join → assigned memberId = group.instance.id
→ restart = same memberId → NO rebalance (якщо в межах session.timeout)
→ Coordinator чекає session.timeout.ms перед виключенням
Умови для відсутності ребалансу:
- Consumer повернувся в межах
session.timeout.ms group.instance.idне використовується іншим консьюмером- Partition count не змінився
Duplicate group.instance.id:
Якщо два консьюмери з однаковим group.instance.id:
Другий отримує FencedInstanceIdException
→ Другий консьюмер завершується
→ Це запобігає duplicate processing
Production Configuration
# Оптимальна конфігурація для production
group.id: order-processors
group.instance.id: consumer-${HOSTNAME} # Static membership
partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000 # 30 секунд
heartbeat.interval.ms: 10000 # 1/3 від session timeout
max.poll.interval.ms: 300000 # 5 хвилин на обробку
max.poll.records: 500 # батч для обробки
enable.auto.commit: false # Ручний комміт
Edge Cases (3+)
-
Cascading Rebalance (Thundering Herd): Один консьюмер падає → ребаланс → решта консьюмерів отримують більше партицій → навантаження росте → ще один консьюмер падає від перевантаження → ще ребаланс → лавина. Рішення: Збільште
session.timeout.ms, використовуйтеmax.poll.recordsдля контролю навантаження, налаштуйте monitoring alerts на consumer lag growth rate. -
Rebalance during Deployment: При rolling deploy N інстансів з eager rebalancing відбувається N ребалансів. Кожен ребаланс = 5-30 секунд простою. Для 20 інстансів = 100-600 секунд downtime. Рішення: Static membership + cooperative rebalancing = 0 ребалансів при rolling deploy.
-
Partition Stall after Rebalance: Після ребалансу новий консьюмер отримує партицію і починає з committed offset. Якщо обробка stateful (наприклад, windowed aggregation), новий консьюмер не має локального state → incorrect results до повного перерахунку. Рішення: External state store (RocksDB, Redis) з партицією як ключ; або state replay при assignment.
-
Cross-Rack Rebalance: В multi-rack кластері при призначенні партицій консьюмеру в іншому rack збільшується latency (network round-trip). Range/RoundRobin не враховують rack-awareness. Рішення: Custom PartitionAssignor з rack-aware assignment; або Consumer Sidecar в тому ж rack, що і брокери.
-
Rebalance Storm з K8s HPA: Horizontal Pod Autoscaler додає поди при рості CPU → нові консьюмери викликають ребаланс → обробка сповільнюється (rebalance overhead) → CPU росте ще більше → HPA додає ще поди → нескінченний цикл. Рішення: Не використовуйте HPA для Kafka консьюмерів. Натомість — custom metrics (consumer lag) з thresholds і cooldown period.
Performance Numbers
| Метрика | Eager | Cooperative |
|---|---|---|
| Rebalance latency | 5-30 секунд | 1-5 секунд |
| Processing during rebalance | Full stop | Continues (partial) |
| Partition movement | All | Incremental |
| Impact on consumer lag | High (2-10x spike) | Low (1.2-2x spike) |
| Number of rebalances per rolling deploy | N (per instance) | 0 (with static membership) |
Production War Story
Ситуація: Стрімінг-сервіс з 25 консьюмерами в групі
event-processors(25 партицій). Range assignor, dynamic membership. Кожної ночі о 02:00 — автоматичний rolling deploy (оновлення контейнерів).Проблема: 25 послідовних ребалансів × 15 секунд = 375 секунд (6+ хвилин) простою кожної ночі. За цей час lag виростав до 500K подій. Ранкові користувачі бачили «відстаючі» рекомендації.
Додаткова проблема: Range assignor при 25 партиціях і 24 консьюмерах (один на ребалансі) призначав 2 партиції одному консьюмеру і 1 — решті. Перевантажений консьюмер падав → ще ребаланс → ще падіння → cascading failure.
Діагностика:
kafka-consumer-groups.sh --describe --group event-processors # STATE=Rebalancing → STATE=Stable → STATE=Rebalancing (цикл) # Lag: 0 → 500K → 0 (за 15 хвилин після deploy)Рішення:
CooperativeStickyAssignor— ребаланс став 2 секунди замість 15group.instance.id=${POD_NAME}— rolling deploy = 0 ребалансівsession.timeout.ms=45000,heartbeat.interval.ms=15000— запас по таймауту- ConsumerRebalanceListener з commitSync + state flush
- K8s PodDisruptionBudget: maxUnavailable=1
- Lag-based HPA замість CPU-based (custom metric через Prometheus Adapter)
Результат: Rolling deploy — 0 ребалансів, 0 downtime, lag < 1K.
Урок: Eager rebalancing + dynamic membership + rolling deploy = guaranteed cascading failure. Cooperative + static membership = zero-downtime deploys.
Моніторинг (JMX + Burrow)
JMX метрики:
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=failed-rebalance-total
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-heartbeat-seconds-ago
Burrow:
- Group status: OK, WARN, ERR, STOP, STALL
- Per-partition lag with trend
- HTTP API → Grafana → Alertmanager
Alert правила:
- alert: KafkaRebalanceTooFrequent
expr: rate(kafka_consumer_coordinator_rebalance_rate[5m]) > 0.1
for: 5m
severity: warning
- alert: KafkaConsumerStalled
expr: kafka_consumer_records_lag_max > 100000
for: 10m
severity: critical
Highload Best Practices
- CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
- Static membership (
group.instance.id) — усуває ребаланс на rolling deploy - Heartbeat tuning:
heartbeat.interval.ms = session.timeout.ms / 3 - ConsumerRebalanceListener — commitSync + state cleanup в
onPartitionsRevoked - Monitor rebalance frequency — alert при > 1 rebalance за 10 хвилин
- max.poll.records підбирайте під latency — ціль: processing < max.poll.interval.ms
- Не використовуйте HPA по CPU — використовуйте custom metric (consumer lag)
- PodDisruptionBudget — maxUnavailable=1 для запобігання cascading rebalances
- Cross-rack awareness — custom assignor для multi-rack кластерів
- Stateful processing — external state store (RocksDB, Redis) keyed by partition
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
- Балансування = автоматичний розподіл партицій між консьюмерами в групі
- Стратегії: Range (нерівномірний), RoundRobin, Sticky, CooperativeSticky (рекомендується)
- Cooperative rebalancing: покрокове переміщення, processing продовжується на решті партицій
- ConsumerRebalanceListener обов’язковий: commitSync в onPartitionsRevoked
- Static membership (
group.instance.id) — 0 ребалансів при rolling deploy - Chaos rebalancing: один падає → навантаження росте → ще один падає → лавина
- Heartbeat tuning:
heartbeat.interval.ms = session.timeout.ms / 3
Часті уточнюючі запитання:
- Що таке cascading rebalance? — Один консьюмер падає → навантаження на решті → ще падає → лавина.
- Чому HPA по CPU поганий для Kafka? — Ребаланс сповільнює обробку → CPU росте → ще поди → нескінченний цикл.
- Що робить ConsumerRebalanceListener? — Коммітить offsets і очищає state перед втратою партицій.
- Як уникнути ребалансу при деплої? — Static membership + CooperativeStickyAssignor.
Червоні прапорці (НЕ говорити):
- «Range assignor — стандарт для production» — нерівномірний, використовуйте CooperativeSticky
- «Більше консьюмерів = завжди більше throughput» — обмежено числом партицій
- «Ребаланс миттєвий» — 5-30 секунд (eager), 1-5 (cooperative)
- «Можна ігнорувати session.timeout.ms» — занадто короткий = хибні ребаланси
Пов’язані теми:
- [[5. Що таке Consumer Group]]
- [[8. Що станеться при додаванні нового консьюмера в групі]]
- [[15. Що таке ребаланс і коли він відбувається]]
- [[7. Чи можна мати більше консьюмерів, ніж партицій]]