Question 29 · Section 15

Can you read messages from a specific partition in Kafka?



Junior Level

Simple Definition

Yes, you can! A Kafka consumer can read from specific partitions instead of having partitions assigned to it automatically. This is done via the assign() method instead of subscribe().

Analogy

Imagine a restaurant with multiple chefs (partitions).

subscribe() — you sit at a communal table and eat whatever any chef brings (the waiter distributes).

assign() — you choose which chefs you want: “From chef 1 and 3, but not 2.”

Code Example

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// assign() — manual assignment of specific partitions
TopicPartition partition = new TopicPartition("orders", 0);  // topic "orders", partition 0
consumer.assign(List.of(partition));

// Reading ONLY from partition 0
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Offset: " + record.offset());
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());
    }
}

subscribe() vs assign()

// subscribe() — automatic assignment via consumer group
consumer.subscribe(List.of("orders"));
// Kafka itself decides which partitions to give this consumer

// assign() — manual assignment
TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0, p1));
// You decide which partitions to read

When to Use

  • Testing — read messages from a specific partition
  • Replay — re-read messages from a certain offset
  • Debugging — investigate a problematic partition
  • Specific tasks — when the logic requires a specific partition

Middle Level

subscribe() vs assign() — Detailed Comparison

Aspect            | subscribe()                   | assign()
------------------|-------------------------------|----------------------------------
Consumer group    | Uses consumer group           | Ignores consumer group
Rebalancing       | Automatic                     | None (see note below)
Offset management | Automatic (group-based)       | Manual (you handle it yourself)
Fault tolerance   | High (rebalancing on failure) | Low (manual handling)
Scalability       | Automatic                     | Manual
Use case          | Production processing         | Testing, debugging, special tasks

Note: with assign(), if a consumer crashes its partitions are NOT reassigned to other consumers. Processing for those partitions stops entirely until restart.

assign() — How It Works

// 1. Create TopicPartition object
TopicPartition partition = new TopicPartition("orders", 0);

// 2. Assign to consumer
consumer.assign(List.of(partition));

// 3. (Optional) Set offset
consumer.seek(partition, 1000);           // from offset 1000
consumer.seekToBeginning(List.of(partition));  // from the beginning
consumer.seekToEnd(List.of(partition));        // from the end

// 4. Read
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Only from partition 0

Seek — Random Access

TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));

// Read from a specific offset
consumer.seek(partition, 5000);

// Read from the beginning
consumer.seekToBeginning(List.of(partition));

// Read from the end (new messages)
consumer.seekToEnd(List.of(partition));

// Read from a timestamp
long timestamp = System.currentTimeMillis() - 3600_000;  // 1 hour ago
Map<TopicPartition, Long> timestamps = Map.of(partition, timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
OffsetAndTimestamp offsetAndTs = offsets.get(partition);
if (offsetAndTs != null) {
    consumer.seek(partition, offsetAndTs.offset());
}

Common Errors Table

Error                            | Symptoms                     | Consequences                  | Solution
---------------------------------|------------------------------|-------------------------------|------------------------------------------
assign() + subscribe() together  | Conflicting registration     | Unpredictable behavior        | Use one or the other
No manual offset management      | On restart, reads from start | Duplicate processing          | Save offsets externally
assign() for production          | No rebalancing               | Consumer failure = data stall | Use subscribe() for production
assign() without group.id        | group.id is ignored          | OK, but confusing             | group.id not needed for assign()
seek() without assign()          | IllegalStateException        | Exception                     | assign() first, then seek()
Reading a non-existent partition | Exception or empty results   | Errors                        | Check that the partition exists (partitionsFor)
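
The "save offsets externally" fix from the table boils down to a small bookkeeping rule: persist the last processed offset per (topic, partition) and seek() to last + 1 on restart. Below is a minimal in-memory sketch of that rule; a real deployment would back it with a DB table, and all names here are illustrative, not a Kafka API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExternalOffsetStore {
    // In-memory stand-in for a durable store (e.g. a DB table keyed by topic + partition).
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Call after each successfully processed record.
    public void markProcessed(String topic, int partition, long offset) {
        lastProcessed.merge(key(topic, partition), offset, Math::max);
    }

    // Offset to pass to consumer.seek() on restart:
    // one past the last processed record, or 0 for a fresh partition.
    public long resumeOffset(String topic, int partition) {
        return lastProcessed.getOrDefault(key(topic, partition), -1L) + 1;
    }

    public static void main(String[] args) {
        ExternalOffsetStore store = new ExternalOffsetStore();
        System.out.println(store.resumeOffset("orders", 0));  // 0 (fresh start)
        store.markProcessed("orders", 0, 41);
        store.markProcessed("orders", 0, 42);
        System.out.println(store.resumeOffset("orders", 0));  // 43 (no duplicates on restart)
    }
}
```

The Math::max merge makes markProcessed safe against out-of-order calls; duplicates on restart are avoided because resumption starts strictly after the last recorded offset.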

When assign() is Useful

✅ Testing: check a specific partition
✅ Replay: re-read messages from the beginning
✅ Debugging: investigate a specific partition
✅ Data migration: copy a partition to another cluster
✅ Backfill: process historical data from a specific partition
✅ Audit: check data in a specific partition

When NOT to Use assign()

  • Production processing — no fault tolerance
  • Auto-scaling — no rebalancing
  • Consumer groups — assign() ignores groups
  • When load balancing is needed — subscribe() does this automatically

Senior Level

Deep Internals

assign() vs subscribe() — Internal Implementation

// KafkaConsumer internals

subscribe():
  consumer.coordinator → GroupCoordinator protocol
  consumer.subscriptions → dynamic (changes on rebalance)
  consumer.assignment → computed by GroupCoordinator

  Flow:
    1. consumer.subscribe(topics) → JoinGroup request
    2. GroupCoordinator → leader election
    3. Leader computes assignment (PartitionAssignor)
    4. SyncGroup response → each consumer gets its partitions
    5. consumer.onPartitionsAssigned() callback
    6. consumer.assignment → computed assignment

assign():
  consumer.coordinator → NOT used for assignment
  consumer.subscriptions → static (user-defined)
  consumer.assignment → direct user input

  Flow:
    1. consumer.assign(partitions) → direct assignment
    2. No JoinGroup, no coordinator-driven assignment, no rebalancing
    3. consumer.assignment → user-provided partitions
    4. consumer.position(partition) → fetch from __consumer_offsets (if group.id set)
       OR → beginning (if auto.offset.reset = earliest)

Offset Management with assign()

// assign() does NOT automatically manage offsets
// You must track offsets yourself

// Option 1: Use group.id (offsets in __consumer_offsets)
props.put("group.id", "my-consumer");
consumer.assign(List.of(partition));
// Offsets can still be committed!
consumer.commitSync();  // saves to __consumer_offsets

// Option 2: External offset storage
Map<TopicPartition, Long> savedOffsets = loadOffsetsFromDB();
consumer.assign(List.of(partition));
for (Map.Entry<TopicPartition, Long> entry : savedOffsets.entrySet()) {
    consumer.seek(entry.getKey(), entry.getValue());
}
// After processing:
saveOffsetsToDB(partition, currentOffset);

// Option 3: No offset tracking
consumer.assign(List.of(partition));
consumer.seekToBeginning(List.of(partition));
// Each run — reads from the beginning

ConsumerRebalanceListener — Only for subscribe()

// subscribe() + ConsumerRebalanceListener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called on rebalancing
        consumer.commitSync();
        cleanupState(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called when receiving partitions
        initializeState(partitions);
    }
});

// assign() — ConsumerRebalanceListener is NOT CALLED
// No rebalancing → no callbacks
// You are fully responsible for the lifecycle

Trade-offs

Aspect                  | subscribe()                   | assign()
------------------------|-------------------------------|--------------------------
Complexity              | Low (automatic)               | High (manual management)
Fault tolerance         | High                          | Low
Control                 | Low                           | Maximum
Operational overhead    | Low                           | High
Suitable for production | Yes                           | Rarely
Suitable for testing    | No (can't choose a partition) | Yes

Edge Cases

1. assign() and consumer group offsets:

// Scenario: group.id is set, assign() is used
props.put("group.id", "order-service");
consumer.assign(List.of(new TopicPartition("orders", 0)));

// On first poll:
// consumer.position(partition) → checks __consumer_offsets
// If committed offset exists → continues from it
// If not → auto.offset.reset applies (earliest/latest)

// This means assign() CAN use committed offsets!
// BUT: doesn't commit automatically, needs consumer.commitSync()
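
The resolution order described in the comments (committed offset wins, otherwise auto.offset.reset decides) can be expressed as a tiny pure function. This is a sketch of the rule for illustration, not Kafka's actual implementation:

```java
import java.util.Optional;

public class StartOffsetRule {
    enum AutoOffsetReset { EARLIEST, LATEST }

    // committed: the offset found in __consumer_offsets, if any
    // logStart/logEnd: the partition's current first and next offsets
    static long startOffset(Optional<Long> committed,
                            AutoOffsetReset reset,
                            long logStart, long logEnd) {
        // A committed offset always wins; otherwise auto.offset.reset decides.
        return committed.orElse(reset == AutoOffsetReset.EARLIEST ? logStart : logEnd);
    }

    public static void main(String[] args) {
        // Committed offset exists → continue from it
        System.out.println(startOffset(Optional.of(500L), AutoOffsetReset.LATEST, 0, 1000));   // 500
        // No committed offset, earliest → log start
        System.out.println(startOffset(Optional.empty(), AutoOffsetReset.EARLIEST, 0, 1000));  // 0
        // No committed offset, latest → log end
        System.out.println(startOffset(Optional.empty(), AutoOffsetReset.LATEST, 0, 1000));    // 1000
    }
}
```

In the real client, the committed offset would come from consumer.committed(...) / the first poll(), and logStart/logEnd from beginningOffsets()/endOffsets().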

2. Mixed assign() and subscribe() — undefined behavior:

// DON'T DO THIS:
consumer.subscribe(List.of("orders"));
consumer.assign(List.of(new TopicPartition("orders", 0)));

// Result (modern kafka-clients):
// IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
// Older clients could silently overwrite the first registration
// Either way: pick one model per consumer instance

3. Seek on a non-assigned partition:

TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0));

// Exception!
consumer.seek(p1, 1000);
// IllegalStateException: No current assignment for partition orders-1

4. assign() with dynamic partition changes:

// assign() — static assignment
// If new partitions are added to the topic — assign() does NOT see them

// Topic "orders" has 3 partitions: 0, 1, 2
consumer.assign(List.of(p0, p1, p2));

// Admin adds partition 3 to topic "orders"
// consumer continues reading only p0, p1, p2
// partition 3 — not assigned, not read

// Resolution: periodic re-assignment
while (running) {
    Set<TopicPartition> currentAssignment = consumer.assignment();
    Set<TopicPartition> desiredAssignment = getPartitionsForTopic("orders");
    if (!currentAssignment.equals(desiredAssignment)) {
        consumer.assign(desiredAssignment);
    }
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // process...
}
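
getPartitionsForTopic() above is a hypothetical helper (in practice it could be built from consumer.partitionsFor(topic)). The refresh decision itself is just a set comparison, sketched here in plain Java with partition numbers standing in for TopicPartition objects:

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AssignmentRefresh {
    // Desired assignment: every partition 0..count-1 of the topic.
    static Set<Integer> desired(int partitionCount) {
        return IntStream.range(0, partitionCount).boxed().collect(Collectors.toSet());
    }

    // True when consumer.assign() should be called again.
    static boolean needsReassign(Set<Integer> current, int partitionCount) {
        return !current.equals(desired(partitionCount));
    }

    public static void main(String[] args) {
        Set<Integer> current = Set.of(0, 1, 2);
        System.out.println(needsReassign(current, 3));  // false (nothing changed)
        System.out.println(needsReassign(current, 4));  // true (partition 3 was added)
    }
}
```

Checking on every loop iteration is cheap; polling partitionsFor() against the broker is not, so in practice the metadata check would run on a timer rather than on every poll().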

5. assign() with manual offset commit — exactly-once pattern:

// assign() + manual offset = full control
props.put("group.id", "order-service");
props.put("enable.auto.commit", "false");

TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));

// Load saved offset
long savedOffset = loadOffsetFromDB(partition);
consumer.seek(partition, savedOffset);

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processWithTransaction(record);  // DB write + offset in same transaction
    }
    // Offset committed in same DB transaction as processing
    // Exactly-once semantics!
}

6. Reading from a specific partition for debugging:

// Investigation: partition 2 has higher lag
TopicPartition suspiciousPartition = new TopicPartition("orders", 2);
consumer.assign(List.of(suspiciousPartition));
consumer.seekToBeginning(List.of(suspiciousPartition));

// Read all messages from partition 2
int count = 0;
boolean done = false;
while (!done) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) break;

    for (ConsumerRecord<String, String> record : records) {
        count++;
        System.out.printf("Offset %d: key=%s, value=%s%n",
                         record.offset(), record.key(), record.value());
        if (count >= 10000) { done = true; break; }  // safety limit stops the outer loop too
    }
}
System.out.println("Total messages in partition 2: " + count);

Performance (Production Numbers)

Scenario                                  | Throughput | Latency | Reliability
------------------------------------------|------------|---------|------------
subscribe() (10 consumers, 30 partitions) | 100K msg/s | 10 ms   | High
assign() (1 consumer, 1 partition)        | 10K msg/s  | 5 ms    | Low
assign() (1 consumer, 30 partitions)      | 100K msg/s | 5 ms    | Low
subscribe() (1 consumer, 1 partition)     | 10K msg/s  | 10 ms   | High

Production War Story

Situation: E-commerce, 10 partitions, 10 consumers (subscribe). One consumer started falling behind — lag grew only on partition 7.

Investigation:

# Check which consumer has partition 7
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-service

GROUP           TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
order-service   orders   7          500000          550000          50000  consumer-7-abc123

Root cause: Consumer-7 processed messages slower — it had a slow DB connection.

Debugging with assign():

// assign() to partition 7 for detailed analysis
TopicPartition p7 = new TopicPartition("orders", 7);
consumer.assign(List.of(p7));
consumer.seek(p7, 500000);  // start from lag offset

// Analyze messages
Map<String, Integer> keyDistribution = new HashMap<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) break;

    for (ConsumerRecord<String, String> record : records) {
        String key = record.key();
        keyDistribution.merge(key, 1, Integer::sum);
    }
}

// Discovered: 80% of messages had key "user-42"
// → All messages for user-42 in one partition (hash of key)
// → One user = hot key = single consumer bottleneck

Resolution: Custom partitioner for better key distribution, not relying on default hash.
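
A custom partitioner plugs into the producer via the partitioner.class property and implements org.apache.kafka.clients.producer.Partitioner. The selection logic for spreading one known hot key can be sketched in plain Java; the hot-key name and salt scheme here are illustrative, and note that spreading a key sacrifices per-key ordering for that key:

```java
import java.nio.charset.StandardCharsets;

public class HotKeySpreader {
    // Known hot key from the investigation above (illustrative).
    static final String HOT_KEY = "user-42";

    // salt: any rotating value (e.g. a per-producer counter) that spreads
    // the hot key across partitions. All other keys stay sticky.
    static int selectPartition(String key, int salt, int numPartitions) {
        if (HOT_KEY.equals(key)) {
            return Math.floorMod(salt, numPartitions);
        }
        // Simple hash for other keys (NOT Kafka's murmur2; illustration only)
        int h = 0;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h = 31 * h + b;
        }
        return Math.floorMod(h, numPartitions);
    }

    public static void main(String[] args) {
        // Hot key rotates across partitions as the salt advances...
        System.out.println(selectPartition("user-42", 0, 10));  // 0
        System.out.println(selectPartition("user-42", 7, 10));  // 7
        // ...while a normal key always lands on the same partition.
        System.out.println(selectPartition("user-7", 0, 10) == selectPartition("user-7", 99, 10));  // true
    }
}
```

Inside a real Partitioner, the partition(...) method would obtain numPartitions from the Cluster argument and delegate to logic like this.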

Monitoring (JMX, Prometheus, Burrow)

JMX metrics (consumer):

kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1
  - records-lag-max: max lag across assigned partitions
  - records-lag-avg: average lag
  - bytes-consumed-rate
  - records-consumed-rate

Prometheus:

- alert: KafkaConsumerSinglePartitionLag
  expr: kafka_consumergroup_lag > 50000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Single partition lag > 50K → potential hot key"

- alert: KafkaConsumerPartitionImbalance
  expr: stddev(kafka_consumergroup_lag) by (group) > 10000
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "High variance in partition lag → uneven distribution"

Burrow:

GET /v3/kafka/production/consumer/order-service/status

Shows lag per partition:
  partition 0: lag 100   → OK
  partition 1: lag 200   → OK
  ...
  partition 7: lag 50000 → ERROR  ← hot partition!

Highload Best Practices

✅ subscribe() for production processing
✅ assign() for testing, debugging, replay
✅ ConsumerRebalanceListener for subscribe()
✅ External offset storage for assign()
✅ assign() + seek() for targeted debugging
✅ Monitor lag per partition (hot key detection)
✅ Custom partitioner for even distribution

❌ assign() for production (no fault tolerance)
❌ Mixed subscribe() + assign() (undefined behavior)
❌ assign() without offset management (duplicate processing on restart)
❌ Seek on non-assigned partition (IllegalStateException)
❌ assign() with auto-scaling (no rebalancing)
❌ Ignoring partition lag imbalance (hot key detection)

Architectural Decisions

  1. subscribe() = production default — auto-scaling, fault tolerance, rebalancing
  2. assign() = specialist tool — testing, debugging, replay, migration
  3. assign() + external offset = exactly-once with transactional processing
  4. Hot key detection — partition lag imbalance → partitioner issue
  5. Seek + assign = powerful debugging and data investigation tool

Summary for Senior

  • assign() — direct control over partition assignment, without consumer group management
  • subscribe() — automatic assignment via GroupCoordinator, with rebalancing
  • assign() can use committed offsets if group.id is set
  • assign() does NOT trigger ConsumerRebalanceListener — you are fully responsible
  • Mixed subscribe() + assign() = undefined behavior — don’t do it
  • assign() + seek() = powerful tool for debugging and investigation
  • Hot key detection through partition lag monitoring
  • Production processing: subscribe() — almost always the correct choice
  • assign() — specialist tool for testing, replay, debugging, migration

🎯 Interview Cheat Sheet

Must know:

  • assign() — manual assignment of specific partitions; subscribe() — automatic via consumer group
  • assign() ignores consumer group: no rebalancing, no fault tolerance
  • If a consumer with assign() crashes — its partitions are NOT reassigned, processing stops
  • Seek: read from any offset, beginning, end, or by timestamp
  • assign() can use committed offsets if group.id is set
  • Mixed subscribe() + assign() = undefined behavior — don’t do it
  • Production: subscribe() (fault tolerance); assign() — testing, debugging, replay

Common follow-up questions:

  • Can you commit offsets with assign()? — Yes, if group.id is set, but manually (commitSync).
  • What happens on seek to a non-assigned partition? — IllegalStateException.
  • Does assign() see new partitions when added? — No, needs periodic re-assignment.
  • Does ConsumerRebalanceListener work with assign()? — No, no rebalancing → no callbacks.

Red flags (DO NOT say):

  • “assign() for production processing” — no fault tolerance
  • “assign() automatically rebalances” — no rebalancing at all
  • “subscribe() and assign() can be mixed” — undefined behavior
  • “assign() automatically manages offsets” — you are fully responsible

Related topics:

  • [[5. What is a Consumer Group]]
  • [[6. How does consumer load balancing work in a group]]
  • [[12. What is offset in Kafka]]
  • [[15. What is rebalancing and when does it happen]]