Question 6 · Section 15

How does consumer balancing work in a group?


🟢 Junior Level

What is balancing?

Balancing is the automatic process of distributing partitions among consumers in a group. Kafka itself decides which consumer reads which partitions.

Why: without it, you'd have to manually assign partitions to each consumer. When the number of consumers or partitions changes, everything is recalculated automatically.

Analogy

Think of a pizzeria where you need to slice pizza (partitions) among friends (consumers):

  • 3 slices, 3 friends → each gets 1 slice
  • 3 slices, 2 friends → one gets 2 slices, the other gets 1
  • 3 slices, 5 friends → 3 get a slice each, 2 go hungry

Basic rule

Number of active consumers <= Number of partitions

Example

3 partitions, 3 consumers:
  C1 → P0
  C2 → P1
  C3 → P2  ← ideal balance

Added C4:
  C1 → P0
  C2 → P1
  C3 → P2
  C4 → waiting (no free partitions)

How it works in code

// All consumers that join with the same group.id form one group
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(List.of("orders"));
// Kafka distributes the topic's partitions among the group members automatically

🟡 Middle Level

Balancing strategies

| Strategy | Algorithm | When to use |
|---|---|---|
| Range | Divides partitions into contiguous ranges | Only when the partition count divides evenly by the consumer count; otherwise uneven. For production with dynamic scaling, CooperativeSticky is better. |
| RoundRobin | Alternates partitions across consumers | When you need uniformity |
| Sticky | Minimizes partition movement | Production (Kafka 0.11+) |
| CooperativeSticky | Incremental movement without a full stop | Production (Kafka 2.4+, recommended) |
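In code, a strategy is selected via the consumer's partition.assignment.strategy property, which takes fully qualified assignor class names. A minimal sketch (the helper name is illustrative; the class names are Kafka's built-in assignors):

```java
import java.util.Properties;

public class AssignorConfig {
    // Built-in assignor class names:
    //   org.apache.kafka.clients.consumer.RangeAssignor
    //   org.apache.kafka.clients.consumer.RoundRobinAssignor
    //   org.apache.kafka.clients.consumer.StickyAssignor
    //   org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    public static Properties withAssignor(String groupId, String assignorClass) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("partition.assignment.strategy", assignorClass);
        return props;
    }
}
```

Usage: `withAssignor("my-group", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor")`.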

Range Assignor in detail

Partitions 0,1,2,3 → 2 consumers:
  C1: 0, 1  (2 partitions)
  C2: 2, 3  (2 partitions)
  → Even

Partitions 0,1,2,3,4 → 2 consumers:
  C1: 0, 1, 2  (3 partitions)
  C2: 3, 4     (2 partitions)
  → Uneven! (base = numPartitions / numConsumers; the remainder goes to the first consumers)
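The range arithmetic above (numPartitions / numConsumers, remainder to the first consumers) can be sketched as a plain function. This is an illustration of the algorithm for a single topic, not the actual RangeAssignor code:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssign {
    // Assign numPartitions partitions of one topic to numConsumers consumers
    // the way a range assignor does: contiguous ranges, with the first
    // (numPartitions % numConsumers) consumers receiving one extra partition.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < count; i++) parts.add(next++);
            result.add(parts);
        }
        return result;
    }
}
```

`assign(5, 2)` reproduces the uneven split above: [0, 1, 2] and [3, 4].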

RoundRobin Assignor

Partitions 0,1,2,3,4 → 2 consumers:
  C1: 0, 2, 4  (3 partitions)
  C2: 1, 3     (2 partitions)
  → Slightly better with odd numbers
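The round-robin scheme above is a simple modulo deal-out. A sketch for a single topic (illustrative, not the actual RoundRobinAssignor code):

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinAssign {
    // Deal partitions out one at a time, cycling through the consumers.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) result.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numConsumers).add(p);  // partition p goes to consumer p mod N
        }
        return result;
    }
}
```

`assign(5, 2)` matches the example above: [0, 2, 4] and [1, 3].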

Rebalancing process

1. Trigger event (new consumer, crash, timeout)
2. Group coordinator starts rebalance
3. All consumers receive notification
4. All consumers stop reading (eager)
   or continue with some partitions (cooperative)
5. New assignment is computed
6. Assignment is distributed to consumers
7. Consumers start reading new partitions

ConsumerRebalanceListener

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. Commit offsets before losing partitions
        consumer.commitSync();
        // 2. Clean up local resources
        cleanup(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 3. Initialize new partitions
        initialize(partitions);
    }
});

Chaos Rebalancing Problem

A consumer falls behind: for example, batch processing takes longer than
max.poll.interval.ms, or a long GC pause exceeds session.timeout.ms. The consumer
misses its poll()/heartbeat deadline → excluded from the group →
rebalance → load on the remaining consumers increases → another one falls behind → avalanche.

Consumers crash → rebalance → new consumers crash → cycle

Causes:
- session.timeout.ms too short
- Long processing (max.poll.interval.ms exceeded)
- Memory/CPU issues

Solution:
- Increase timeouts
- Optimize processing
- Add monitoring
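The "increase timeouts" fix can be sketched as consumer properties. The values below are illustrative starting points, not universal recommendations:

```java
import java.util.Properties;

public class StableConsumerProps {
    // Defensive settings against chaos rebalancing (illustrative values)
    public static Properties build() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "45000");     // survive short GC pauses and network hiccups
        props.put("heartbeat.interval.ms", "15000");  // 1/3 of session.timeout.ms
        props.put("max.poll.interval.ms", "600000");  // allow long batches (10 minutes)
        props.put("max.poll.records", "100");         // smaller batches finish sooner
        return props;
    }
}
```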

Strategy comparison

| Criteria | Range | RoundRobin | Sticky | CooperativeSticky |
|---|---|---|---|---|
| Uniformity | ⚠️ | ✅ | ✅ | ✅ |
| Minimize movement | ❌ | ❌ | ✅ | ✅ |
| Downtime on rebalance | Full | Full | Full | Partial |
| Kafka version | All | All | 0.11+ | 2.4+ |

Separately, server-side assignment (KIP-848, preview in Kafka 3.7, GA in 4.0) eliminates the SyncGroup round-trip and makes rebalances faster.

Common mistakes

| Mistake | Consequence | Solution |
|---|---|---|
| Frequent rebalances | Constant system downtime | Increase timeouts, static membership |
| No ConsumerRebalanceListener | Offset loss on rebalance | Always use a listener |
| Uneven distribution | One consumer is overloaded | CooperativeSticky assignor |
| Range assignor in production | Unevenness + full downtime | Switch to CooperativeSticky |

🔴 Senior Level

Group Coordinator Protocol: full diagram

Phase 1: JoinGroup
┌──────────────┐          ┌──────────────────┐          ┌──────────────┐
│  Consumer A  │          │ Group Coordinator│          │  Consumer B  │
└──────┬───────┘          └────────┬─────────┘          └──────┬───────┘
       │ JoinGroup Request          │                          │
       │ (group.id, memberId)       │                          │
       │───────────────────────────►│                          │
       │                            │ JoinGroup Request        │
       │                            │◄─────────────────────────│
       │                            │                          │
       │ Leader chosen (first       │                          │
       │ in member list)            │                          │
       │                            │                          │
       │◄───────────────────────────│ JoinGroup Response       │
       │ (leaderId, allMembers)     │ (memberId, leaderId)     │
       │                            │─────────────────────────►│

Phase 2: Assignment (on Leader)
       │                            │                          │
       │ Leader computes            │                          │
       │ assignment via             │                          │
       │ PartitionAssignor          │                          │
       │ assignment = {             │                          │
       │   A: [P0, P1],             │                          │
       │   B: [P2]                  │                          │
       │ }                          │                          │

Phase 3: SyncGroup
       │                            │                          │
       │ SyncGroup Request          │                          │
       │ (assignment for all)       │                          │
       │───────────────────────────►│                          │
       │                            │                          │
       │                            │◄─────────────────────────│ SyncGroup Response
       │                            │                          │ (own assignment)
       │◄───────────────────────────│                          │
       │ SyncGroup Response         │                          │
       │ (assignment: P0, P1)       │                          │

Kafka 3.7+ (KIP-848, GA in Kafka 4.0): Assignment is computed on the Coordinator, not the Leader, which eliminates the SyncGroup round-trip.

Eager vs Cooperative Rebalancing β€” internals

Eager Rebalancing:

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator
// onJoinComplete:
//   1. onPartitionsRevoked(ALL partitions)  // Stop EVERYTHING
//   2. newAssignment = computeAssignment()
//   3. onPartitionsAssigned(newAssignment)  // Start NEW
// Eager: everything stops, then everything restarts

Cooperative Rebalancing:

// CooperativeStickyAssignor:
//   1. onPartitionsRevoked(SUBSET partitions)  // Only those leaving
//   2. onPartitionsAssigned(SUBSET partitions)  // Only new ones
//   3. Continue working with remaining partitions

// Algorithm:
//   currentAssignment = {P0, P1, P2}
//   newAssignment = {P0, P3}  // P1 and P2 left, P3 added
//   revoked = {P1, P2}         // Only these
//   assigned = {P3}            // Only these
//   continue processing P0     // Don't stop!
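The revoked/assigned diff in the comments above is plain set arithmetic. A sketch of how a cooperative assignor derives the incremental changes (illustrative, not the actual CooperativeStickyAssignor code):

```java
import java.util.HashSet;
import java.util.Set;

public class CooperativeDiff {
    // Partitions the consumer must give up: in the current but not the new assignment
    public static Set<Integer> revoked(Set<Integer> current, Set<Integer> next) {
        Set<Integer> r = new HashSet<>(current);
        r.removeAll(next);
        return r;
    }

    // Partitions the consumer newly receives: in the new but not the current assignment
    public static Set<Integer> assigned(Set<Integer> current, Set<Integer> next) {
        Set<Integer> a = new HashSet<>(next);
        a.removeAll(current);
        return a;
    }
}
```

With current = {P0, P1, P2} and next = {P0, P3}: revoked = {P1, P2}, assigned = {P3}, and P0 keeps being processed throughout.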

Incremental Cooperative Rebalancing:

With many changes:
  Round 1: revoke P1, assign P3
  Round 2: revoke P2, assign P4
  ...
  Each round is short (usually < 1 second)
  Processing continues on remaining partitions

Static Membership: how it works internally

props.put("group.instance.id", "consumer-1");

Protocol:

Dynamic consumer (no group.instance.id):
  Join → assigned memberId = UUID → restart = new memberId → rebalance

Static consumer (with group.instance.id):
  Join → memberId is tied to group.instance.id
  → restart = same memberId → NO rebalance (within session.timeout)
  → Coordinator waits session.timeout.ms before exclusion

Conditions for no rebalance:

  • Consumer returns within session.timeout.ms
  • group.instance.id is not used by another consumer
  • Partition count hasn't changed

Duplicate group.instance.id:

If two consumers have the same group.instance.id:
  Second one gets FencedInstanceIdException
  → Second consumer terminates
  → This prevents duplicate processing

Production Configuration

# Optimal production configuration
group.id: order-processors
group.instance.id: consumer-${HOSTNAME}     # Static membership
partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
session.timeout.ms: 30000                    # 30 seconds
heartbeat.interval.ms: 10000                 # 1/3 of session timeout
max.poll.interval.ms: 300000                 # 5 minutes
max.poll.records: 500                        # batch for processing
enable.auto.commit: false                    # Manual commit

Edge Cases

  1. Cascading Rebalance (Thundering Herd): One consumer crashes → rebalance → remaining consumers get more partitions → load increases → another consumer crashes from overload → another rebalance → avalanche. Solution: Increase session.timeout.ms, use max.poll.records for load control, set up monitoring alerts on consumer lag growth rate.

  2. Rebalance during Deployment: With a rolling deploy of N instances and eager rebalancing, N rebalances occur. Each rebalance = 5-30 seconds of downtime. For 20 instances = 100-600 seconds of downtime. Solution: Static membership + cooperative rebalancing = 0 rebalances during a rolling deploy.

  3. Partition Stall after Rebalance: After a rebalance, the new consumer gets a partition and starts from the committed offset. If processing is stateful (e.g., windowed aggregation), the new consumer has no local state → incorrect results until full recalculation. Solution: External state store (RocksDB, Redis) keyed by partition, or state replay on assignment.

  4. Cross-Rack Rebalance: In a multi-rack cluster, assigning partitions to a consumer in a different rack increases latency (network round-trip). Range/RoundRobin don't account for rack awareness. Solution: Custom PartitionAssignor with rack-aware assignment, or a consumer sidecar in the same rack as the brokers.

  5. Rebalance Storm with K8s HPA: Horizontal Pod Autoscaler adds pods on CPU increase → new consumers trigger a rebalance → processing slows down (rebalance overhead) → CPU rises even more → HPA adds more pods → infinite loop. Solution: Don't use CPU-based HPA for Kafka consumers. Instead, use custom metrics (consumer lag) with thresholds and a cooldown period.

Performance Numbers

| Metric | Eager | Cooperative |
|---|---|---|
| Rebalance latency | 5-30 seconds | 1-5 seconds |
| Processing during rebalance | Full stop | Continues (partial) |
| Partition movement | All | Incremental |
| Impact on consumer lag | High (2-10x spike) | Low (1.2-2x spike) |
| Rebalances per rolling deploy | N (one per instance) | 0 (with static membership) |

Production War Story

Situation: Streaming service with 25 consumers in group event-processors (25 partitions). Range assignor, dynamic membership. Every night at 02:00, an automatic rolling deploy (container update).

Problem: 25 sequential rebalances × 15 seconds = 375 seconds (6+ minutes) of downtime every night. During this time, lag grew to 500K events. Morning users saw "lagging" recommendations.

Additional problem: The Range assignor with 25 partitions and 24 consumers (one mid-rebalance) assigned 2 partitions to one consumer and 1 to the rest. The overloaded consumer crashed → another rebalance → another crash → cascading failure.

Diagnosis:

kafka-consumer-groups.sh --describe --group event-processors
# STATE=Rebalancing → STATE=Stable → STATE=Rebalancing (cycle)
# Lag: 0 → 500K → 0 (within 15 minutes after deploy)

Solution:

  1. CooperativeStickyAssignor: rebalance took 2 seconds instead of 15
  2. group.instance.id=${POD_NAME}: rolling deploy = 0 rebalances
  3. session.timeout.ms=45000, heartbeat.interval.ms=15000: timeout margin
  4. ConsumerRebalanceListener with commitSync + state flush
  5. K8s PodDisruptionBudget: maxUnavailable=1
  6. Lag-based HPA instead of CPU-based (custom metric via Prometheus Adapter)

Result: Rolling deploy with 0 rebalances, 0 downtime, lag < 1K.

Lesson: Eager rebalancing + dynamic membership + rolling deploy = guaranteed cascading failure. Cooperative + static membership = zero-downtime deploys.

Monitoring (JMX + Burrow)

JMX metrics:

kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=rebalance-rate-avg
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-rebalance-seconds-ago
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=assigned-partitions
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=failed-rebalance-total
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,key=last-heartbeat-seconds-ago

Burrow:

  • Group status: OK, WARN, ERR, STOP, STALL
  • Per-partition lag with trend
  • HTTP API → Grafana → Alertmanager

Alert rules:

- alert: KafkaRebalanceTooFrequent
  expr: rate(kafka_consumer_coordinator_rebalance_rate[5m]) > 0.1
  for: 5m
  labels:
    severity: warning

- alert: KafkaConsumerStalled
  expr: kafka_consumer_records_lag_max > 100000
  for: 10m
  labels:
    severity: critical

Highload Best Practices

  1. CooperativeStickyAssignor: the production standard (Kafka 2.4+)
  2. Static membership (group.instance.id): eliminates rebalances on rolling deploys
  3. Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3
  4. ConsumerRebalanceListener: commitSync + state cleanup in onPartitionsRevoked
  5. Monitor rebalance frequency: alert on > 1 rebalance per 10 minutes
  6. Tune max.poll.records to latency; goal: batch processing < max.poll.interval.ms
  7. Don't use CPU-based HPA; use a custom metric (consumer lag)
  8. PodDisruptionBudget: maxUnavailable=1 to prevent cascading rebalances
  9. Cross-rack awareness: custom assignor for multi-rack clusters
  10. Stateful processing: external state store (RocksDB, Redis) keyed by partition
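The max.poll tuning rule above reduces to simple arithmetic: the average per-record processing budget is max.poll.interval.ms divided by max.poll.records. A sketch:

```java
public class PollBudget {
    // Max average processing time per record (ms) before the consumer risks
    // exceeding max.poll.interval.ms and being excluded from the group.
    public static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }
}
```

With the production configuration above (max.poll.interval.ms=300000, max.poll.records=500), the budget is 600 ms per record; if processing is slower, lower max.poll.records or raise max.poll.interval.ms.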

🎯 Interview Cheat Sheet

Must know:

  • Balancing = automatic distribution of partitions among consumers in a group
  • Strategies: Range (uneven), RoundRobin, Sticky, CooperativeSticky (recommended)
  • Cooperative rebalancing: incremental movement, processing continues on remaining partitions
  • ConsumerRebalanceListener is mandatory: commitSync in onPartitionsRevoked
  • Static membership (group.instance.id): 0 rebalances on rolling deploy
  • Chaos rebalance: one crashes → load increases → another crashes → avalanche
  • Heartbeat tuning: heartbeat.interval.ms = session.timeout.ms / 3

Common follow-up questions:

  • What is cascading rebalance? One consumer crashes → load on others → another crashes → avalanche.
  • Why is CPU-based HPA bad for Kafka? Rebalance slows processing → CPU rises → more pods → infinite loop.
  • What does ConsumerRebalanceListener do? Commits offsets and cleans state before losing partitions.
  • How to avoid rebalance on deploy? Static membership + CooperativeStickyAssignor.

Red flags (DO NOT say):

  • "Range assignor is standard for production": uneven, use CooperativeSticky
  • "More consumers = always more throughput": limited by partition count
  • "Rebalancing is instant": 5-30 seconds (eager), 1-5 (cooperative)
  • "You can ignore session.timeout.ms": too short = false rebalances

Related topics:

  • [[5. What is Consumer Group]]
  • [[8. What happens when you add a new consumer to the group]]
  • [[15. What is rebalancing and when does it happen]]
  • [[7. Can you have more consumers than partitions]]