Что произойдёт при добавлении нового консьюмера в группе
При добавлении нового консьюмера происходит ребаланс — партиции перераспределяются между всеми консьюмерами.
Уровень Junior
Краткий ответ
При добавлении нового консьюмера происходит ребаланс — партиции перераспределяются между всеми консьюмерами.
Пример
До добавления:
C1 → P0, P1
C2 → P2
Добавили C3:
C1 → P0
C2 → P1
C3 → P2
Что происходит во время ребаланса?
Eager rebalancing: 5-30 секунд полного простоя.
Cooperative rebalancing: 1-5 секунд с продолжением обработки на оставшихся партициях.
Сообщения накапливаются в Kafka
После ребаланса чтение возобновляется
Как добавить консьюмер?
// Просто запустите новый консьюмер с тем же group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматически запустит ребаланс
Уровень Middle
Процесс ребаланса (по шагам)
1. Новый консьюмер подключается к брокеру
2. Отправляет JoinGroup request
3. Group coordinator начинает ребаланс
4. Все консьюмеры получают уведомление
5. Все консьюмеры останавливают чтение (eager)
или продолжают с частью партиций (cooperative)
6. Вычисляется новый assignment
7. Assignment рассылается консьюмерам
8. Консьюмеры начинают читать новые партиции
Rebalancing Strategies
1. Eager (старый подход):
Все консьюмеры останавливаются полностью
Полное перераспределение
Все продолжают работу
→ Простой всей группы
2. Cooperative (Kafka 2.3+):
Постепенное перемещение партиций
Консьюмеры продолжают работать с остальными
→ Минимальный простой
ConsumerRebalanceListener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Коммитить offsets перед потерей партиций
consumer.commitSync();
// Очистить локальный state
cleanupState(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Инициализация новых партиций
initializeState(partitions);
// Можно начать чтение с определённого offset
for (TopicPartition tp : partitions) {
consumer.seek(tp, getSavedOffset(tp));
}
}
});
Частые ребалансы — проблема
Консьюмеры нестабильны → постоянные ребалансы → простой системы
Причины:
- session.timeout.ms слишком короткий
- max.poll.interval.ms exceeded (долгая обработка)
- Проблемы с сетью
- GC pauses
Симптомы:
- Сообщения в логах: "Rebalance triggered"
- Рост consumer lag
- Падение throughput
Типичные ошибки
- Без ConsumerRebalanceListener:
Потеря данных при ребалансе (не закоммичены offsets) - Частые ребалансы:
Консьюмеры падают → ребаланс → снова падают → цикл - Долгая обработка между poll:
max.poll.interval.ms exceeded → консьюмер исключён → ребаланс
Уровень Senior
Cooperative Rebalancing — детально
Incremental Cooperative Rebalancing (Kafka 2.3+):
Eager:
Revocation всех партиций → Assignment новых → Resume
→ Полный простой
Cooperative:
Пошаговое перемещение:
1. Revocation части партиций
2. Assignment новых партиций
3. Продолжение работы
4. Повтор при необходимости
→ Минимальный простой
Настройка:
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Group Coordinator Protocol
Протокол ребаланса:
JoinGroup — консьюмер говорит «хочу в группу».
SyncGroup — лидер рассылает результат распределения.
1. Join Phase:
- Консьюмер → JoinGroup request
- Coordinator → собирает всех members
- Coordinator → выбирает leader
2. Assignment Phase:
- Leader → вычисляет assignment
- Leader → SyncGroup request с assignment
- Coordinator → рассылает assignments
3. Steady State:
- Консьюмеры → Heartbeat requests
- Coordinator → подтверждает alive
- Консьюмеры → обрабатывают сообщения
Production Configuration
partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000 # 30 секунд
heartbeat.interval.ms: 10000 # 10 секунд (1/3)
max.poll.interval.ms: 300000 # 5 минут
max.poll.records: 500 # батч
enable.auto.commit: false
Static Membership
// Постоянный ID консьюмера
props.put("group.instance.id", "consumer-1");
// При рестарте:
// - Не вызывает ребаланс
// - Возвращает те же партиции
// - Ждёт session.timeout.ms перед rebalance
Monitoring Rebalancing
Ключевые метрики:
kafka.consumer.coordinator:num-rebalances
kafka.consumer.coordinator:time-since-last-rebalance
kafka.consumer:assigned-partitions
kafka.consumer:commit-latency-avg
Alerts:
- Ребаланс чаще чем раз в 10 минут → warning
- Ребаланс чаще чем раз в 5 минут → critical
- Консьюмер не отправляет heartbeat > 30s → critical
- Consumer lag растёт → warning
Troubleshooting Rebalancing
1. Определение причины:
// Логи показывают:
"Heartbeat session expired" → session.timeout.ms слишком короткий
"Max poll interval exceeded" → обработка слишком долгая
"Member is dead" → консьюмер упал
2. Решение проблем:
Heartbeat issues:
Увеличить session.timeout.ms
Увеличить heartbeat.interval.ms
Poll interval issues:
Уменьшить max.poll.records
Оптимизировать обработку сообщения
Увеличить max.poll.interval.ms
Network issues:
Проверить connectivity к брокерам
Проверить firewall rules
Advanced Patterns
1. Gradual Rollout:
Добавляйте консьюмеры постепенно:
1. Запустите один консьюмер
2. Дождитесь стабилизации
3. Проверьте lag и throughput
4. Добавьте следующий
2. Partition-aware State Management:
public class StatefulConsumer {
private final Map<TopicPartition, State> states = new HashMap<>();
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Сохранить state для каждой партиции
for (TopicPartition tp : partitions) {
states.get(tp).save();
states.remove(tp);
}
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Загрузить state для новых партиций
for (TopicPartition tp : partitions) {
states.put(tp, State.load(tp));
}
}
}
Best Practices
✅ Cooperative-sticky assignor
✅ Достаточный session timeout (30s+)
✅ Быстрая обработка сообщений
✅ ConsumerRebalanceListener для graceful handling
✅ Static membership для стабильных инстансов
✅ Мониторинг частоты ребалансов
❌ Частые ребалансы без расследования
❌ Eager assignor допустим для статических кластеров. Для production с rolling deploy — используйте CooperativeSticky.
❌ Слишком короткий timeout
❌ Без обработки onPartitionsRevoked
❌ Игнорирование consumer lag
Архитектурные решения
- Cooperative rebalancing — стандарт для production
- Static membership — для стабильных инстансов
- State management — сохранение state при ребалансе
- Monitoring — раннее обнаружение проблем
Резюме для Senior
- Ребаланс — критический процесс для availability группы
- Cooperative rebalancing минимизирует disruption
- ConsumerRebalanceListener обязателен для production
- Static membership уменьшает unnecessary ребалансы
- Мониторинг частоты ребалансов — ранний индикатор проблем
- Troubleshooting требует анализа логов и метрик
🎯 Шпаргалка для интервью
Обязательно знать:
- При добавлении нового консьюмера происходит ребаланс — партиции перераспределяются
- Eager rebalancing: 5-30 секунд полного простоя; Cooperative: 1-5 секунд
- ConsumerRebalanceListener обязателен: commitSync в onPartitionsRevoked
- Частые ребалансы — проблема: session.timeout exceeded, max.poll.interval exceeded
- CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
- Static membership (
group.instance.id) — устраняет ребаланс при rolling deploy - Gradual rollout: добавляйте консьюмеры постепенно, проверяйте lag
Частые уточняющие вопросы:
- Что будет если консьюмер долго обрабатывает? — max.poll.interval exceeded → исключение → ребаланс.
- Как ускорить ребаланс? — CooperativeStickyAssignor + static membership.
- Что делать с stateful consumer при ребалансе? — Сохранить state в onPartitionsRevoked, загрузить в onPartitionsAssigned.
- Можно ли отключить ребаланс? — Нет, но static membership минимизирует при известных инстансах.
Красные флаги (НЕ говорить):
- «Ребаланс происходит мгновенно» — 5-30 секунд eager, 1-5 cooperative
- «Можно игнорировать onPartitionsRevoked» — потеря offsets и state
- «Частые ребалансы — это нормально» — признак проблем с таймаутами или обработкой
- «Eager rebalancing — лучший выбор» — cooperative minimizes disruption
Связанные темы:
- [[15. Что такое rebalancing и когда он происходит]]
- [[6. Как работает балансировка консьюмеров в группе]]
- [[5. Что такое Consumer Group]]
- [[14. В чём разница между auto commit и manual commit]]