Що станеться при додаванні нового консьюмера в групі
При додаванні нового консьюмера відбувається ребаланс — партиції перерозподіляються між усіма консьюмерами.
Рівень Junior
Коротка відповідь
При додаванні нового консьюмера відбувається ребаланс — партиції перерозподіляються між усіма консьюмерами.
Приклад
До додавання:
C1 → P0, P1
C2 → P2
Додали C3:
C1 → P0
C2 → P1
C3 → P2
Що відбувається під час ребалансу?
Eager rebalancing: 5-30 секунд повного простою.
Cooperative rebalancing: 1-5 секунд з продовженням обробки на решті партицій.
Повідомлення накопичуються в Kafka
Після ребалансу читання відновлюється
Як додати консьюмер?
// Просто запустіть новий консьюмер з тим самим group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматично запустить ребаланс
Рівень Middle
Процес ребалансу (по кроках)
1. Новий консьюмер підключається до брокера
2. Надсилає JoinGroup request
3. Group coordinator починає ребаланс
4. Усі консьюмери отримують сповіщення
5. Усі консьюмери зупиняють читання (eager)
або продовжують з частиною партицій (cooperative)
6. Обчислюється новий assignment
7. Assignment розсилається консьюмерам
8. Консьюмери починають читати нові партиції
Rebalancing Strategies
1. Eager (старий підхід):
Усі консьюмери зупиняються повністю
Повне перерозподіл
Усі продовжують роботу
→ Простій усій групі
2. Cooperative (Kafka 2.3+):
Поступове переміщення партицій
Консьюмери продовжують працювати з рештою
→ Мінімальний простой
ConsumerRebalanceListener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Коммітити offsets перед втратою партицій
consumer.commitSync();
// Очистити локальний state
cleanupState(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Ініціалізація нових партицій
initializeState(partitions);
// Можна почати читання з певного offset
for (TopicPartition tp : partitions) {
consumer.seek(tp, getSavedOffset(tp));
}
}
});
Часті ребаланси — проблема
Консьюмери нестабільні → постійні ребаланси → простой системи
Причини:
- session.timeout.ms занадто короткий
- max.poll.interval.ms exceeded (довга обробка)
- Проблеми з мережею
- GC pauses
Симптоми:
- Повідомлення в логах: "Rebalance triggered"
- Зростання consumer lag
- Падіння throughput
Типові помилки
- Без ConsumerRebalanceListener:
Втрата даних при ребалансі (не закоммічені offsets) - Часті ребаланси:
Консьюмери падають → ребаланс → знову падають → цикл - Довга обробка між poll:
max.poll.interval.ms exceeded → консьюмер виключений → ребаланс
Рівень Senior
Cooperative Rebalancing — детально
Incremental Cooperative Rebalancing (Kafka 2.3+):
Eager:
Відкликання усіх партицій → Assignment нових → Resume
→ Повний простой
Cooperative:
Покрокове переміщення:
1. Відкликання частини партицій
2. Assignment нових партицій
3. Продовження роботи
4. Повтор при необхідності
→ Мінімальний простой
Налаштування:
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Group Coordinator Protocol
Протокол ребалансу:
JoinGroup — консьюмер каже «хочу в групу».
SyncGroup — лідер розсилає результат розподілу.
1. Join Phase:
- Консьюмер → JoinGroup request
- Coordinator → збирає всіх members
- Coordinator → вибирає leader
2. Assignment Phase:
- Leader → обчислює assignment
- Leader → SyncGroup request з assignment
- Coordinator → розсилає assignments
3. Steady State:
- Консьюмери → Heartbeat requests
- Coordinator → підтверджує alive
- Консьюмери → обробляють повідомлення
Production Configuration
partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000 # 30 секунд
heartbeat.interval.ms: 10000 # 10 секунд (1/3)
max.poll.interval.ms: 300000 # 5 хвилин
max.poll.records: 500 # батч
enable.auto.commit: false
Static Membership
// Постійний ID консьюмера
props.put("group.instance.id", "consumer-1");
// При перезапуску:
// - Не викликає ребаланс
// - Повертає ті самі партиції
// - Чекає session.timeout.ms перед ребалансом
Monitoring Rebalancing
Ключові метрики:
kafka.consumer.coordinator:num-rebalances
kafka.consumer.coordinator:time-since-last-rebalance
kafka.consumer:assigned-partitions
kafka.consumer:commit-latency-avg
Alerts:
- Ребаланс частіше ніж раз в 10 хвилин → warning
- Ребаланс частіше ніж раз в 5 хвилин → critical
- Консьюмер не надсилає heartbeat > 30s → critical
- Consumer lag росте → warning
Troubleshooting Rebalancing
1. Визначення причини:
// Логи показують:
"Heartbeat session expired" → session.timeout.ms занадто короткий
"Max poll interval exceeded" → обробка занадто довга
"Member is dead" → консьюмер впав
2. Рішення проблем:
Heartbeat issues:
Збільшити session.timeout.ms
Збільшити heartbeat.interval.ms
Poll interval issues:
Зменшити max.poll.records
Оптимізувати обробку повідомлення
Збільшити max.poll.interval.ms
Network issues:
Перевірити connectivity до брокерів
Перевірити firewall rules
Advanced Patterns
1. Gradual Rollout:
Додавайте консьюмери поступово:
1. Запустіть один консьюмер
2. Дочекайтесь стабілізації
3. Перевірте lag і throughput
4. Додайте наступний
2. Partition-aware State Management:
public class StatefulConsumer {
private final Map<TopicPartition, State> states = new HashMap<>();
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Зберегти state для кожної партиції
for (TopicPartition tp : partitions) {
states.get(tp).save();
states.remove(tp);
}
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Завантажити state для нових партицій
for (TopicPartition tp : partitions) {
states.put(tp, State.load(tp));
}
}
}
Best Practices
✅ Cooperative-sticky assignor
✅ Достатній session timeout (30s+)
✅ Швидка обробка повідомлень
✅ ConsumerRebalanceListener для graceful handling
✅ Static membership для стабільних інстансів
✅ Моніторинг частоти ребалансів
❌ Часті ребаланси без розслідування
❌ Eager assignor допустимий для статичних кластерів. Для production з rolling deploy — використовуйте CooperativeSticky.
❌ Занадто короткий timeout
❌ Без обробки onPartitionsRevoked
❌ Ігнорування consumer lag
Архітектурні рішення
- Cooperative rebalancing — стандарт для production
- Static membership — для стабільних інстансів
- State management — збереження state при ребалансі
- Monitoring — раннє виявлення проблем
Резюме для Senior
- Ребаланс — критичний процес для availability групи
- Cooperative rebalancing мінімізує disruption
- ConsumerRebalanceListener обов’язковий для production
- Static membership зменшує unnecessary ребаланси
- Моніторинг частоти ребалансів — ранній індикатор проблем
- Troubleshooting потребує аналізу логів і метрик
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
- При додаванні нового консьюмера відбувається ребаланс — партиції перерозподіляються
- Eager rebalancing: 5-30 секунд повного простою; Cooperative: 1-5 секунд
- ConsumerRebalanceListener обов’язковий: commitSync в onPartitionsRevoked
- Часті ребаланси — проблема: session.timeout exceeded, max.poll.interval exceeded
- CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
- Static membership (
group.instance.id) — усуває ребаланс при rolling deploy - Gradual rollout: додавайте консьюмери поступово, перевіряйте lag
Часті уточнюючі запитання:
- Що буде якщо консьюмер довго обробляє? — max.poll.interval exceeded → виключення → ребаланс.
- Як прискорити ребаланс? — CooperativeStickyAssignor + static membership.
- Що робити з stateful consumer при ребалансі? — Зберегти state в onPartitionsRevoked, завантажити в onPartitionsAssigned.
- Чи можна відключити ребаланс? — Ні, але static membership мінімізує при відомих інстансах.
Червоні прапорці (НЕ говорити):
- «Ребаланс відбувається миттєво» — 5-30 секунд eager, 1-5 cooperative
- «Можна ігнорувати onPartitionsRevoked» — втрата offsets і state
- «Часті ребаланси — це нормально» — ознака проблем з таймаутами або обробкою
- «Eager rebalancing — найкращий вибір» — cooperative мінімізує disruption
Пов’язані теми:
- [[15. Що таке ребаланс і коли він відбувається]]
- [[6. Як працює балансування консьюмерів у групі]]
- [[5. Що таке Consumer Group]]
- [[14. У чому різниця між auto commit і manual commit]]