Что такое Consumer Group
Представьте команду курьеров:
🟢 Junior Level
Что такое Consumer Group?
Consumer Group — это группа консьюмеров, которые совместно читают данные из топика. Каждый консьюмер в группе обрабатывает свою часть партиций.
Зачем: механизм, который позволяет нескольким консьюмерам ДЕЛИТЬ партиции между собой так, чтобы каждое сообщение обрабатывалось ровно одним консьюмером из группы. Без группы каждый консьюмер читал бы все сообщения (дублирование).
Аналогия
Представьте команду курьеров:
- Топик — это склад с заказами (партиции = разные полки)
- Consumer Group — это одна команда курьеров
- Каждый курьер (консьюмер) берёт заказы со своих полок
- Два курьера не берут заказы с одной полки (избегают дублирования)
Если курьер уходит (падает), его полки автоматически перераспределяются между оставшимися — это называется ребаланс.
Consumer Group "order-processors":
Consumer 1 → читает Партицию 0
Consumer 2 → читает Партицию 1
Consumer 3 → читает Партицию 2
Основные правила
- Каждая партиция обрабатывается только одним консьюмером в группе
- Один консьюмер может читать несколько партиций
- Если консьюмеров больше, чем партиций — лишние простаивают
Зачем нужны Consumer Groups?
- Масштабирование — больше консьюмеров = больше параллелизм
- Отказоустойчивость — если один консьюмер упадёт, другие подхватят его партиции
- Изоляция — разные группы читают один топик независимо
Простой пример
// Consumer 1
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));
// Consumer 2 (в той же группе)
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));
// Оба консьюмера будут читать разные партиции
🟡 Middle Level
Когда НЕ использовать Consumer Group
- Нужно чтобы каждое сообщение прочитали ВСЕ консьюмеры — запустите разные group.id
- Нужен broadcast-паттерн (каждый консьюмер читает всё) — разные group.id
Балансировка партиций
3 партиции, 2 консьюмера:
C1 → P0, P1
C2 → P2
3 партиции, 3 консьюмера:
C1 → P0
C2 → P1
C3 → P2 ← идеальный баланс
3 партиции, 5 консьюмеров:
C1 → P0
C2 → P1
C3 → P2
C4 → idle (простаивает)
C5 → idle (простаивает)
Partition Assignment Strategies
| Стратегия | Как работает | Плюсы | Минусы |
|---|---|---|---|
| Range | Делит партиции по диапазону | Простая | Неравномерность при нечётном числе |
| RoundRobin | Чередует партиции | Более равномерная | Перемещает много при ребалансе |
| Sticky | Минимизирует перемещение | Стабильность | Сложнее в реализации |
| CooperativeSticky | Пошаговое перемещение | Минимальный простой | Kafka 2.3+ |
Range Assignor
Партиции 0,1,2,3,4 → 2 консьюмера:
C1: 0, 1, 2 (3 партиции)
C2: 3, 4 (2 партиции)
→ Неравномерно!
RoundRobin Assignor
Партиции 0,1,2,3,4 → 2 консьюмера:
C1: 0, 2, 4 (3 партиции)
C2: 1, 3 (2 партиции)
→ Лучше, но всё ещё не идеально
Rebalancing
Происходит при:
- Добавлении/удалении консьюмера
- Добавлении партиций в топик
- Изменении подписки на топики
- Таймауте сессии консьюмера
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(); // Коммитить offsets перед ребалансом
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Инициализация новых партиций
}
});
Multiple Consumer Groups
Один топик "orders" читается двумя группами:
Group "order-processing":
C1 → P0, P1
C2 → P2, P3
Group "analytics":
C3 → P0, P1, P2, P3 // читает все данные независимо
Каждая группа получает полную копию данных!
Типичные ошибки
| Ошибка | Последствие | Решение |
|---|---|---|
| Больше консьюмеров чем партиций | Лишние простаивают, тратят ресурсы | Consumer count == Partition count |
| Без ConsumerRebalanceListener | Потеря offsets при ребалансе | Всегда используйте listener |
| Одинаковый group.id для разных приложений | Делят партиции → каждый получает часть данных | Уникальный group.id per application |
| Долгая обработка между poll | Исключение из группы → ребаланс | Уменьшите max.poll.records или оптимизируйте |
🔴 Senior Level
Group Coordinator Protocol — внутреннее устройство
Group Coordinator — это один из брокеров кластера, выбранный контроллером. Отвечает за:
- Управление членством в группе (Join/Leave)
- Распределение партиций (Assignment)
- Хранение committed offsets в
__consumer_offsets
Протокол группы (Kafka < 3.0, Consumer Protocol v0):
1. JoinGroup Request:
Consumer → Coordinator: "Я хочу вступить в группу"
Coordinator → Leader: "Вот список всех members"
2. Leader вычисляет assignment:
Leader (один из консьюмеров) запускает PartitionAssignor
Результат: Map<memberId, List<TopicPartition>>
3. SyncGroup Request:
Leader → Coordinator: assignment для всех members
Coordinator → каждый Consumer: его персональный assignment
4. Heartbeat:
Каждый Consumer → Coordinator: "Я жив" (каждые heartbeat.interval.ms)
5. LeaveGroup:
Consumer → Coordinator: "Я ухожу"
→ Trigger rebalance
Kafka 3.0+ Consumer Protocol v1 (KIP-848):
KIP (Kafka Improvement Proposal) — процесс предложений по улучшению Kafka.
KIP-848 — предложение о server-side assignment.
Group Coordinator — специальный брокер, который управляет членством в группе.
Новый протокол на основе heartbeat RPC:
- Assignment вычисляется на Coordinator (не на Leader)
- Устраняет SyncGroup round-trip
- Поддерживает Server-Side Assignment
- Быстрее ребаланс (на 1-2 RTT меньше)
__consumer_offsets — Internal Topic
Тема: __consumer_offsets
Партиции: 50 (по умолчанию, offsets.topic.num.partitions)
RF: 3 (по умолчанию)
Cleanup: compact (хранит только последний offset для каждой группы)
Format ключа: [group.id, topic, partition]
Format значения: [offset, metadata, commitTimestamp, expireTimestamp]
Пример:
Key: ["order-processors", "orders", 0]
Value: [offset=42567, metadata="", timestamp=1712345678]
Смещение коммита:
// Auto commit (enable.auto.commit=true)
// Коммит каждые auto.commit.interval.ms (5 секунд по умолчанию)
// Риск: сообщения могут быть потеряны при crash между commit и processing
// Manual commit (enable.auto.commit=false)
consumer.commitSync(); // Синхронный — блокирует до подтверждения
consumer.commitAsync(); // Асинхронный — не блокирует, retry callback
Sticky Assignor — алгоритм
Алгоритм Sticky Assignor (Kafka 2.3+):
1. Текущий assignment: C1={P0,P1}, C2={P2}
2. Новый консьюмер C3 присоединяется
3. Sticky сохраняет текущие назначения:
C1: {P0, P1} → сохраняет (не перемещает)
C2: {P2} → сохраняет (не перемещает)
4. Оставшиеся партиции распределяются:
C3: {} → получает свободные партиции
Если партиций нет → C3 = idle
Преимущества перед Range/RoundRobin:
- Минимизирует partition movement (меньше disruption)
- Сохраняет locality данных (page cache warm)
- Меньше reprocessing при ребалансе
Cooperative Rebalancing (KIP-429, Kafka 2.3+)
Eager Rebalancing (старый):
1. Trigger rebalance
2. ALL consumers revoke ALL partitions
3. Stop processing
4. Compute new assignment
5. Resume processing
→ Full stop всей группы на 5-30 секунд
Cooperative Rebalancing (новый):
1. Trigger rebalance
2. Each consumer revokes ONLY some partitions
3. Continue processing with remaining partitions
4. Incremental assignment for revoked partitions
5. Repeat until stable
→ Минимальный простой, processing продолжается
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Static Membership (KIP-345)
props.put("group.instance.id", "consumer-1"); // Уникальный ID инстанса
Поведение:
- При рестарте консьюмер с тем же
group.instance.idне вызывает ребаланс - Coordinator ждёт
session.timeout.msперед исключением - При возврате — получает те же партиции
- Уменьшает churn при rolling deploy
Сравнение: | | Dynamic Membership | Static Membership | | —————– | —————————- | —————————————– | | group.instance.id | Не задан | Задан | | Restart | Rebalance | No rebalance (в пределах session.timeout) | | Rolling deploy | N ребалансов для N инстансов | 0 ребалансов | | Crash detection | session.timeout.ms | session.timeout.ms |
Edge Cases (3+)
-
Split-Brain Consumer Group: При network partition консьюмеры могут потерять связь с coordinator. Если
session.timeout.msистекает, coordinator исключает их и запускает ребаланс. При восстановлении сети «старые» консьюмеры думают, что они всё ещё в группе, а «новые» уже получили их партиции. Результат: duplicate processing — два консьюмера читают одну партицию. Решение:session.timeout.msдолжен быть достаточно большим, используйте static membership. -
Offset Commit Race Condition: Consumer A читает offset 100-150, начинает обработку. Rebalance → партиция переходит к Consumer B. Consumer A завершает обработку и коммитит offset 151. Consumer B начинает с offset 151, но сообщения 100-150 ещё не обработаны Consumer A (обработка была async). Решение: commitSync в
onPartitionsRevoked+ blocking processing. -
__consumer_offsets Partition Unavailable: Если партиция
__consumer_offsets, хранящая offsets группы, становится недоступной (leader down, ISR < min.insync.replicas), консьюмеры не могут коммитить offsets. Они продолжают читать, но при рестарте начнут с последнего сохранённого offset → возможное дублирование. Приmin.insync.replicas=2и RF=3 это маловероятно, но возможно при двойном сбое. -
Max Poll Records + Slow Processing:
max.poll.records=500, обработка одного сообщения занимает 1 секунду. Обработка 500 сообщений = 500 секунд.max.poll.interval.ms=300000(5 минут). Consumer не успевает вызватьpoll()в течение 5 минут → coordinator исключает → rebalance. Решение: Уменьшитеmax.poll.recordsили увеличьтеmax.poll.interval.ms. -
Consumer Group с 0 активных консьюмеров: Группа существует в
__consumer_offsets, но все консьюмеры offline. При запуске нового консьюмера он получает все партиции и начинает с последнего committed offset. Если offsets старые (retention expired), начнёт сauto.offset.reset(earliest/latest). Это может вызвать massive replay или skip данных.
Performance Numbers
| Метрика | Значение | Условия |
|---|---|---|
| Rebalance time (eager) | 5-30 секунд | Зависит от группы размера |
| Rebalance time (cooperative) | 1-5 секунд | Пошаговое перемещение |
| Heartbeat interval | 3-10 секунд | 1/3 от session.timeout.ms |
| Offset commit latency | 1-5 ms | Синхронный, локальный брокер |
| __consumer_offsets throughput | ~10K commits/sec | 50 партиций |
| Max consumers per group | Limited by partitions | Практически: сотни |
Production War Story
Ситуация: Платёжный процессор с 15 консьюмерами в группе
payment-processors(15 партиций). Использовали Range assignor и eager rebalancing. При rolling deploy (последовательный рестарт всех 15 подов в Kubernetes) каждый под вызывал ребаланс.Проблема: 15 последовательных ребалансов заняли 15 × 15 секунд = 225 секунд простоя. За это время lag вырос до 200K сообщений. После восстановления некоторые платежи обработались дважды (offset commit race condition).
Диагностика:
kafka-consumer-groups.sh --describe --group payment-processors # Показал: STATE=Dead на 225 секунд, затем STATE=Empty → Rebalancing # Consumer lag: 0 → 200K → 0 (после catchup с duplicates)Решение:
- Перешли на
CooperativeStickyAssignor— ребаланс стал 2-3 секунды- Ввели
group.instance.id=${POD_NAME}— при rolling deploy не было ребалансов- Добавили
ConsumerRebalanceListenerс commitSync вonPartitionsRevoked- Настроили Pod Disruption Budget в K8s — максимум 1 под вниз одновременно
- Увеличили
session.timeout.msдо 45 секунд,heartbeat.interval.msдо 15 секундРезультат: Rolling deploy — 0 ребалансов, 0 downtime, 0 duplicates.
Урок: Eager rebalancing + rolling deploy = cascading failure. Всегда используйте cooperative rebalancing + static membership для production.
Мониторинг (JMX + Burrow)
JMX метрики:
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-metrics,client-id=*,key=commit-latency-avg
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,key=records-lag-max
Burrow:
- Consumer group status: OK, WARN, ERR, STOP, STALL
- Per-partition lag with trend analysis
- HTTP API → Grafana dashboard → PagerDuty alerts
- Не требует модификации консьюмеров (читает
__consumer_offsetsнапрямую)
Highload Best Practices
- Consumer count == Partition count — идеальный баланс
- CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
- Static membership (
group.instance.id) — для rolling deploy stability - Heartbeat tuning:
heartbeat.interval.ms = session.timeout.ms / 3 - ConsumerRebalanceListener обязателен — commitSync + state cleanup
- Monitor rebalance frequency — alert при > 1 rebalance за 10 минут
- enable.auto.commit=false — всегда коммитьте после обработки
- max.poll.records подбирайте под latency обработки (цель: poll < max.poll.interval.ms)
- Burrow для monitoring — consumer-centric, не требует instrumentation
- KIP-848 (Server-Side Assignment) — используйте при миграции на Kafka 3.0+
🎯 Шпаргалка для интервью
Обязательно знать:
- Consumer Group — группа консьюмеров, совместно читающих топик; каждая партиция = один консьюмер
__consumer_offsets— внутренний топик для хранения смещений (compact)- Partition Assignment Strategies: Range, RoundRobin, Sticky, CooperativeSticky
- Rebalancing происходит при добавлении/удалении консьюмера, изменении партиций
- CooperativeStickyAssignor (Kafka 2.3+) — минимальный простой при ребалансе
- Static Membership (
group.instance.id) — eliminates rebalance при rolling deploy - Eager rebalancing = full stop; Cooperative = incremental, processing продолжается
- Ideal: consumer count == partition count
Частые уточняющие вопросы:
- Что если консьюмеров больше чем партиций? — Лишние простаивают (idle), потребляют ресурсы.
- Что такое __consumer_offsets? — Внутренний compact-топик, хранит offsets для всех consumer groups.
- Чем Cooperative отличается от Eager? — Eager: все останавливаются. Cooperative: только перемещаемые партиции.
- Зачем нужен group.instance.id? — Static membership: при рестарте тот же консьюмер получает те же партиции без ребаланса.
Красные флаги (НЕ говорить):
- «Два консьюмера могут читать одну партицию в группе» — только один
- «Автокоммит — лучший выбор для production» —手动 коммит после обработки
- «Ребаланс — это быстро» — eager: 5-30 секунд простоя
- «Range assignor — лучший для production» — неравномерный, используйте CooperativeSticky
Связанные темы:
- [[6. Как работает балансировка консьюмеров в группе]]
- [[7. Можно ли иметь больше консьюмеров, чем партиций]]
- [[8. Что произойдёт при добавлении нового консьюмера в группе]]
- [[15. Что такое rebalancing и когда он происходит]]
- [[13. Как работает commit offset]]