Питання 6 · Розділ 15

Як працює балансування консьюмерів у групі

Уявіть піцерію, де потрібно розрізати піцу (партиції) між друзями (консьюмерами):

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Що таке балансування?

Балансування — це процес автоматичного розподілу партицій між консьюмерами в групі. Kafka сама вирішує, який консьюмер які партиції читатиме.

Навіщо: без неї довелося б вручну призначати партиції кожному консьюмеру. При зміні числа консьюмерів або партицій — все перераховується автоматично.

Аналогія

Уявіть піцерію, де потрібно розрізати піцу (партиції) між друзями (консьюмерами):

  • 3 шматки піци, 3 друзі → кожен отримує 1 шматок
  • 3 шматки, 2 друзі → один отримує 2 шматки, інший — 1
  • 3 шматки, 5 друзів → 3 отримують по шматку, 2 залишаються голодними

Основне правило

Кількість активних консьюмерів <= Кількість партицій

Приклад

3 партиції, 3 консьюмери:
  C1 → P0
  C2 → P1
  C3 → P2  ← ідеальний баланс

Додали C4:
  C1 → P0
  C2 → P1
  C3 → P2
  C4 → чекає (немає вільних партицій)

Як це працює в коді

// Усі консьюмери з однаковим group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматично розподілить партиції

🟡 Middle Level

Стратегії балансування

Стратегія Алгоритм Коли використовувати
Range Ділить партиції по діапазону Підходить коли число партицій кратне числу консьюмерів. При непарному діленні — нерівномірність. Для production з динамічним масштабуванням краще CooperativeSticky.
RoundRobin Чергує партиції Коли потрібна рівномірність
Sticky Мінімізує переміщення Production (Kafka 2.2+)
CooperativeSticky Покрокове переміщення Production (Kafka 2.3+, рекомендується)

Range Assignor — докладно

Партиції 0,1,2,3 → 2 консьюмери:
  C1: 0, 1  (2 партиції)
  C2: 2, 3  (2 партиції)
  → Рівномірно

Партиції 0,1,2,3,4 → 2 консьюмери:
  C1: 0, 1, 2  (3 партиції)
  C2: 3, 4     (2 партиції)
  → Нерівномірно! (Range = numPartitions / numConsumers)

RoundRobin Assignor

Партиції 0,1,2,3,4 → 2 консьюмери:
  C1: 0, 2, 4  (3 партиції)
  C2: 1, 3     (2 партиції)
  → Трохи краще при непарному числі

Процес ребалансу

1. Подія-тригер (новий консьюмер, падіння, таймаут)
2. Group coordinator починає ребаланс
3. Усі консьюмери отримують сповіщення
4. Усі консьюмери зупиняють читання (eager)
   або продовжують з частиною партицій (cooperative)
5. Обчислюється новий assignment
6. Assignment розсилається консьюмерам
7. Консьюмери починають читати нові партиції

ConsumerRebalanceListener

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. Коммітити offsets перед втратою партицій
        consumer.commitSync();
        // 2. Очистити локальні ресурси
        cleanup(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 3. Ініціалізація нових партицій
        initialize(partitions);
    }
});

Chaos Rebalancing Problem

session.timeout.ms=10000, обробка батча = 15 секунд,
max.poll.interval.ms=300000. Консьюмер не встигає poll() → виключений →
ребаланс → навантаження на решті росте → ще один падає → лавина.

Консьюмери падають → ребаланс → нові консьюмери падають → цикл

Причини:
- Занадто короткий session.timeout.ms
- Довга обробка (max.poll.interval.ms exceeded)
- Проблеми з пам'яттю/CPU

Рішення:
- Збільшити таймаути
- Оптимізувати обробку
- Додати моніторинг

Порівняння стратегій

Критерій Range RoundRobin Sticky CooperativeSticky
Рівномірність ⚠️
Мінімізація переміщення
Простой при ребалансі Повний Повний Повний Частковий
Версія Kafka Усі Усі 2.2+ 2.3+
Server-Side Assignment (KIP-848) 3.0+ Усунуто SyncGroup round-trip, швидший ребаланс    

Типові помилки

Помилка Наслідок Рішення
Часті ребаланси Постійний простой системи Збільште таймаути, static membership
Без ConsumerRebalanceListener Втрата offsets при ребалансі Завжди використовуйте listener
Нерівномірний розподіл Один консьюмер перевантажений CooperativeSticky assignor
Range assignor в production Нерівномірність + повний простой Перейдіть на CooperativeSticky

🔴 Senior Level

Group Coordinator Protocol — повна діаграма

Phase 1: JoinGroup
┌──────────────┐          ┌──────────────────┐          ┌──────────────┐
│  Consumer A  │          │ Group Coordinator│          │  Consumer B  │
└──────┬───────┘          └────────┬─────────┘          └──────┬───────┘
       │ JoinGroup Request          │                          │
       │ (group.id, memberId)       │                          │
       │───────────────────────────►│                          │
       │                            │ JoinGroup Request        │
       │                            │◄─────────────────────────│
       │                            │                          │
       │ Leader вибирається (перший │                          │
       │ у списку members)          │                          │
       │                            │                          │
       │◄───────────────────────────│ JoinGroup Response       │
       │ (leaderId, allMembers)     │ (memberId, leaderId)     │
       │                            │─────────────────────────►│

Phase 2: Assignment (на Leader)
       │                            │                          │
       │ Leader обчислює            │                          │
       │ assignment через           │                          │
       │ PartitionAssignor          │                          │
       │ assignment = {             │                          │
       │   A: [P0, P1],             │                          │
       │   B: [P2]                  │                          │
       │ }                          │                          │

Phase 3: SyncGroup
       │                            │                          │
       │ SyncGroup Request          │                          │
       │ (assignment для всіх)      │                          │
       │───────────────────────────►│                          │
       │                            │                          │
       │                            │◄─────────────────────────│ SyncGroup Response
       │                            │                          │ (свій assignment)
       │◄───────────────────────────│                          │
       │ SyncGroup Response         │                          │
       │ (assignment: P0, P1)       │                          │

Kafka 3.0+ (KIP-848): Assignment обчислюється на Coordinator, а не на Leader. Усунуто SyncGroup round-trip.

Eager vs Cooperative Rebalancing — internals

Eager Rebalancing:

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator
// onJoinComplete:
//   1. onPartitionsRevoked(ALL partitions)  // Зупиняємо ВСЕ
//   2. newAssignment = computeAssignment()
//   3. onPartitionsAssigned(newAssignment)  // Запускаємо НОВЕ
// Повна зупинка = full stop

Cooperative Rebalancing:

// CooperativeStickyAssignor:
//   1. onPartitionsRevoked(SUBSET partitions)  // Тільки ті, що йдуть
//   2. onPartitionsAssigned(SUBSET partitions)  // Тільки нові
//   3. Продовжуємо працювати з рештою партицій

// Алгоритм:
//   currentAssignment = {P0, P1, P2}
//   newAssignment = {P0, P3}  // P1 і P2 пішли, P3 додано
//   revoked = {P1, P2}         // Тільки ці
//   assigned = {P3}            // Тільки ці
//   continue processing P0     // Не зупиняємось!

Incremental Cooperative Rebalancing:

При великій кількості змін:
  Round 1: revoke P1, assign P3
  Round 2: revoke P2, assign P4
  ...
  Кожен round — короткий (зазвичай < 1 секунди)
  Обробка продовжується на решті партицій

Static Membership — як це працює всередині

props.put("group.instance.id", "consumer-1");

Protocol:

Dynamic consumer (без group.instance.id):
  Join → assigned memberId = UUID → restart = new memberId → rebalance

Static consumer (з group.instance.id):
  Join → assigned memberId = group.instance.id
  → restart = same memberId → NO rebalance (якщо в межах session.timeout)
  → Coordinator чекає session.timeout.ms перед виключенням

Умови для відсутності ребалансу:

  • Consumer повернувся в межах session.timeout.ms
  • group.instance.id не використовується іншим консьюмером
  • Partition count не змінився

Duplicate group.instance.id:

Якщо два консьюмери з однаковим group.instance.id:
  Другий отримує FencedInstanceIdException
  → Другий консьюмер завершується
  → Це запобігає duplicate processing

Production Configuration

# Оптимальна конфігурація для production
group.id: order-processors
group.instance.id: consumer-${HOSTNAME}     # Static membership
partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000                    # 30 секунд
heartbeat.interval.ms: 10000                 # 1/3 від session timeout
max.poll.interval.ms: 300000                 # 5 хвилин на обробку
max.poll.records: 500                        # батч для обробки
enable.auto.commit: false                    # Ручний комміт

Edge Cases (3+)

  1. Cascading Rebalance (Thundering Herd): Один консьюмер падає → ребаланс → решта консьюмерів отримують більше партицій → навантаження росте → ще один консьюмер падає від перевантаження → ще ребаланс → лавина. Рішення: Збільште session.timeout.ms, використовуйте max.poll.records для контролю навантаження, налаштуйте monitoring alerts на consumer lag growth rate.

  2. Rebalance during Deployment: При rolling deploy N інстансів з eager rebalancing відбувається N ребалансів. Кожен ребаланс = 5-30 секунд простою. Для 20 інстансів = 100-600 секунд downtime. Рішення: Static membership + cooperative rebalancing = 0 ребалансів при rolling deploy.

  3. Partition Stall after Rebalance: Після ребалансу новий консьюмер отримує партицію і починає з committed offset. Якщо обробка stateful (наприклад, windowed aggregation), новий консьюмер не має локального state → incorrect results до повного перерахунку. Рішення: External state store (RocksDB, Redis) з партицією як ключ; або state replay при assignment.

  4. Cross-Rack Rebalance: В multi-rack кластері при призначенні партицій консьюмеру в іншому rack збільшується latency (network round-trip). Range/RoundRobin не враховують rack-awareness. Рішення: Custom PartitionAssignor з rack-aware assignment; або Consumer Sidecar в тому ж rack, що і брокери.

  5. Rebalance Storm з K8s HPA: Horizontal Pod Autoscaler додає поди при рості CPU → нові консьюмери викликають ребаланс → обробка сповільнюється (rebalance overhead) → CPU росте ще більше → HPA додає ще поди → нескінченний цикл. Рішення: Не використовуйте HPA для Kafka консьюмерів. Натомість — custom metrics (consumer lag) з thresholds і cooldown period.

Performance Numbers

Метрика Eager Cooperative
Rebalance latency 5-30 секунд 1-5 секунд
Processing during rebalance Full stop Continues (partial)
Partition movement All Incremental
Impact on consumer lag High (2-10x spike) Low (1.2-2x spike)
Number of rebalances per rolling deploy N (per instance) 0 (with static membership)

Production War Story

Ситуація: Стрімінг-сервіс з 25 консьюмерами в групі event-processors (25 партицій). Range assignor, dynamic membership. Кожної ночі о 02:00 — автоматичний rolling deploy (оновлення контейнерів).

Проблема: 25 послідовних ребалансів × 15 секунд = 375 секунд (6+ хвилин) простою кожної ночі. За цей час lag виростав до 500K подій. Ранкові користувачі бачили «відстаючі» рекомендації.

Додаткова проблема: Range assignor при 25 партиціях і 24 консьюмерах (один на ребалансі) призначав 2 партиції одному консьюмеру і 1 — решті. Перевантажений консьюмер падав → ще ребаланс → ще падіння → cascading failure.

Діагностика:

kafka-consumer-groups.sh --describe --group event-processors
# STATE=Rebalancing → STATE=Stable → STATE=Rebalancing (цикл)
# Lag: 0 → 500K → 0 (за 15 хвилин після deploy)

Рішення:

  1. CooperativeStickyAssignor — ребаланс став 2 секунди замість 15
  2. group.instance.id=${POD_NAME} — rolling deploy = 0 ребалансів
  3. session.timeout.ms=45000, heartbeat.interval.ms=15000 — запас по таймауту
  4. ConsumerRebalanceListener з commitSync + state flush
  5. K8s PodDisruptionBudget: maxUnavailable=1
  6. Lag-based HPA замість CPU-based (custom metric через Prometheus Adapter)

Результат: Rolling deploy — 0 ребалансів, 0 downtime, lag < 1K.

Урок: Eager rebalancing + dynamic membership + rolling deploy = guaranteed cascading failure. Cooperative + static membership = zero-downtime deploys.

Моніторинг (JMX + Burrow)

JMX метрики:

kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=failed-rebalance-total
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-heartbeat-seconds-ago

Burrow:

  • Group status: OK, WARN, ERR, STOP, STALL
  • Per-partition lag with trend
  • HTTP API → Grafana → Alertmanager

Alert правила:

- alert: KafkaRebalanceTooFrequent
  expr: rate(kafka_consumer_coordinator_rebalance_rate[5m]) > 0.1
  for: 5m
  severity: warning

- alert: KafkaConsumerStalled
  expr: kafka_consumer_records_lag_max > 100000
  for: 10m
  severity: critical

Highload Best Practices

  1. CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
  2. Static membership (group.instance.id) — усуває ребаланс на rolling deploy
  3. Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3
  4. ConsumerRebalanceListener — commitSync + state cleanup в onPartitionsRevoked
  5. Monitor rebalance frequency — alert при > 1 rebalance за 10 хвилин
  6. max.poll.records підбирайте під latency — ціль: processing < max.poll.interval.ms
  7. Не використовуйте HPA по CPU — використовуйте custom metric (consumer lag)
  8. PodDisruptionBudget — maxUnavailable=1 для запобігання cascading rebalances
  9. Cross-rack awareness — custom assignor для multi-rack кластерів
  10. Stateful processing — external state store (RocksDB, Redis) keyed by partition

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Балансування = автоматичний розподіл партицій між консьюмерами в групі
  • Стратегії: Range (нерівномірний), RoundRobin, Sticky, CooperativeSticky (рекомендується)
  • Cooperative rebalancing: покрокове переміщення, processing продовжується на решті партицій
  • ConsumerRebalanceListener обов’язковий: commitSync в onPartitionsRevoked
  • Static membership (group.instance.id) — 0 ребалансів при rolling deploy
  • Chaos rebalancing: один падає → навантаження росте → ще один падає → лавина
  • Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3

Часті уточнюючі запитання:

  • Що таке cascading rebalance? — Один консьюмер падає → навантаження на решті → ще падає → лавина.
  • Чому HPA по CPU поганий для Kafka? — Ребаланс сповільнює обробку → CPU росте → ще поди → нескінченний цикл.
  • Що робить ConsumerRebalanceListener? — Коммітить offsets і очищає state перед втратою партицій.
  • Як уникнути ребалансу при деплої? — Static membership + CooperativeStickyAssignor.

Червоні прапорці (НЕ говорити):

  • «Range assignor — стандарт для production» — нерівномірний, використовуйте CooperativeSticky
  • «Більше консьюмерів = завжди більше throughput» — обмежено числом партицій
  • «Ребаланс миттєвий» — 5-30 секунд (eager), 1-5 (cooperative)
  • «Можна ігнорувати session.timeout.ms» — занадто короткий = хибні ребаланси

Пов’язані теми:

  • [[5. Що таке Consumer Group]]
  • [[8. Що станеться при додаванні нового консьюмера в групі]]
  • [[15. Що таке ребаланс і коли він відбувається]]
  • [[7. Чи можна мати більше консьюмерів, ніж партицій]]