Как работает балансировка консьюмеров в группе
Представьте пиццерию, где нужно разрезать пиццу (партиции) между друзьями (консьюмерами):
🟢 Junior Level
Что такое балансировка?
Балансировка — это процесс автоматического распределения партиций между консьюмерами в группе. Kafka сама решает, какой консьюмер какие партиции будет читать.
Зачем: без неё пришлось бы вручную назначать партиции каждому консьюмеру. При изменении числа консьюмеров или партиций — всё пересчитывается автоматически.
Аналогия
Представьте пиццерию, где нужно разрезать пиццу (партиции) между друзьями (консьюмерами):
- 3 куска пиццы, 3 друга → каждый получает 1 кусок
- 3 куска, 2 друга → один получает 2 куска, другой — 1
- 3 куска, 5 друзей → 3 получают по куску, 2 остаются голодными
Основное правило
Количество активных консьюмеров <= Количество партиций
Пример
3 партиции, 3 консьюмера:
C1 → P0
C2 → P1
C3 → P2 ← идеальный баланс
Добавили C4:
C1 → P0
C2 → P1
C3 → P2
C4 → ждёт (нет свободных партиций)
Как это работает в коде
// Все консьюмеры с одинаковым group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматически распределит партиции
🟡 Middle Level
Стратегии балансировки
| Стратегия | Алгоритм | Когда использовать |
|---|---|---|
| Range | Делит партиции по диапазону | Подходит когда число партиций кратно числу консьюмеров. При нечётном делении — неравномерность. Для production с динамическим масштабированием лучше CooperativeSticky. |
| RoundRobin | Чередует партиции | Когда нужна равномерность |
| Sticky | Минимизирует перемещение | Production (Kafka 2.2+) |
| CooperativeSticky | Пошаговое перемещение | Production (Kafka 2.3+, рекомендуется) |
Range Assignor — подробно
Партиции 0,1,2,3 → 2 консьюмера:
C1: 0, 1 (2 партиции)
C2: 2, 3 (2 партиции)
→ Равномерно
Партиции 0,1,2,3,4 → 2 консьюмера:
C1: 0, 1, 2 (3 партиции)
C2: 3, 4 (2 партиции)
→ Неравномерно! (Range = numPartitions / numConsumers)
RoundRobin Assignor
Партиции 0,1,2,3,4 → 2 консьюмера:
C1: 0, 2, 4 (3 партиции)
C2: 1, 3 (2 партиции)
→ Чуть лучше при нечётном числе
Процесс ребаланса
1. Событие-триггер (новый консьюмер, падение, таймаут)
2. Group coordinator начинает ребаланс
3. Все консьюмеры получают уведомление
4. Все консьюмеры останавливают чтение (eager)
или продолжают с частью партиций (cooperative)
5. Вычисляется новый assignment
6. Assignment рассылается консьюмерам
7. Консьюмеры начинают читать новые партиции
ConsumerRebalanceListener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 1. Коммитить offsets перед потерей партиций
consumer.commitSync();
// 2. Очистить локальные ресурсы
cleanup(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 3. Инициализация новых партиций
initialize(partitions);
}
});
Chaos Rebalancing Problem
session.timeout.ms=10000, обработка батча = 15 секунд,
max.poll.interval.ms=300000. Консьюмер не успевает poll() → исключён →
ребаланс → нагрузка на оставшихся растёт → ещё один падает → лавина.
Консьюмеры падают → ребаланс → новые консьюмеры падают → цикл
Причины:
- Слишком короткий session.timeout.ms
- Долгая обработка (max.poll.interval.ms exceeded)
- Проблемы с памятью/CPU
Решение:
- Увеличить таймауты
- Оптимизировать обработку
- Добавить мониторинг
Сравнение стратегий
| Критерий | Range | RoundRobin | Sticky | CooperativeSticky |
|---|---|---|---|---|
| Равномерность | ⚠️ | ✅ | ✅ | ✅ |
| Минимизация перемещения | ❌ | ❌ | ✅ | ✅ |
| Простой при ребалансе | Полный | Полный | Полный | Частичный |
| Версия Kafka | Все | Все | 2.2+ | 2.3+ |
| Server-Side Assignment (KIP-848) | 3.0+ | Устраняет SyncGroup round-trip, быстрее ребаланс |
Типичные ошибки
| Ошибка | Последствие | Решение |
|---|---|---|
| Частые ребалансы | Постоянный простой системы | Увеличьте таймауты, static membership |
| Без ConsumerRebalanceListener | Потеря offsets при ребалансе | Всегда используйте listener |
| Неравномерное распределение | Один консьюмер перегружен | CooperativeSticky assignor |
| Range assignor в production | Неравномерность + полный простой | Перейдите на CooperativeSticky |
🔴 Senior Level
Group Coordinator Protocol — полная диаграмма
Phase 1: JoinGroup
┌──────────────┐ ┌──────────────────┐ ┌──────────────┐
│ Consumer A │ │ Group Coordinator│ │ Consumer B │
└──────┬───────┘ └────────┬─────────┘ └──────┬───────┘
│ JoinGroup Request │ │
│ (group.id, memberId) │ │
│───────────────────────────►│ │
│ │ JoinGroup Request │
│ │◄─────────────────────────│
│ │ │
│ Leader выбирается (первый │ │
│ в списке members) │ │
│ │ │
│◄───────────────────────────│ JoinGroup Response │
│ (leaderId, allMembers) │ (memberId, leaderId) │
│ │─────────────────────────►│
Phase 2: Assignment (на Leader)
│ │ │
│ Leader вычисляет │ │
│ assignment через │ │
│ PartitionAssignor │ │
│ assignment = { │ │
│ A: [P0, P1], │ │
│ B: [P2] │ │
│ } │ │
Phase 3: SyncGroup
│ │ │
│ SyncGroup Request │ │
│ (assignment для всех) │ │
│───────────────────────────►│ │
│ │ │
│ │◄─────────────────────────│ SyncGroup Response
│ │ │ (свой assignment)
│◄───────────────────────────│ │
│ SyncGroup Response │ │
│ (assignment: P0, P1) │ │
Kafka 3.0+ (KIP-848): Assignment вычисляется на Coordinator, а не на Leader. Eliminate SyncGroup round-trip.
Eager vs Cooperative Rebalancing — internals
Eager Rebalancing:
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator
// onJoinComplete:
// 1. onPartitionsRevoked(ALL partitions) // Останавливаем ВСЁ
// 2. newAssignment = computeAssignment()
// 3. onPartitionsAssigned(newAssignment) // Запускаем НОВОЕ
// Полная остановка = full stop
Cooperative Rebalancing:
// CooperativeStickyAssignor:
// 1. onPartitionsRevoked(SUBSET partitions) // Только те, что уходят
// 2. onPartitionsAssigned(SUBSET partitions) // Только новые
// 3. Продолжаем работать с оставшимися партициями
// Алгоритм:
// currentAssignment = {P0, P1, P2}
// newAssignment = {P0, P3} // P1 и P2 ушли, P3 добавлен
// revoked = {P1, P2} // Только эти
// assigned = {P3} // Только эти
// continue processing P0 // Не останавливаемся!
Incremental Cooperative Rebalancing:
При большом количестве изменений:
Round 1: revoke P1, assign P3
Round 2: revoke P2, assign P4
...
Каждый round — короткий (обычно < 1 секунды)
Обработка продолжается на оставшихся партициях
Static Membership — как это работает внутри
props.put("group.instance.id", "consumer-1");
Protocol:
Dynamic consumer (без group.instance.id):
Join → assigned memberId = UUID → restart = new memberId → rebalance
Static consumer (с group.instance.id):
Join → assigned memberId = group.instance.id
→ restart = same memberId → NO rebalance (если в пределах session.timeout)
→ Coordinator ждёт session.timeout.ms перед исключением
Условия для отсутствия ребаланса:
- Consumer вернулся в пределах
session.timeout.ms group.instance.idне используется другим консьюмером- Partition count не изменился
Duplicate group.instance.id:
Если два консьюмера с одинаковым group.instance.id:
Второй получает FencedInstanceIdException
→ Второй консьюмер завершается
→ Это предотвращает duplicate processing
Production Configuration
# Оптимальная конфигурация для production
group.id: order-processors
group.instance.id: consumer-${HOSTNAME} # Static membership
partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000 # 30 секунд
heartbeat.interval.ms: 10000 # 1/3 от session timeout
max.poll.interval.ms: 300000 # 5 минут на обработку
max.poll.records: 500 # батч для обработки
enable.auto.commit: false # Ручной коммит
Edge Cases (3+)
-
Cascading Rebalance (Thundering Herd): Один консьюмер падает → ребаланс → оставшиеся консьюмеры получают больше партиций → нагрузка растёт → ещё один консьюмер падает от overload → ещё ребаланс → лавина. Решение: Увеличьте
session.timeout.ms, используйтеmax.poll.recordsдля контроля нагрузки, настройте monitoring alerts на consumer lag growth rate. -
Rebalance during Deployment: При rolling deploy N инстансов с eager rebalancing происходит N ребалансов. Каждый ребаланс = 5-30 секунд простоя. Для 20 инстансов = 100-600 секунд downtime. Решение: Static membership + cooperative rebalancing = 0 ребалансов при rolling deploy.
-
Partition Stall after Rebalance: После ребаланса новый консьюмер получает партицию и начинает с committed offset. Если обработка stateful (например, windowed aggregation), новый консьюмер не имеет локального state → incorrect results до полного пересчёта. Решение: External state store (RocksDB, Redis) с партицией как ключ; или state replay при assignment.
-
Cross-Rack Rebalance: В multi-rack кластере при назначении партиций консьюмеру в другом rack увеличивается latency (network round-trip). Range/RoundRobin не учитывают rack-awareness. Решение: Custom PartitionAssignor с rack-aware assignment; или Consumer Sidecar в том же rack, что и брокеры.
-
Rebalance Storm с K8s HPA: Horizontal Pod Autoscaler добавляет поды при росте CPU → новые консьюмеры вызывают ребаланс → обработка замедляется (rebalance overhead) → CPU растёт ещё больше → HPA добавляет ещё поды → бесконечный цикл. Решение: Не используйте HPA для Kafka консьюмеров. Вместо этого — custom metrics (consumer lag) с thresholds и cooldown period.
Performance Numbers
| Метрика | Eager | Cooperative |
|---|---|---|
| Rebalance latency | 5-30 секунд | 1-5 секунд |
| Processing during rebalance | Full stop | Continues (partial) |
| Partition movement | All | Incremental |
| Impact on consumer lag | High (2-10x spike) | Low (1.2-2x spike) |
| Number of rebalances per rolling deploy | N (per instance) | 0 (with static membership) |
Production War Story
Ситуация: Striming-сервис с 25 консьюмерами в группе
event-processors(25 партиций). Range assignor, dynamic membership. Каждую ночь в 02:00 — автоматический rolling deploy (обновление контейнеров).Проблема: 25 последовательных ребалансов × 15 секунд = 375 секунд (6+ минут) простоя каждую ночь. За это время lag вырастал до 500K событий. Утренние пользователи видели «отстающие» рекомендации.
Дополнительная проблема: Range assignor при 25 партициях и 24 консьюмерах (один на ребалансе)分配л 2 партиции одному консьюмеру и 1 — остальным. Перегруженный консьюмер падал → ещё ребаланс → ещё падение → cascading failure.
Диагностика:
kafka-consumer-groups.sh --describe --group event-processors # STATE=Rebalancing → STATE=Stable → STATE=Rebalancing (цикл) # Lag: 0 → 500K → 0 (за 15 минут после deploy)Решение:
CooperativeStickyAssignor— ребаланс стал 2 секунды вместо 15group.instance.id=${POD_NAME}— rolling deploy = 0 ребалансовsession.timeout.ms=45000,heartbeat.interval.ms=15000— запас по таймауту- ConsumerRebalanceListener с commitSync + state flush
- K8s PodDisruptionBudget: maxUnavailable=1
- Lag-based HPA вместо CPU-based (custom metric через Prometheus Adapter)
Результат: Rolling deploy — 0 ребалансов, 0 downtime, lag < 1K.
Урок: Eager rebalancing + dynamic membership + rolling deploy = guaranteed cascading failure. Cooperative + static membership = zero-downtime deploys.
Мониторинг (JMX + Burrow)
JMX метрики:
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=failed-rebalance-total
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-heartbeat-seconds-ago
Burrow:
- Group status: OK, WARN, ERR, STOP, STALL
- Per-partition lag with trend
- HTTP API → Grafana → Alertmanager
Alert правила:
- alert: KafkaRebalanceTooFrequent
expr: rate(kafka_consumer_coordinator_rebalance_rate[5m]) > 0.1
for: 5m
severity: warning
- alert: KafkaConsumerStalled
expr: kafka_consumer_records_lag_max > 100000
for: 10m
severity: critical
Highload Best Practices
- CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
- Static membership (
group.instance.id) — eliminates rebalance on rolling deploy - Heartbeat tuning:
heartbeat.interval.ms = session.timeout.ms / 3 - ConsumerRebalanceListener — commitSync + state cleanup в
onPartitionsRevoked - Monitor rebalance frequency — alert при > 1 rebalance за 10 минут
- max.poll.records подбирайте под latency — цель: processing < max.poll.interval.ms
- Не используйте HPA по CPU — используйте custom metric (consumer lag)
- PodDisruptionBudget — maxUnavailable=1 для предотвращения cascading rebalances
- Cross-rack awareness — custom assignor для multi-rack кластеров
- Stateful processing — external state store (RocksDB, Redis) keyed by partition
🎯 Шпаргалка для интервью
Обязательно знать:
- Балансировка = автоматическое распределение партиций между консьюмерами в группе
- Стратегии: Range (неравномерный), RoundRobin, Sticky, CooperativeSticky (рекомендуется)
- Cooperative rebalancing: пошаговое перемещение, processing продолжается на оставшихся партициях
- ConsumerRebalanceListener обязателен: commitSync в onPartitionsRevoked
- Static membership (
group.instance.id) — 0 ребалансов при rolling deploy - Chaos rebalancing: один падает → нагрузка растёт → ещё один падает → лавина
- Heartbeat tuning:
heartbeat.interval.ms = session.timeout.ms / 3
Частые уточняющие вопросы:
- Что такое cascading rebalance? — Один консьюмер падает → нагрузка на остальных → ещё падает → лавина.
- Почему HPA по CPU плох для Kafka? — Ребаланс замедляет обработку → CPU растёт → ещё поды → бесконечный цикл.
- Что делает ConsumerRebalanceListener? — Коммитит offsets и очищает state перед потерей партиций.
- Как избежать ребаланса при деплое? — Static membership + CooperativeStickyAssignor.
Красные флаги (НЕ говорить):
- «Range assignor — стандарт для production» — неравномерный, используйте CooperativeSticky
- «Больше консьюмеров = всегда больше throughput» — ограничено числом партиций
- «Ребаланс мгновенный» — 5-30 секунд (eager), 1-5 (cooperative)
- «Можно игнорировать session.timeout.ms» — слишком короткий = ложные ребалансы
Связанные темы:
- [[5. Что такое Consumer Group]]
- [[8. Что произойдёт при добавлении нового консьюмера в группе]]
- [[15. Что такое rebalancing и когда он происходит]]
- [[7. Можно ли иметь больше консьюмеров, чем партиций]]