Вопрос 5 · Раздел 15

Что такое Consumer Group

Представьте команду курьеров:

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Что такое Consumer Group?

Consumer Group — это группа консьюмеров, которые совместно читают данные из топика. Каждый консьюмер в группе обрабатывает свою часть партиций.

Зачем: механизм, который позволяет нескольким консьюмерам ДЕЛИТЬ партиции между собой так, чтобы каждое сообщение обрабатывалось ровно одним консьюмером из группы. Без группы каждый консьюмер читал бы все сообщения (дублирование).

Аналогия

Представьте команду курьеров:

  • Топик — это склад с заказами (партиции = разные полки)
  • Consumer Group — это одна команда курьеров
  • Каждый курьер (консьюмер) берёт заказы со своих полок
  • Два курьера не берут заказы с одной полки (избегают дублирования)

Если курьер уходит (падает), его полки автоматически перераспределяются между оставшимися — это называется ребаланс.

Consumer Group "order-processors":
  Consumer 1 → читает Партицию 0
  Consumer 2 → читает Партицию 1
  Consumer 3 → читает Партицию 2

Основные правила

  • Каждая партиция обрабатывается только одним консьюмером в группе
  • Один консьюмер может читать несколько партиций
  • Если консьюмеров больше, чем партиций — лишние простаивают

Зачем нужны Consumer Groups?

  1. Масштабирование — больше консьюмеров = больше параллелизм
  2. Отказоустойчивость — если один консьюмер упадёт, другие подхватят его партиции
  3. Изоляция — разные группы читают один топик независимо

Простой пример

// Consumer 1
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));

// Consumer 2 (в той же группе)
props.put("group.id", "order-processors");
consumer.subscribe(List.of("orders"));

// Оба консьюмера будут читать разные партиции

🟡 Middle Level

Когда НЕ использовать Consumer Group

  1. Нужно чтобы каждое сообщение прочитали ВСЕ консьюмеры — запустите разные group.id
  2. Нужен broadcast-паттерн (каждый консьюмер читает всё) — разные group.id

Балансировка партиций

3 партиции, 2 консьюмера:
  C1 → P0, P1
  C2 → P2

3 партиции, 3 консьюмера:
  C1 → P0
  C2 → P1
  C3 → P2  ← идеальный баланс

3 партиции, 5 консьюмеров:
  C1 → P0
  C2 → P1
  C3 → P2
  C4 → idle (простаивает)
  C5 → idle (простаивает)

Partition Assignment Strategies

Стратегия Как работает Плюсы Минусы
Range Делит партиции по диапазону Простая Неравномерность при нечётном числе
RoundRobin Чередует партиции Более равномерная Перемещает много при ребалансе
Sticky Минимизирует перемещение Стабильность Сложнее в реализации
CooperativeSticky Пошаговое перемещение Минимальный простой Kafka 2.3+

Range Assignor

Партиции 0,1,2,3,4 → 2 консьюмера:
  C1: 0, 1, 2  (3 партиции)
  C2: 3, 4     (2 партиции)
  → Неравномерно!

RoundRobin Assignor

Партиции 0,1,2,3,4 → 2 консьюмера:
  C1: 0, 2, 4  (3 партиции)
  C2: 1, 3     (2 партиции)
  → Лучше, но всё ещё не идеально

Rebalancing

Происходит при:

  • Добавлении/удалении консьюмера
  • Добавлении партиций в топик
  • Изменении подписки на топики
  • Таймауте сессии консьюмера
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();  // Коммитить offsets перед ребалансом
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Инициализация новых партиций
    }
});

Multiple Consumer Groups

Один топик "orders" читается двумя группами:

Group "order-processing":
  C1 → P0, P1
  C2 → P2, P3

Group "analytics":
  C3 → P0, P1, P2, P3  // читает все данные независимо

Каждая группа получает полную копию данных!

Типичные ошибки

Ошибка Последствие Решение
Больше консьюмеров чем партиций Лишние простаивают, тратят ресурсы Consumer count == Partition count
Без ConsumerRebalanceListener Потеря offsets при ребалансе Всегда используйте listener
Одинаковый group.id для разных приложений Делят партиции → каждый получает часть данных Уникальный group.id per application
Долгая обработка между poll Исключение из группы → ребаланс Уменьшите max.poll.records или оптимизируйте

🔴 Senior Level

Group Coordinator Protocol — внутреннее устройство

Group Coordinator — это один из брокеров кластера, выбранный контроллером. Отвечает за:

  • Управление членством в группе (Join/Leave)
  • Распределение партиций (Assignment)
  • Хранение committed offsets в __consumer_offsets
Протокол группы (Kafka < 3.0, Consumer Protocol v0):

1. JoinGroup Request:
   Consumer → Coordinator: "Я хочу вступить в группу"
   Coordinator → Leader: "Вот список всех members"
   
2. Leader вычисляет assignment:
   Leader (один из консьюмеров) запускает PartitionAssignor
   Результат: Map<memberId, List<TopicPartition>>
   
3. SyncGroup Request:
   Leader → Coordinator: assignment для всех members
   Coordinator → каждый Consumer: его персональный assignment
   
4. Heartbeat:
   Каждый Consumer → Coordinator: "Я жив" (каждые heartbeat.interval.ms)
   
5. LeaveGroup:
   Consumer → Coordinator: "Я ухожу"
   → Trigger rebalance

Kafka 3.0+ Consumer Protocol v1 (KIP-848):

KIP (Kafka Improvement Proposal) — процесс предложений по улучшению Kafka.
KIP-848 — предложение о server-side assignment.
Group Coordinator — специальный брокер, который управляет членством в группе.

Новый протокол на основе heartbeat RPC:
- Assignment вычисляется на Coordinator (не на Leader)
- Устраняет SyncGroup round-trip
- Поддерживает Server-Side Assignment
- Быстрее ребаланс (на 1-2 RTT меньше)

__consumer_offsets — Internal Topic

Тема: __consumer_offsets
Партиции: 50 (по умолчанию, offsets.topic.num.partitions)
RF: 3 (по умолчанию)
Cleanup: compact (хранит только последний offset для каждой группы)

Format ключа: [group.id, topic, partition]
Format значения: [offset, metadata, commitTimestamp, expireTimestamp]

Пример:
  Key: ["order-processors", "orders", 0]
  Value: [offset=42567, metadata="", timestamp=1712345678]

Смещение коммита:

// Auto commit (enable.auto.commit=true)
// Коммит каждые auto.commit.interval.ms (5 секунд по умолчанию)
// Риск: сообщения могут быть потеряны при crash между commit и processing

// Manual commit (enable.auto.commit=false)
consumer.commitSync();      // Синхронный — блокирует до подтверждения
consumer.commitAsync();     // Асинхронный — не блокирует, retry callback

Sticky Assignor — алгоритм

Алгоритм Sticky Assignor (Kafka 2.3+):

1. Текущий assignment: C1={P0,P1}, C2={P2}
2. Новый консьюмер C3 присоединяется
3. Sticky сохраняет текущие назначения:
   C1: {P0, P1} → сохраняет (не перемещает)
   C2: {P2} → сохраняет (не перемещает)
4. Оставшиеся партиции распределяются:
   C3: {} → получает свободные партиции
   Если партиций нет → C3 = idle

Преимущества перед Range/RoundRobin:

  • Минимизирует partition movement (меньше disruption)
  • Сохраняет locality данных (page cache warm)
  • Меньше reprocessing при ребалансе

Cooperative Rebalancing (KIP-429, Kafka 2.3+)

Eager Rebalancing (старый):

1. Trigger rebalance
2. ALL consumers revoke ALL partitions
3. Stop processing
4. Compute new assignment
5. Resume processing
→ Full stop всей группы на 5-30 секунд

Cooperative Rebalancing (новый):

1. Trigger rebalance
2. Each consumer revokes ONLY some partitions
3. Continue processing with remaining partitions
4. Incremental assignment for revoked partitions
5. Repeat until stable
→ Минимальный простой, processing продолжается
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Static Membership (KIP-345)

props.put("group.instance.id", "consumer-1");  // Уникальный ID инстанса

Поведение:

  • При рестарте консьюмер с тем же group.instance.id не вызывает ребаланс
  • Coordinator ждёт session.timeout.ms перед исключением
  • При возврате — получает те же партиции
  • Уменьшает churn при rolling deploy

Сравнение: | | Dynamic Membership | Static Membership | | —————– | —————————- | —————————————– | | group.instance.id | Не задан | Задан | | Restart | Rebalance | No rebalance (в пределах session.timeout) | | Rolling deploy | N ребалансов для N инстансов | 0 ребалансов | | Crash detection | session.timeout.ms | session.timeout.ms |

Edge Cases (3+)

  1. Split-Brain Consumer Group: При network partition консьюмеры могут потерять связь с coordinator. Если session.timeout.ms истекает, coordinator исключает их и запускает ребаланс. При восстановлении сети «старые» консьюмеры думают, что они всё ещё в группе, а «новые» уже получили их партиции. Результат: duplicate processing — два консьюмера читают одну партицию. Решение: session.timeout.ms должен быть достаточно большим, используйте static membership.

  2. Offset Commit Race Condition: Consumer A читает offset 100-150, начинает обработку. Rebalance → партиция переходит к Consumer B. Consumer A завершает обработку и коммитит offset 151. Consumer B начинает с offset 151, но сообщения 100-150 ещё не обработаны Consumer A (обработка была async). Решение: commitSync в onPartitionsRevoked + blocking processing.

  3. __consumer_offsets Partition Unavailable: Если партиция __consumer_offsets, хранящая offsets группы, становится недоступной (leader down, ISR < min.insync.replicas), консьюмеры не могут коммитить offsets. Они продолжают читать, но при рестарте начнут с последнего сохранённого offset → возможное дублирование. При min.insync.replicas=2 и RF=3 это маловероятно, но возможно при двойном сбое.

  4. Max Poll Records + Slow Processing: max.poll.records=500, обработка одного сообщения занимает 1 секунду. Обработка 500 сообщений = 500 секунд. max.poll.interval.ms=300000 (5 минут). Consumer не успевает вызвать poll() в течение 5 минут → coordinator исключает → rebalance. Решение: Уменьшите max.poll.records или увеличьте max.poll.interval.ms.

  5. Consumer Group с 0 активных консьюмеров: Группа существует в __consumer_offsets, но все консьюмеры offline. При запуске нового консьюмера он получает все партиции и начинает с последнего committed offset. Если offsets старые (retention expired), начнёт с auto.offset.reset (earliest/latest). Это может вызвать massive replay или skip данных.

Performance Numbers

Метрика Значение Условия
Rebalance time (eager) 5-30 секунд Зависит от группы размера
Rebalance time (cooperative) 1-5 секунд Пошаговое перемещение
Heartbeat interval 3-10 секунд 1/3 от session.timeout.ms
Offset commit latency 1-5 ms Синхронный, локальный брокер
__consumer_offsets throughput ~10K commits/sec 50 партиций
Max consumers per group Limited by partitions Практически: сотни

Production War Story

Ситуация: Платёжный процессор с 15 консьюмерами в группе payment-processors (15 партиций). Использовали Range assignor и eager rebalancing. При rolling deploy (последовательный рестарт всех 15 подов в Kubernetes) каждый под вызывал ребаланс.

Проблема: 15 последовательных ребалансов заняли 15 × 15 секунд = 225 секунд простоя. За это время lag вырос до 200K сообщений. После восстановления некоторые платежи обработались дважды (offset commit race condition).

Диагностика:

kafka-consumer-groups.sh --describe --group payment-processors
# Показал: STATE=Dead на 225 секунд, затем STATE=Empty → Rebalancing
# Consumer lag: 0 → 200K → 0 (после catchup с duplicates)

Решение:

  1. Перешли на CooperativeStickyAssignor — ребаланс стал 2-3 секунды
  2. Ввели group.instance.id=${POD_NAME} — при rolling deploy не было ребалансов
  3. Добавили ConsumerRebalanceListener с commitSync в onPartitionsRevoked
  4. Настроили Pod Disruption Budget в K8s — максимум 1 под вниз одновременно
  5. Увеличили session.timeout.ms до 45 секунд, heartbeat.interval.ms до 15 секунд

Результат: Rolling deploy — 0 ребалансов, 0 downtime, 0 duplicates.

Урок: Eager rebalancing + rolling deploy = cascading failure. Всегда используйте cooperative rebalancing + static membership для production.

Мониторинг (JMX + Burrow)

JMX метрики:

kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-metrics,client-id=*,key=commit-latency-avg
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,key=records-lag-max

Burrow:

  • Consumer group status: OK, WARN, ERR, STOP, STALL
  • Per-partition lag with trend analysis
  • HTTP API → Grafana dashboard → PagerDuty alerts
  • Не требует модификации консьюмеров (читает __consumer_offsets напрямую)

Highload Best Practices

  1. Consumer count == Partition count — идеальный баланс
  2. CooperativeStickyAssignor — стандарт для production (Kafka 2.3+)
  3. Static membership (group.instance.id) — для rolling deploy stability
  4. Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3
  5. ConsumerRebalanceListener обязателен — commitSync + state cleanup
  6. Monitor rebalance frequency — alert при > 1 rebalance за 10 минут
  7. enable.auto.commit=false — всегда коммитьте после обработки
  8. max.poll.records подбирайте под latency обработки (цель: poll < max.poll.interval.ms)
  9. Burrow для monitoring — consumer-centric, не требует instrumentation
  10. KIP-848 (Server-Side Assignment) — используйте при миграции на Kafka 3.0+

🎯 Шпаргалка для интервью

Обязательно знать:

  • Consumer Group — группа консьюмеров, совместно читающих топик; каждая партиция = один консьюмер
  • __consumer_offsets — внутренний топик для хранения смещений (compact)
  • Partition Assignment Strategies: Range, RoundRobin, Sticky, CooperativeSticky
  • Rebalancing происходит при добавлении/удалении консьюмера, изменении партиций
  • CooperativeStickyAssignor (Kafka 2.3+) — минимальный простой при ребалансе
  • Static Membership (group.instance.id) — eliminates rebalance при rolling deploy
  • Eager rebalancing = full stop; Cooperative = incremental, processing продолжается
  • Ideal: consumer count == partition count

Частые уточняющие вопросы:

  • Что если консьюмеров больше чем партиций? — Лишние простаивают (idle), потребляют ресурсы.
  • Что такое __consumer_offsets? — Внутренний compact-топик, хранит offsets для всех consumer groups.
  • Чем Cooperative отличается от Eager? — Eager: все останавливаются. Cooperative: только перемещаемые партиции.
  • Зачем нужен group.instance.id? — Static membership: при рестарте тот же консьюмер получает те же партиции без ребаланса.

Красные флаги (НЕ говорить):

  • «Два консьюмера могут читать одну партицию в группе» — только один
  • «Автокоммит — лучший выбор для production» —手动 коммит после обработки
  • «Ребаланс — это быстро» — eager: 5-30 секунд простоя
  • «Range assignor — лучший для production» — неравномерный, используйте CooperativeSticky

Связанные темы:

  • [[6. Как работает балансировка консьюмеров в группе]]
  • [[7. Можно ли иметь больше консьюмеров, чем партиций]]
  • [[8. Что произойдёт при добавлении нового консьюмера в группе]]
  • [[15. Что такое rebalancing и когда он происходит]]
  • [[13. Как работает commit offset]]