Question 5 · Section 15

What is a Consumer Group?

🟢 Junior Level

What is a Consumer Group?

A Consumer Group is a set of consumers that jointly read data from a topic. Each consumer in the group processes its own subset of partitions.

Why it exists: the group mechanism lets multiple consumers SHARE a topic's partitions so that each message is processed by exactly one consumer in the group. Without a group, each consumer would read all messages (duplication).

Analogy

Think of a courier team:

  • Topic is a warehouse with orders (partitions = different shelves)
  • Consumer Group is one courier team
  • Each courier (consumer) picks orders from their own shelves
  • Two couriers don’t pick orders from the same shelf (avoid duplication)

If a courier leaves (crashes), their shelves are automatically redistributed among the remaining ones β€” this is called rebalancing.

Consumer Group "order-processors":
  Consumer 1 → reads Partition 0
  Consumer 2 → reads Partition 1
  Consumer 3 → reads Partition 2

Basic rules

  • Each partition is processed by only one consumer in the group
  • One consumer can read multiple partitions
  • If there are more consumers than partitions β€” the extra ones are idle

Why are Consumer Groups needed?

  1. Scalability: more consumers = more parallelism
  2. Fault tolerance: if one consumer crashes, others pick up its partitions
  3. Isolation: different groups read the same topic independently

Simple example

// Consumer 1
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processors");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

// Consumer 2: identical setup, same group.id "order-processors"

// Kafka assigns each of the two consumers a disjoint subset of the topic's partitions

🟡 Middle Level

When NOT to share a Consumer Group

  1. Every consumer must see EVERY message (broadcast/fan-out pattern): give each consumer its own group.id
  2. Independent applications consume the same topic: each application gets its own group.id

Partition balancing

3 partitions, 2 consumers:
  C1 → P0, P1
  C2 → P2

3 partitions, 3 consumers:
  C1 → P0
  C2 → P1
  C3 → P2  ← ideal balance

3 partitions, 5 consumers:
  C1 → P0
  C2 → P1
  C3 → P2
  C4 → idle
  C5 → idle

Partition Assignment Strategies

| Strategy          | How it works                    | Pros             | Cons                        |
|-------------------|---------------------------------|------------------|-----------------------------|
| Range             | Splits partitions into ranges   | Simple           | Uneven with odd counts      |
| RoundRobin        | Deals partitions out one by one | More even        | Moves many on rebalance     |
| Sticky            | Minimizes movement              | Stability        | More complex implementation |
| CooperativeSticky | Incremental movement            | Minimal downtime | Requires Kafka 2.4+         |

Range Assignor

Partitions 0,1,2,3,4 → 2 consumers:
  C1: 0, 1, 2  (3 partitions)
  C2: 3, 4     (2 partitions)
  → Uneven!

RoundRobin Assignor

Partitions 0,1,2,3,4 → 2 consumers:
  C1: 0, 2, 4  (3 partitions)
  C2: 1, 3     (2 partitions)
  → Same 3-2 split here; RoundRobin's advantage shows up with multiple topics
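The two assignors above can be illustrated with a simplified single-topic simulation in plain Java. This is not Kafka's actual implementation, only a sketch of the distribution logic; the class and method names are made up for the example:

```java
import java.util.*;

public class AssignorDemo {
    // Range (single topic): each consumer gets a contiguous block;
    // the first (numPartitions % numConsumers) consumers get one extra partition.
    static Map<String, List<Integer>> range(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int p = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> assigned = new ArrayList<>();
            for (int j = 0; j < count; j++) assigned.add(p++);
            out.put(consumers.get(i), assigned);
        }
        return out;
    }

    // RoundRobin: deal partitions out one at a time.
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        consumers.forEach(c -> out.put(c, new ArrayList<>()));
        for (int p = 0; p < numPartitions; p++)
            out.get(consumers.get(p % consumers.size())).add(p);
        return out;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("C1", "C2");
        System.out.println("Range:      " + range(consumers, 5));      // C1 gets 0,1,2; C2 gets 3,4
        System.out.println("RoundRobin: " + roundRobin(consumers, 5)); // C1 gets 0,2,4; C2 gets 1,3
    }
}
```

Running it reproduces the 3-2 splits from the diagrams above.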

Rebalancing

Occurs when:

  • Consumer added/removed
  • Partitions added to topic
  • Topic subscription changed
  • Consumer session timeout

Handling rebalance with a ConsumerRebalanceListener:

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();  // Commit offsets before rebalance
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Initialize new partitions
    }
});

Multiple Consumer Groups

One topic "orders" read by two groups:

Group "order-processing":
  C1 → P0, P1
  C2 → P2, P3

Group "analytics":
  C3 → P0, P1, P2, P3  // reads all data independently

Each group gets a full copy of the data!
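The two groups above differ only in group.id. A minimal configuration sketch (the broker address and deserializer classes are illustrative placeholders):

```java
import java.util.Properties;

public class GroupConfigs {
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", groupId);                   // the only line that separates the groups
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Same topic, different group.id: each group independently
        // receives every message from "orders".
        Properties processing = consumerProps("order-processing");
        Properties analytics  = consumerProps("analytics");
        System.out.println(processing.getProperty("group.id")); // order-processing
        System.out.println(analytics.getProperty("group.id"));  // analytics
    }
}
```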

Common mistakes

| Mistake                                  | Consequence                            | Solution                            |
|------------------------------------------|----------------------------------------|-------------------------------------|
| More consumers than partitions           | Extras sit idle, wasting resources     | Consumer count <= partition count   |
| No ConsumerRebalanceListener             | Offset loss on rebalance               | Always register a listener          |
| Same group.id for different applications | They share partitions, each sees part  | Unique group.id per application     |
| Long processing between poll() calls     | Evicted from group, triggers rebalance | Reduce max.poll.records or optimize |

🔴 Senior Level

Group Coordinator Protocol: internals

The Group Coordinator is one broker in the cluster: each group maps to a partition of __consumer_offsets (by hashing group.id), and the leader of that partition serves as the group's coordinator. It is responsible for:

  • Group membership management (Join/Leave)
  • Distributing partition assignments
  • Storing committed offsets in __consumer_offsets

Classic group protocol (Consumer Protocol v0, pre-KIP-848):

1. JoinGroup Request:
   Consumer → Coordinator: "I want to join the group"
   Coordinator → Leader: "Here's the list of all members"

2. Leader computes assignment:
   The leader (one of the consumers) runs the PartitionAssignor
   Result: Map<memberId, List<TopicPartition>>

3. SyncGroup Request:
   Leader → Coordinator: assignment for all members
   Coordinator → each Consumer: its personal assignment

4. Heartbeat:
   Each Consumer → Coordinator: "I'm alive" (every heartbeat.interval.ms)

5. LeaveGroup:
   Consumer → Coordinator: "I'm leaving"
   → Triggers a rebalance

Next-generation Consumer Protocol (KIP-848):

KIP (Kafka Improvement Proposal): the process for proposing Kafka improvements.
KIP-848: the new consumer rebalance protocol with server-side assignment (available in preview from Kafka 3.7, the default in 4.0).
Group Coordinator: the broker that manages group membership.

New heartbeat-based RPC protocol:
- Assignment computed on Coordinator (not on Leader)
- Eliminates SyncGroup round-trip
- Supports Server-Side Assignment
- Faster rebalance (1-2 RTT less)

__consumer_offsets: Internal Topic

Topic: __consumer_offsets
Partitions: 50 (default, offsets.topic.num.partitions)
RF: 3 (default)
Cleanup: compact (keeps only the latest offset for each group)

Key format: [group.id, topic, partition]
Value format: [offset, metadata, commitTimestamp, expireTimestamp]

Example:
  Key: ["order-processors", "orders", 0]
  Value: [offset=42567, metadata="", timestamp=1712345678]

Offset commit:

// Auto commit (enable.auto.commit=true)
// Commits every auto.commit.interval.ms (5 seconds by default)
// Risk: an offset can be committed before its messages are processed; a crash then loses them

// Manual commit (enable.auto.commit=false)
consumer.commitSync();      // Synchronous: blocks until confirmed
consumer.commitAsync();     // Asynchronous: non-blocking, result via callback

Sticky Assignor: the algorithm

Sticky Assignor (available since Kafka 0.11, KIP-54):

1. Current assignment: C1={P0,P1}, C2={P2}
2. New consumer C3 joins
3. Sticky preserves existing assignments where possible:
   C2: {P2} → kept (no move)
   C1: keeps P0
4. Balance takes priority over stickiness:
   C3: {} → P1 moves over from C1
   Final: C1={P0}, C2={P2}, C3={P1}

Advantages over Range/RoundRobin:

  • Minimizes partition movement (less disruption)
  • Preserves data locality (page cache warm)
  • Less reprocessing on rebalance

Cooperative Rebalancing (KIP-429, Kafka 2.4+)

Eager Rebalancing (old):

1. Trigger rebalance
2. ALL consumers revoke ALL partitions
3. Stop processing
4. Compute new assignment
5. Resume processing
→ Full stop of the entire group for 5-30 seconds

Cooperative Rebalancing (new):

1. Trigger rebalance
2. Each consumer revokes ONLY some partitions
3. Continue processing with remaining partitions
4. Incremental assignment for revoked partitions
5. Repeat until stable
→ Minimal downtime, processing continues

Enabled via the assignor setting:

props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Static Membership (KIP-345)

props.put("group.instance.id", "consumer-1");  // Unique instance ID

Behavior:

  • On restart, consumer with the same group.instance.id does not trigger rebalance
  • Coordinator waits session.timeout.ms before exclusion
  • On return β€” gets the same partitions
  • Reduces churn during rolling deploy

Comparison:

|                   | Dynamic Membership           | Static Membership                     |
|-------------------|------------------------------|---------------------------------------|
| group.instance.id | Not set                      | Set                                   |
| Restart           | Rebalance                    | No rebalance (within session.timeout) |
| Rolling deploy    | N rebalances for N instances | 0 rebalances                          |
| Crash detection   | session.timeout.ms           | session.timeout.ms                    |

Edge Cases (3+)

  1. Split-Brain Consumer Group: On network partition, consumers may lose connection to the coordinator. If session.timeout.ms expires, the coordinator excludes them and triggers rebalance. On network recovery, "old" consumers think they're still in the group, while "new" ones already got their partitions. Result: duplicate processing, where two consumers read the same partition. Solution: session.timeout.ms should be large enough; use static membership.

  2. Offset Commit Race Condition: Consumer A reads offsets 100-150 and starts processing. A rebalance moves the partition to Consumer B. Consumer A finishes processing and commits offset 151. Consumer B starts from offset 151, but messages 100-150 weren't yet fully processed by Consumer A (processing was async). Solution: commitSync in onPartitionsRevoked plus blocking processing.

  3. __consumer_offsets Partition Unavailable: If the __consumer_offsets partition holding the group's offsets becomes unavailable (leader down, ISR < min.insync.replicas), consumers cannot commit offsets. They continue reading, but on restart will resume from the last saved offset, so duplication is possible. With min.insync.replicas=2 and RF=3 this is unlikely, but possible with a double failure.

  4. Max Poll Records + Slow Processing: max.poll.records=500, processing one message takes 1 second. Processing 500 messages = 500 seconds. max.poll.interval.ms=300000 (5 minutes). The consumer doesn't call poll() within 5 minutes, so the coordinator evicts it and triggers a rebalance. Solution: reduce max.poll.records or increase max.poll.interval.ms.

  5. Consumer Group with 0 active consumers: Group exists in __consumer_offsets, but all consumers are offline. When a new consumer starts, it gets all partitions and starts from the last committed offset. If offsets are old (retention expired), it starts from auto.offset.reset (earliest/latest). This can cause massive replay or data skip.
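Edge case 4 is simple arithmetic: the worst-case time to process one poll() batch must fit inside max.poll.interval.ms. A tiny sketch of that budget check, using the numbers from the example above (the class and method names are made up):

```java
public class PollBudget {
    // Returns true if a full poll() batch can be processed before
    // max.poll.interval.ms expires; false means the coordinator would
    // evict the consumer and trigger a rebalance.
    static boolean batchFits(int maxPollRecords, long perRecordMillis, long maxPollIntervalMillis) {
        return maxPollRecords * perRecordMillis < maxPollIntervalMillis;
    }

    public static void main(String[] args) {
        // Edge case 4: 500 records x 1000 ms = 500,000 ms > 300,000 ms budget
        System.out.println(batchFits(500, 1000, 300_000)); // false -> rebalance risk
        // Reducing max.poll.records to 200 brings the batch inside the budget
        System.out.println(batchFits(200, 1000, 300_000)); // true
    }
}
```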

Performance Numbers

| Metric                        | Value                      | Conditions                |
|-------------------------------|----------------------------|---------------------------|
| Rebalance time (eager)        | 5-30 seconds               | Depends on group size     |
| Rebalance time (cooperative)  | 1-5 seconds                | Incremental movement      |
| Heartbeat interval            | 3-10 seconds               | 1/3 of session.timeout.ms |
| Offset commit latency         | 1-5 ms                     | Synchronous, local broker |
| __consumer_offsets throughput | ~10K commits/sec           | 50 partitions             |
| Max consumers per group       | Limited by partition count | Practically: hundreds     |

Production War Story

Situation: Payment processor with 15 consumers in group payment-processors (15 partitions). Used Range assignor and eager rebalancing. During rolling deploy (sequential restart of all 15 pods in Kubernetes), each pod triggered a rebalance.

Problem: 15 sequential rebalances took 15 × 15 seconds = 225 seconds of downtime. During this time, lag grew to 200K messages. After recovery, some payments were processed twice (offset commit race condition).

Diagnosis:

kafka-consumer-groups.sh --describe --group payment-processors
# Showed: STATE=Dead for 225 seconds, then STATE=Empty → Rebalancing
# Consumer lag: 0 → 200K → 0 (after catchup, with duplicates)

Solution:

  1. Switched to CooperativeStickyAssignor: rebalance dropped to 2-3 seconds
  2. Introduced group.instance.id=${POD_NAME}: no rebalances during rolling deploy
  3. Added a ConsumerRebalanceListener with commitSync in onPartitionsRevoked
  4. Configured a Pod Disruption Budget in K8s: at most 1 pod down at a time
  5. Increased session.timeout.ms to 45 seconds and heartbeat.interval.ms to 15 seconds

Result: rolling deploy with 0 rebalances, 0 downtime, 0 duplicates.

Lesson: Eager rebalancing + rolling deploy = cascading failure. Always use cooperative rebalancing + static membership for production.

Monitoring (JMX + Burrow)

JMX metrics:

kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-metrics,client-id=*,key=commit-latency-avg
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,key=records-lag-max

Burrow:

  • Consumer group status: OK, WARN, ERR, STOP, STALL
  • Per-partition lag with trend analysis
  • HTTP API β†’ Grafana dashboard β†’ PagerDuty alerts
  • Requires no consumer modifications (reads __consumer_offsets directly)

Highload Best Practices

  1. Consumer count == partition count: ideal balance
  2. CooperativeStickyAssignor: the production standard (Kafka 2.4+)
  3. Static membership (group.instance.id): for rolling-deploy stability
  4. Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3
  5. ConsumerRebalanceListener is mandatory: commitSync + state cleanup
  6. Monitor rebalance frequency: alert on > 1 rebalance per 10 minutes
  7. enable.auto.commit=false: always commit after processing
  8. Tune max.poll.records to processing latency (goal: batch time < max.poll.interval.ms)
  9. Burrow for monitoring: consumer-centric, no instrumentation needed
  10. KIP-848 (server-side assignment): adopt on Kafka 4.0 (preview from 3.7)
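The practices above can be collected into one configuration sketch. The broker address, group name, and the exact timeout numbers are placeholders to adapt to your environment, not prescriptions:

```java
import java.util.Properties;

public class ProductionConsumerConfig {
    // Sketch of the best-practice settings listed above.
    static Properties build(String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");      // placeholder broker address
        props.put("group.id", "order-processors");             // placeholder group name
        props.put("group.instance.id", instanceId);            // static membership (KIP-345)
        props.put("partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        props.put("enable.auto.commit", "false");              // commit manually after processing
        props.put("session.timeout.ms", "45000");
        props.put("heartbeat.interval.ms", "15000");           // session.timeout.ms / 3
        props.put("max.poll.records", "200");                  // keep batches inside max.poll.interval.ms
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("consumer-1");                // e.g. ${POD_NAME} in Kubernetes
        System.out.println(props.getProperty("heartbeat.interval.ms")); // 15000
    }
}
```

In Kubernetes, passing the pod name as the instance id (as in the war story above) keeps assignments stable across rolling restarts.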

🎯 Interview Cheat Sheet

Must know:

  • Consumer Group β€” group of consumers jointly reading a topic; each partition = one consumer
  • __consumer_offsets β€” internal topic for storing offsets (compact)
  • Partition Assignment Strategies: Range, RoundRobin, Sticky, CooperativeSticky
  • Rebalancing occurs on consumer add/remove, partition changes
  • CooperativeStickyAssignor (Kafka 2.3+) β€” minimal downtime on rebalance
  • Static Membership (group.instance.id) β€” eliminates rebalance on rolling deploy
  • Eager rebalancing = full stop; Cooperative = incremental, processing continues
  • Ideal: consumer count == partition count

Common follow-up questions:

  • What if there are more consumers than partitions? β€” Extra ones are idle, consuming resources.
  • What is __consumer_offsets? β€” Internal compact topic, stores offsets for all consumer groups.
  • How does Cooperative differ from Eager? β€” Eager: all stop. Cooperative: only moved partitions.
  • Why is group.instance.id needed? β€” Static membership: on restart, the same consumer gets the same partitions without rebalance.

Red flags (DO NOT say):

  • β€œTwo consumers can read one partition in a group” β€” only one
  • β€œAuto commit is the best choice for production” β€” manual commit after processing
  • β€œRebalancing is fast” β€” eager: 5-30 seconds downtime
  • β€œRange assignor is best for production” β€” uneven, use CooperativeSticky

Related topics:

  • [[6. How does consumer balancing work in a group]]
  • [[7. Can you have more consumers than partitions]]
  • [[8. What happens when you add a new consumer to the group]]
  • [[15. What is rebalancing and when does it happen]]
  • [[13. How does offset commit work]]