Вопрос 6 · Раздел 15

Как работает балансировка консьюмеров в группе

Представьте пиццерию, где нужно разрезать пиццу (партиции) между друзьями (консьюмерами):

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Что такое балансировка?

Балансировка — это процесс автоматического распределения партиций между консьюмерами в группе. Kafka сама решает, какой консьюмер какие партиции будет читать.

Зачем: без неё пришлось бы вручную назначать партиции каждому консьюмеру. При изменении числа консьюмеров или партиций — всё пересчитывается автоматически.

Аналогия

Представьте пиццерию, где нужно разрезать пиццу (партиции) между друзьями (консьюмерами):

  • 3 куска пиццы, 3 друга → каждый получает 1 кусок
  • 3 куска, 2 друга → один получает 2 куска, другой — 1
  • 3 куска, 5 друзей → 3 получают по куску, 2 остаются голодными

Основное правило

Количество активных консьюмеров <= Количество партиций

Пример

3 партиции, 3 консьюмера:
  C1 → P0
  C2 → P1
  C3 → P2  ← идеальный баланс

Добавили C4:
  C1 → P0
  C2 → P1
  C3 → P2
  C4 → ждёт (нет свободных партиций)

Как это работает в коде

// Все консьюмеры с одинаковым group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматически распределит партиции

🟡 Middle Level

Стратегии балансировки

Стратегия Алгоритм Когда использовать
Range Делит партиции по диапазону Подходит когда число партиций кратно числу консьюмеров. При нечётном делении — неравномерность. Для production с динамическим масштабированием лучше CooperativeSticky.
RoundRobin Чередует партиции Когда нужна равномерность
Sticky Минимизирует перемещение Production (Kafka 2.2+)
CooperativeSticky Пошаговое перемещение Production (Kafka 2.3+, рекомендуется)

Range Assignor — подробно

Партиции 0,1,2,3 → 2 консьюмера:
  C1: 0, 1  (2 партиции)
  C2: 2, 3  (2 партиции)
  → Равномерно

Партиции 0,1,2,3,4 → 2 консьюмера:
  C1: 0, 1, 2  (3 партиции)
  C2: 3, 4     (2 партиции)
  → Неравномерно! (Range = numPartitions / numConsumers)

RoundRobin Assignor

Партиции 0,1,2,3,4 → 2 консьюмера:
  C1: 0, 2, 4  (3 партиции)
  C2: 1, 3     (2 партиции)
  → Чуть лучше при нечётном числе

Процесс ребаланса

1. Событие-триггер (новый консьюмер, падение, таймаут)
2. Group coordinator начинает ребаланс
3. Все консьюмеры получают уведомление
4. Все консьюмеры останавливают чтение (eager)
   или продолжают с частью партиций (cooperative)
5. Вычисляется новый assignment
6. Assignment рассылается консьюмерам
7. Консьюмеры начинают читать новые партиции

ConsumerRebalanceListener

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. Коммитить offsets перед потерей партиций
        consumer.commitSync();
        // 2. Очистить локальные ресурсы
        cleanup(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 3. Инициализация новых партиций
        initialize(partitions);
    }
});

Chaos Rebalancing Problem

session.timeout.ms=10000, обработка батча = 15 секунд,
max.poll.interval.ms=300000. Консьюмер не успевает poll() → исключён →
ребаланс → нагрузка на оставшихся растёт → ещё один падает → лавина.

Консьюмеры падают → ребаланс → новые консьюмеры падают → цикл

Причины:
- Слишком короткий session.timeout.ms
- Долгая обработка (max.poll.interval.ms exceeded)
- Проблемы с памятью/CPU

Решение:
- Увеличить таймауты
- Оптимизировать обработку
- Добавить мониторинг

Сравнение стратегий

Критерий Range RoundRobin Sticky CooperativeSticky
Равномерность ⚠️
Минимизация перемещения
Простой при ребалансе Полный Полный Полный Частичный
Версия Kafka Все Все 2.2+ 2.3+
Server-Side Assignment (KIP-848) 3.0+ Устраняет SyncGroup round-trip, быстрее ребаланс    

Типичные ошибки

Ошибка Последствие Решение
Частые ребалансы Постоянный простой системы Увеличьте таймауты, static membership
Без ConsumerRebalanceListener Потеря offsets при ребалансе Всегда используйте listener
Неравномерное распределение Один консьюмер перегружен CooperativeSticky assignor
Range assignor в production Неравномерность + полный простой Перейдите на CooperativeSticky

🔴 Senior Level

Group Coordinator Protocol — полная диаграмма

Phase 1: JoinGroup
┌──────────────┐          ┌──────────────────┐          ┌──────────────┐
│  Consumer A  │          │ Group Coordinator│          │  Consumer B  │
└──────┬───────┘          └────────┬─────────┘          └──────┬───────┘
       │ JoinGroup Request          │                          │
       │ (group.id, memberId)       │                          │
       │───────────────────────────►│                          │
       │                            │ JoinGroup Request        │
       │                            │◄─────────────────────────│
       │                            │                          │
       │ Leader выбирается (первый  │                          │
       │ в списке members)          │                          │
       │                            │                          │
       │◄───────────────────────────│ JoinGroup Response       │
       │ (leaderId, allMembers)     │ (memberId, leaderId)     │
       │                            │─────────────────────────►│

Phase 2: Assignment (на Leader)
       │                            │                          │
       │ Leader вычисляет           │                          │
       │ assignment через           │                          │
       │ PartitionAssignor          │                          │
       │ assignment = {             │                          │
       │   A: [P0, P1],             │                          │
       │   B: [P2]                  │                          │
       │ }                          │                          │

Phase 3: SyncGroup
       │                            │                          │
       │ SyncGroup Request          │                          │
       │ (assignment для всех)      │                          │
       │───────────────────────────►│                          │
       │                            │                          │
       │                            │◄─────────────────────────│ SyncGroup Response
       │                            │                          │ (свой assignment)
       │◄───────────────────────────│                          │
       │ SyncGroup Response         │                          │
       │ (assignment: P0, P1)       │                          │

Kafka 3.0+ (KIP-848): Assignment вычисляется на Coordinator, а не на Leader. Eliminate SyncGroup round-trip.

Eager vs Cooperative Rebalancing — internals

Eager Rebalancing:

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator
// onJoinComplete:
//   1. onPartitionsRevoked(ALL partitions)  // Останавливаем ВСЁ
//   2. newAssignment = computeAssignment()
//   3. onPartitionsAssigned(newAssignment)  // Запускаем НОВОЕ
// Полная остановка = full stop

Cooperative Rebalancing:

// CooperativeStickyAssignor:
//   1. onPartitionsRevoked(SUBSET partitions)  // Только те, что уходят
//   2. onPartitionsAssigned(SUBSET partitions)  // Только новые
//   3. Продолжаем работать с оставшимися партициями

// Алгоритм:
//   currentAssignment = {P0, P1, P2}
//   newAssignment = {P0, P3}  // P1 и P2 ушли, P3 добавлен
//   revoked = {P1, P2}         // Только эти
//   assigned = {P3}            // Только эти
//   continue processing P0     // Не останавливаемся!

Incremental Cooperative Rebalancing:

При большом количестве изменений:
  Round 1: revoke P1, assign P3
  Round 2: revoke P2, assign P4
  ...
  Каждый round — короткий (обычно < 1 секунды)
  Обработка продолжается на оставшихся партициях

Static Membership — как это работает внутри

props.put("group.instance.id", "consumer-1");

Protocol:

Dynamic consumer (без group.instance.id):
  Join → assigned memberId = UUID → restart = new memberId → rebalance

Static consumer (с group.instance.id):
  Join → assigned memberId = group.instance.id
  → restart = same memberId → NO rebalance (если в пределах session.timeout)
  → Coordinator ждёт session.timeout.ms перед исключением

Условия для отсутствия ребаланса:

  • Consumer вернулся в пределах session.timeout.ms
  • group.instance.id не используется другим консьюмером
  • Partition count не изменился

Duplicate group.instance.id:

Если два консьюмера с одинаковым group.instance.id:
  Второй получает FencedInstanceIdException
  → Второй консьюмер завершается
  → Это предотвращает duplicate processing

Production Configuration

# Оптимальная конфигурация для production
group.id: order-processors
group.instance.id: consumer-${HOSTNAME}     # Static membership
partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000                    # 30 секунд
heartbeat.interval.ms: 10000                 # 1/3 от session timeout
max.poll.interval.ms: 300000                 # 5 минут на обработку
max.poll.records: 500                        # батч для обработки
enable.auto.commit: false                    # Ручной коммит

Edge Cases (3+)

  1. Cascading Rebalance (Thundering Herd): Один консьюмер падает → ребаланс → оставшиеся консьюмеры получают больше партиций → нагрузка растёт → ещё один консьюмер падает от overload → ещё ребаланс → лавина. Решение: Увеличьте session.timeout.ms, используйте max.poll.records для контроля нагрузки, настройте monitoring alerts на consumer lag growth rate.

  2. Rebalance during Deployment: При rolling deploy N инстансов с eager rebalancing происходит N ребалансов. Каждый ребаланс = 5-30 секунд простоя. Для 20 инстансов = 100-600 секунд downtime. Решение: Static membership + cooperative rebalancing = 0 ребалансов при rolling deploy.

  3. Partition Stall after Rebalance: После ребаланса новый консьюмер получает партицию и начинает с committed offset. Если обработка stateful (например, windowed aggregation), новый консьюмер не имеет локального state → incorrect results до полного пересчёта. Решение: External state store (RocksDB, Redis) с партицией как ключ; или state replay при assignment.

  4. Cross-Rack Rebalance: В multi-rack кластере при назначении партиций консьюмеру в другом rack увеличивается latency (network round-trip). Range/RoundRobin не учитывают rack-awareness. Решение: Custom PartitionAssignor с rack-aware assignment; или Consumer Sidecar в том же rack, что и брокеры.

  5. Rebalance Storm с K8s HPA: Horizontal Pod Autoscaler добавляет поды при росте CPU → новые консьюмеры вызывают ребаланс → обработка замедляется (rebalance overhead) → CPU растёт ещё больше → HPA добавляет ещё поды → бесконечный цикл. Решение: Не используйте HPA для Kafka консьюмеров. Вместо этого — custom metrics (consumer lag) с thresholds и cooldown period.

Performance Numbers

Метрика Eager Cooperative
Rebalance latency 5-30 секунд 1-5 секунд
Processing during rebalance Full stop Continues (partial)
Partition movement All Incremental
Impact on consumer lag High (2-10x spike) Low (1.2-2x spike)
Number of rebalances per rolling deploy N (per instance) 0 (with static membership)

Production War Story

Ситуация: Striming-сервис с 25 консьюмерами в группе event-processors (25 партиций). Range assignor, dynamic membership. Каждую ночь в 02:00 — автоматический rolling deploy (обновление контейнеров).

Проблема: 25 последовательных ребалансов × 15 секунд = 375 секунд (6+ минут) простоя каждую ночь. За это время lag вырастал до 500K событий. Утренние пользователи видели «отстающие» рекомендации.

Дополнительная проблема: Range assignor при 25 партициях и 24 консьюмерах (один на ребалансе)分配л 2 партиции одному консьюмеру и 1 — остальным. Перегруженный консьюмер падал → ещё ребаланс → ещё падение → cascading failure.

Диагностика:

kafka-consumer-groups.sh --describe --group event-processors
# STATE=Rebalancing → STATE=Stable → STATE=Rebalancing (цикл)
# Lag: 0 → 500K → 0 (за 15 минут после deploy)

Решение:

  1. CooperativeStickyAssignor — ребаланс стал 2 секунды вместо 15
  2. group.instance.id=${POD_NAME} — rolling deploy = 0 ребалансов
  3. session.timeout.ms=45000, heartbeat.interval.ms=15000 — запас по таймауту
  4. ConsumerRebalanceListener с commitSync + state flush
  5. K8s PodDisruptionBudget: maxUnavailable=1
  6. Lag-based HPA вместо CPU-based (custom metric через Prometheus Adapter)

Результат: Rolling deploy — 0 ребалансов, 0 downtime, lag < 1K.

Урок: Eager rebalancing + dynamic membership + rolling deploy = guaranteed cascading failure. Cooperative + static membership = zero-downtime deploys.

Мониторинг (JMX + Burrow)

JMX метрики:

kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=failed-rebalance-total
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-heartbeat-seconds-ago

Burrow:

  • Group status: OK, WARN, ERR, STOP, STALL
  • Per-partition lag with trend
  • HTTP API → Grafana → Alertmanager

Alert правила:

- alert: KafkaRebalanceTooFrequent
  expr: rate(kafka_consumer_coordinator_rebalance_rate[5m]) > 0.1
  for: 5m
  severity: warning
  
- alert: KafkaConsumerStalled
  expr: kafka_consumer_records_lag_max > 100000
  for: 10m
  severity: critical

Highload Best Practices

  1. CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
  2. Static membership (group.instance.id) — eliminates rebalance on rolling deploy
  3. Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3
  4. ConsumerRebalanceListener — commitSync + state cleanup в onPartitionsRevoked
  5. Monitor rebalance frequency — alert при > 1 rebalance за 10 минут
  6. max.poll.records подбирайте под latency — цель: processing < max.poll.interval.ms
  7. Не используйте HPA по CPU — используйте custom metric (consumer lag)
  8. PodDisruptionBudget — maxUnavailable=1 для предотвращения cascading rebalances
  9. Cross-rack awareness — custom assignor для multi-rack кластеров
  10. Stateful processing — external state store (RocksDB, Redis) keyed by partition

🎯 Шпаргалка для интервью

Обязательно знать:

  • Балансировка = автоматическое распределение партиций между консьюмерами в группе
  • Стратегии: Range (неравномерный), RoundRobin, Sticky, CooperativeSticky (рекомендуется)
  • Cooperative rebalancing: пошаговое перемещение, processing продолжается на оставшихся партициях
  • ConsumerRebalanceListener обязателен: commitSync в onPartitionsRevoked
  • Static membership (group.instance.id) — 0 ребалансов при rolling deploy
  • Chaos rebalancing: один падает → нагрузка растёт → ещё один падает → лавина
  • Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3

Частые уточняющие вопросы:

  • Что такое cascading rebalance? — Один консьюмер падает → нагрузка на остальных → ещё падает → лавина.
  • Почему HPA по CPU плох для Kafka? — Ребаланс замедляет обработку → CPU растёт → ещё поды → бесконечный цикл.
  • Что делает ConsumerRebalanceListener? — Коммитит offsets и очищает state перед потерей партиций.
  • Как избежать ребаланса при деплое? — Static membership + CooperativeStickyAssignor.

Красные флаги (НЕ говорить):

  • «Range assignor — стандарт для production» — неравномерный, используйте CooperativeSticky
  • «Больше консьюмеров = всегда больше throughput» — ограничено числом партиций
  • «Ребаланс мгновенный» — 5-30 секунд (eager), 1-5 (cooperative)
  • «Можно игнорировать session.timeout.ms» — слишком короткий = ложные ребалансы

Связанные темы:

  • [[5. Что такое Consumer Group]]
  • [[8. Что произойдёт при добавлении нового консьюмера в группе]]
  • [[15. Что такое rebalancing и когда он происходит]]
  • [[7. Можно ли иметь больше консьюмеров, чем партиций]]