Питання 8 · Розділ 15

Що станеться при додаванні нового консьюмера в групі

При додаванні нового консьюмера відбувається ребаланс — партиції перерозподіляються між усіма консьюмерами.

Мовні версії: English Russian Ukrainian

Рівень Junior

Коротка відповідь

При додаванні нового консьюмера відбувається ребаланс — партиції перерозподіляються між усіма консьюмерами.

Приклад

До додавання:
  C1 → P0, P1
  C2 → P2

Додали C3:
  C1 → P0
  C2 → P1
  C3 → P2

Що відбувається під час ребалансу?

Eager rebalancing: 5-30 секунд повного простою.
Cooperative rebalancing: 1-5 секунд з продовженням обробки на решті партицій.
Повідомлення накопичуються в Kafka
Після ребалансу читання відновлюється

Як додати консьюмер?

// Просто запустіть новий консьюмер з тим самим group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматично запустить ребаланс

Рівень Middle

Процес ребалансу (по кроках)

1. Новий консьюмер підключається до брокера
2. Надсилає JoinGroup request
3. Group coordinator починає ребаланс
4. Усі консьюмери отримують сповіщення
5. Усі консьюмери зупиняють читання (eager)
   або продовжують з частиною партицій (cooperative)
6. Обчислюється новий assignment
7. Assignment розсилається консьюмерам
8. Консьюмери починають читати нові партиції

Rebalancing Strategies

1. Eager (старий підхід):

Усі консьюмери зупиняються повністю
Повне перерозподіл
Усі продовжують роботу
→ Простій усій групі

2. Cooperative (Kafka 2.3+):

Поступове переміщення партицій
Консьюмери продовжують працювати з рештою
→ Мінімальний простой

ConsumerRebalanceListener

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Коммітити offsets перед втратою партицій
        consumer.commitSync();
        // Очистити локальний state
        cleanupState(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Ініціалізація нових партицій
        initializeState(partitions);
        // Можна почати читання з певного offset
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, getSavedOffset(tp));
        }
    }
});

Часті ребаланси — проблема

Консьюмери нестабільні → постійні ребаланси → простой системи

Причини:
- session.timeout.ms занадто короткий
- max.poll.interval.ms exceeded (довга обробка)
- Проблеми з мережею
- GC pauses

Симптоми:
- Повідомлення в логах: "Rebalance triggered"
- Зростання consumer lag
- Падіння throughput

Типові помилки

  1. Без ConsumerRebalanceListener:
    Втрата даних при ребалансі (не закоммічені offsets)
    
  2. Часті ребаланси:
    Консьюмери падають → ребаланс → знову падають → цикл
    
  3. Довга обробка між poll:
    max.poll.interval.ms exceeded → консьюмер виключений → ребаланс
    

Рівень Senior

Cooperative Rebalancing — детально

Incremental Cooperative Rebalancing (Kafka 2.3+):

Eager:
  Відкликання усіх партицій → Assignment нових → Resume
  → Повний простой

Cooperative:
  Покрокове переміщення:
  1. Відкликання частини партицій
  2. Assignment нових партицій
  3. Продовження роботи
  4. Повтор при необхідності
  → Мінімальний простой

Налаштування:

props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Group Coordinator Protocol

Протокол ребалансу:

JoinGroup — консьюмер каже «хочу в групу».
SyncGroup — лідер розсилає результат розподілу.

1. Join Phase:
   - Консьюмер → JoinGroup request
   - Coordinator → збирає всіх members
   - Coordinator → вибирає leader

2. Assignment Phase:
   - Leader → обчислює assignment
   - Leader → SyncGroup request з assignment
   - Coordinator → розсилає assignments

3. Steady State:
   - Консьюмери → Heartbeat requests
   - Coordinator → підтверджує alive
   - Консьюмери → обробляють повідомлення

Production Configuration

partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000           # 30 секунд
heartbeat.interval.ms: 10000        # 10 секунд (1/3)
max.poll.interval.ms: 300000        # 5 хвилин
max.poll.records: 500               # батч
enable.auto.commit: false

Static Membership

// Постійний ID консьюмера
props.put("group.instance.id", "consumer-1");

// При перезапуску:
// - Не викликає ребаланс
// - Повертає ті самі партиції
// - Чекає session.timeout.ms перед ребалансом

Monitoring Rebalancing

Ключові метрики:

kafka.consumer.coordinator:num-rebalances
kafka.consumer.coordinator:time-since-last-rebalance
kafka.consumer:assigned-partitions
kafka.consumer:commit-latency-avg

Alerts:

- Ребаланс частіше ніж раз в 10 хвилин → warning
- Ребаланс частіше ніж раз в 5 хвилин → critical
- Консьюмер не надсилає heartbeat > 30s → critical
- Consumer lag росте → warning

Troubleshooting Rebalancing

1. Визначення причини:

// Логи показують:
"Heartbeat session expired"  session.timeout.ms занадто короткий
"Max poll interval exceeded"  обробка занадто довга
"Member is dead"  консьюмер впав

2. Рішення проблем:

Heartbeat issues:
  Збільшити session.timeout.ms
  Збільшити heartbeat.interval.ms

Poll interval issues:
  Зменшити max.poll.records
  Оптимізувати обробку повідомлення
  Збільшити max.poll.interval.ms

Network issues:
  Перевірити connectivity до брокерів
  Перевірити firewall rules

Advanced Patterns

1. Gradual Rollout:

Додавайте консьюмери поступово:
1. Запустіть один консьюмер
2. Дочекайтесь стабілізації
3. Перевірте lag і throughput
4. Додайте наступний

2. Partition-aware State Management:

public class StatefulConsumer {
    private final Map<TopicPartition, State> states = new HashMap<>();

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Зберегти state для кожної партиції
        for (TopicPartition tp : partitions) {
            states.get(tp).save();
            states.remove(tp);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Завантажити state для нових партицій
        for (TopicPartition tp : partitions) {
            states.put(tp, State.load(tp));
        }
    }
}

Best Practices

✅ Cooperative-sticky assignor
✅ Достатній session timeout (30s+)
✅ Швидка обробка повідомлень
✅ ConsumerRebalanceListener для graceful handling
✅ Static membership для стабільних інстансів
✅ Моніторинг частоти ребалансів

❌ Часті ребаланси без розслідування
❌ Eager assignor допустимий для статичних кластерів. Для production з rolling deploy — використовуйте CooperativeSticky.
❌ Занадто короткий timeout
❌ Без обробки onPartitionsRevoked
❌ Ігнорування consumer lag

Архітектурні рішення

  1. Cooperative rebalancing — стандарт для production
  2. Static membership — для стабільних інстансів
  3. State management — збереження state при ребалансі
  4. Monitoring — раннє виявлення проблем

Резюме для Senior

  • Ребаланс — критичний процес для availability групи
  • Cooperative rebalancing мінімізує disruption
  • ConsumerRebalanceListener обов’язковий для production
  • Static membership зменшує unnecessary ребаланси
  • Моніторинг частоти ребалансів — ранній індикатор проблем
  • Troubleshooting потребує аналізу логів і метрик

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • При додаванні нового консьюмера відбувається ребаланс — партиції перерозподіляються
  • Eager rebalancing: 5-30 секунд повного простою; Cooperative: 1-5 секунд
  • ConsumerRebalanceListener обов’язковий: commitSync в onPartitionsRevoked
  • Часті ребаланси — проблема: session.timeout exceeded, max.poll.interval exceeded
  • CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
  • Static membership (group.instance.id) — усуває ребаланс при rolling deploy
  • Gradual rollout: додавайте консьюмери поступово, перевіряйте lag

Часті уточнюючі запитання:

  • Що буде якщо консьюмер довго обробляє? — max.poll.interval exceeded → виключення → ребаланс.
  • Як прискорити ребаланс? — CooperativeStickyAssignor + static membership.
  • Що робити з stateful consumer при ребалансі? — Зберегти state в onPartitionsRevoked, завантажити в onPartitionsAssigned.
  • Чи можна відключити ребаланс? — Ні, але static membership мінімізує при відомих інстансах.

Червоні прапорці (НЕ говорити):

  • «Ребаланс відбувається миттєво» — 5-30 секунд eager, 1-5 cooperative
  • «Можна ігнорувати onPartitionsRevoked» — втрата offsets і state
  • «Часті ребаланси — це нормально» — ознака проблем з таймаутами або обробкою
  • «Eager rebalancing — найкращий вибір» — cooperative мінімізує disruption

Пов’язані теми:

  • [[15. Що таке ребаланс і коли він відбувається]]
  • [[6. Як працює балансування консьюмерів у групі]]
  • [[5. Що таке Consumer Group]]
  • [[14. У чому різниця між auto commit і manual commit]]