Вопрос 8 · Раздел 15

Что произойдёт при добавлении нового консьюмера в группе

При добавлении нового консьюмера происходит ребаланс — партиции перераспределяются между всеми консьюмерами.

Версии по языкам: English Russian Ukrainian

Уровень Junior

Краткий ответ

При добавлении нового консьюмера происходит ребаланс — партиции перераспределяются между всеми консьюмерами.

Пример

До добавления:
  C1 → P0, P1
  C2 → P2

Добавили C3:
  C1 → P0
  C2 → P1
  C3 → P2

Что происходит во время ребаланса?

Eager rebalancing: 5-30 секунд полного простоя.
Cooperative rebalancing: 1-5 секунд с продолжением обработки на оставшихся партициях.
Сообщения накапливаются в Kafka
После ребаланса чтение возобновляется

Как добавить консьюмер?

// Просто запустите новый консьюмер с тем же group.id
props.put("group.id", "my-group");
consumer.subscribe(List.of("orders"));
// Kafka автоматически запустит ребаланс

Уровень Middle

Процесс ребаланса (по шагам)

1. Новый консьюмер подключается к брокеру
2. Отправляет JoinGroup request
3. Group coordinator начинает ребаланс
4. Все консьюмеры получают уведомление
5. Все консьюмеры останавливают чтение (eager)
   или продолжают с частью партиций (cooperative)
6. Вычисляется новый assignment
7. Assignment рассылается консьюмерам
8. Консьюмеры начинают читать новые партиции

Rebalancing Strategies

1. Eager (старый подход):

Все консьюмеры останавливаются полностью
Полное перераспределение
Все продолжают работу
→ Простой всей группы

2. Cooperative (Kafka 2.3+):

Постепенное перемещение партиций
Консьюмеры продолжают работать с остальными
→ Минимальный простой

ConsumerRebalanceListener

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Коммитить offsets перед потерей партиций
        consumer.commitSync();
        // Очистить локальный state
        cleanupState(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Инициализация новых партиций
        initializeState(partitions);
        // Можно начать чтение с определённого offset
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, getSavedOffset(tp));
        }
    }
});

Частые ребалансы — проблема

Консьюмеры нестабильны → постоянные ребалансы → простой системы

Причины:
- session.timeout.ms слишком короткий
- max.poll.interval.ms exceeded (долгая обработка)
- Проблемы с сетью
- GC pauses

Симптомы:
- Сообщения в логах: "Rebalance triggered"
- Рост consumer lag
- Падение throughput

Типичные ошибки

  1. Без ConsumerRebalanceListener:
    Потеря данных при ребалансе (не закоммичены offsets)
    
  2. Частые ребалансы:
    Консьюмеры падают → ребаланс → снова падают → цикл
    
  3. Долгая обработка между poll:
    max.poll.interval.ms exceeded → консьюмер исключён → ребаланс
    

Уровень Senior

Cooperative Rebalancing — детально

Incremental Cooperative Rebalancing (Kafka 2.3+):

Eager:
  Revocation всех партиций → Assignment новых → Resume
  → Полный простой

Cooperative:
  Пошаговое перемещение:
  1. Revocation части партиций
  2. Assignment новых партиций
  3. Продолжение работы
  4. Повтор при необходимости
  → Минимальный простой

Настройка:

props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Group Coordinator Protocol

Протокол ребаланса:

JoinGroup — консьюмер говорит «хочу в группу».
SyncGroup — лидер рассылает результат распределения.

1. Join Phase:
   - Консьюмер → JoinGroup request
   - Coordinator → собирает всех members
   - Coordinator → выбирает leader

2. Assignment Phase:
   - Leader → вычисляет assignment
   - Leader → SyncGroup request с assignment
   - Coordinator → рассылает assignments

3. Steady State:
   - Консьюмеры → Heartbeat requests
   - Coordinator → подтверждает alive
   - Консьюмеры → обрабатывают сообщения

Production Configuration

partition.assignment.strategy: cooperative-sticky
session.timeout.ms: 30000           # 30 секунд
heartbeat.interval.ms: 10000        # 10 секунд (1/3)
max.poll.interval.ms: 300000        # 5 минут
max.poll.records: 500               # батч
enable.auto.commit: false

Static Membership

// Постоянный ID консьюмера
props.put("group.instance.id", "consumer-1");

// При рестарте:
// - Не вызывает ребаланс
// - Возвращает те же партиции
// - Ждёт session.timeout.ms перед rebalance

Monitoring Rebalancing

Ключевые метрики:

kafka.consumer.coordinator:num-rebalances
kafka.consumer.coordinator:time-since-last-rebalance
kafka.consumer:assigned-partitions
kafka.consumer:commit-latency-avg

Alerts:

- Ребаланс чаще чем раз в 10 минут → warning
- Ребаланс чаще чем раз в 5 минут → critical
- Консьюмер не отправляет heartbeat > 30s → critical
- Consumer lag растёт → warning

Troubleshooting Rebalancing

1. Определение причины:

// Логи показывают:
"Heartbeat session expired"  session.timeout.ms слишком короткий
"Max poll interval exceeded"  обработка слишком долгая
"Member is dead"  консьюмер упал

2. Решение проблем:

Heartbeat issues:
  Увеличить session.timeout.ms
  Увеличить heartbeat.interval.ms

Poll interval issues:
  Уменьшить max.poll.records
  Оптимизировать обработку сообщения
  Увеличить max.poll.interval.ms

Network issues:
  Проверить connectivity к брокерам
  Проверить firewall rules

Advanced Patterns

1. Gradual Rollout:

Добавляйте консьюмеры постепенно:
1. Запустите один консьюмер
2. Дождитесь стабилизации
3. Проверьте lag и throughput
4. Добавьте следующий

2. Partition-aware State Management:

public class StatefulConsumer {
    private final Map<TopicPartition, State> states = new HashMap<>();

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Сохранить state для каждой партиции
        for (TopicPartition tp : partitions) {
            states.get(tp).save();
            states.remove(tp);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Загрузить state для новых партиций
        for (TopicPartition tp : partitions) {
            states.put(tp, State.load(tp));
        }
    }
}

Best Practices

✅ Cooperative-sticky assignor
✅ Достаточный session timeout (30s+)
✅ Быстрая обработка сообщений
✅ ConsumerRebalanceListener для graceful handling
✅ Static membership для стабильных инстансов
✅ Мониторинг частоты ребалансов

❌ Частые ребалансы без расследования
❌ Eager assignor допустим для статических кластеров. Для production с rolling deploy — используйте CooperativeSticky.
❌ Слишком короткий timeout
❌ Без обработки onPartitionsRevoked
❌ Игнорирование consumer lag

Архитектурные решения

  1. Cooperative rebalancing — стандарт для production
  2. Static membership — для стабильных инстансов
  3. State management — сохранение state при ребалансе
  4. Monitoring — раннее обнаружение проблем

Резюме для Senior

  • Ребаланс — критический процесс для availability группы
  • Cooperative rebalancing минимизирует disruption
  • ConsumerRebalanceListener обязателен для production
  • Static membership уменьшает unnecessary ребалансы
  • Мониторинг частоты ребалансов — ранний индикатор проблем
  • Troubleshooting требует анализа логов и метрик

🎯 Шпаргалка для интервью

Обязательно знать:

  • При добавлении нового консьюмера происходит ребаланс — партиции перераспределяются
  • Eager rebalancing: 5-30 секунд полного простоя; Cooperative: 1-5 секунд
  • ConsumerRebalanceListener обязателен: commitSync в onPartitionsRevoked
  • Частые ребалансы — проблема: session.timeout exceeded, max.poll.interval exceeded
  • CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
  • Static membership (group.instance.id) — устраняет ребаланс при rolling deploy
  • Gradual rollout: добавляйте консьюмеры постепенно, проверяйте lag

Частые уточняющие вопросы:

  • Что будет если консьюмер долго обрабатывает? — max.poll.interval exceeded → исключение → ребаланс.
  • Как ускорить ребаланс? — CooperativeStickyAssignor + static membership.
  • Что делать с stateful consumer при ребалансе? — Сохранить state в onPartitionsRevoked, загрузить в onPartitionsAssigned.
  • Можно ли отключить ребаланс? — Нет, но static membership минимизирует при известных инстансах.

Красные флаги (НЕ говорить):

  • «Ребаланс происходит мгновенно» — 5-30 секунд eager, 1-5 cooperative
  • «Можно игнорировать onPartitionsRevoked» — потеря offsets и state
  • «Частые ребалансы — это нормально» — признак проблем с таймаутами или обработкой
  • «Eager rebalancing — лучший выбор» — cooperative minimizes disruption

Связанные темы:

  • [[15. Что такое rebalancing и когда он происходит]]
  • [[6. Как работает балансировка консьюмеров в группе]]
  • [[5. Что такое Consumer Group]]
  • [[14. В чём разница между auto commit и manual commit]]