Can you read messages from a specific partition in Kafka?
Junior Level
Simple Definition
Yes, you can! A Kafka consumer can read from one or more specific partitions instead of the whole topic. This is done via the assign() method instead of subscribe().
Analogy
Imagine a restaurant with multiple chefs (partitions).
subscribe() — you sit at a communal table and eat whatever any chef brings (the waiter distributes).
assign() — you choose which chefs you want: “From chef 1 and 3, but not 2.”
Code Example
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// assign() — manual assignment of specific partitions
TopicPartition partition = new TopicPartition("orders", 0); // topic "orders", partition 0
consumer.assign(List.of(partition));
// Reading ONLY from partition 0
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Offset: " + record.offset());
System.out.println("Key: " + record.key());
System.out.println("Value: " + record.value());
}
}
subscribe() vs assign()
// subscribe() — automatic assignment via consumer group
consumer.subscribe(List.of("orders"));
// Kafka itself decides which partitions to give this consumer
// assign() — manual assignment
TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0, p1));
// You decide which partitions to read
When to Use
- Testing — read messages from a specific partition
- Replay — re-read messages from a certain offset
- Debugging — investigate a problematic partition
- Specific tasks — when the logic requires a specific partition
Middle Level
subscribe() vs assign() — Detailed Comparison
| Aspect | subscribe() | assign() |
|---|---|---|
| Consumer group | Full member (group protocol, rebalancing) | No group membership — group.id, if set, is used only for offset commits |
| Rebalancing | Automatic | None — if a consumer with assign() crashes, its partitions are NOT reassigned; processing stops until restart |
| Offset management | Automatic (group-based) | Manual (you handle it yourself) |
| Fault tolerance | High (rebalancing on failure) | Low (manual handling) |
| Scalability | Automatic | Manual |
| Use case | Production processing | Testing, debugging, special tasks |
assign() — How It Works
// 1. Create TopicPartition object
TopicPartition partition = new TopicPartition("orders", 0);
// 2. Assign to consumer
consumer.assign(List.of(partition));
// 3. (Optional) Set offset
consumer.seek(partition, 1000); // from offset 1000
consumer.seekToBeginning(List.of(partition)); // from the beginning
consumer.seekToEnd(List.of(partition)); // from the end
// 4. Read
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Only from partition 0
Seek — Random Access
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
// Read from a specific offset
consumer.seek(partition, 5000);
// Read from the beginning
consumer.seekToBeginning(List.of(partition));
// Read from the end (new messages)
consumer.seekToEnd(List.of(partition));
// Read from a timestamp
long timestamp = System.currentTimeMillis() - 3600_000; // 1 hour ago
Map<TopicPartition, Long> timestamps = Map.of(partition, timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
OffsetAndTimestamp offsetAndTs = offsets.get(partition);
if (offsetAndTs != null) {
consumer.seek(partition, offsetAndTs.offset());
}
Common Errors Table
| Error | Symptoms | Consequences | Solution |
|---|---|---|---|
| assign() + subscribe() together | Conflicting behavior | Unpredictable behavior | Use one or the other |
| Without manual offset management | On restart — reads from start | Duplicate processing | Save offsets externally |
| assign() for production processing | No rebalancing | Consumer failure = data stall | Use subscribe() for production |
| assign() without group.id | commitSync() throws InvalidGroupIdException | Offsets cannot be stored in Kafka | Set group.id if you commit offsets; otherwise track them externally |
| Seek without assign() | IllegalStateException | Exception | assign() first, then seek() |
| Reading non-existent partition | Exception or empty results | Error | Check partition exists |
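The last row's fix can be made concrete. Below is a minimal sketch of validating a partition number before calling assign(); partitionExists is a made-up helper, and with a real consumer the partition count would come from consumer.partitionsFor("orders").size():

```java
public class PartitionCheck {
    // Hypothetical helper: a partition number is valid if it lies in [0, partitionCount).
    // With a real consumer: int partitionCount = consumer.partitionsFor("orders").size();
    static boolean partitionExists(int partition, int partitionCount) {
        return partition >= 0 && partition < partitionCount;
    }

    public static void main(String[] args) {
        int partitionCount = 3; // e.g. topic "orders" with partitions 0, 1, 2
        System.out.println(partitionExists(0, partitionCount)); // in range
        System.out.println(partitionExists(5, partitionCount)); // out of range
        // Only after the check passes:
        // consumer.assign(List.of(new TopicPartition("orders", requested)));
    }
}
```

Validating up front turns a confusing "empty poll forever" into an immediate, explicit error in your own code.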
When assign() is Useful
✅ Testing: check a specific partition
✅ Replay: re-read messages from the beginning
✅ Debugging: investigate a specific partition
✅ Data migration: copy a partition to another cluster
✅ Backfill: process historical data from a specific partition
✅ Audit: check data in a specific partition
When NOT to Use assign()
- Production processing — no fault tolerance
- Auto-scaling — no rebalancing
- Consumer groups — assign() ignores groups
- When load balancing is needed — subscribe() does this automatically
Senior Level
Deep Internals
assign() vs subscribe() — Internal Implementation
// KafkaConsumer internals
subscribe():
consumer.coordinator — GroupCoordinator protocol
consumer.subscriptions — dynamic (changes on rebalance)
consumer.assignment — computed by GroupCoordinator
Flow:
1. consumer.subscribe(topics) → JoinGroup request
2. GroupCoordinator — leader election
3. Leader computes assignment (PartitionAssignor)
4. SyncGroup response → each consumer gets its partitions
5. consumer.onPartitionsAssigned() callback
6. consumer.assignment ← computed assignment
assign():
consumer.coordinator — NOT used
consumer.subscriptions — static (user-defined)
consumer.assignment ← direct user input
Flow:
1. consumer.assign(partitions) → direct assignment
2. No JoinGroup, no coordinator, no rebalancing
3. consumer.assignment ← user-provided partitions
4. consumer.position(partition) → fetch from __consumer_offsets (if group.id set)
OR → beginning (if auto.offset.reset = earliest)
Offset Management with assign()
// assign() does NOT automatically manage offsets
// You must track offsets yourself
// Option 1: Use group.id (offsets in __consumer_offsets)
props.put("group.id", "my-consumer");
consumer.assign(List.of(partition));
// Offsets can still be committed!
consumer.commitSync(); // saves to __consumer_offsets
// Option 2: External offset storage
Map<TopicPartition, Long> savedOffsets = loadOffsetsFromDB();
consumer.assign(List.of(partition));
for (Map.Entry<TopicPartition, Long> entry : savedOffsets.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue());
}
// After processing:
saveOffsetsToDB(partition, currentOffset);
// Option 3: No offset tracking
consumer.assign(List.of(partition));
consumer.seekToBeginning(List.of(partition));
// Each run — reads from the beginning
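Option 2's "external storage" can be sketched with a plain properties file. This is purely an illustration (in production the store is usually a transactional database); the FileOffsetStore class and its "topic-partition" key format are assumptions of this sketch:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical external offset store: one "topic-partition=offset" entry per key.
public class FileOffsetStore {
    private final Path path;

    public FileOffsetStore(Path path) { this.path = path; }

    public void save(String topic, int partition, long offset) throws IOException {
        Properties props = loadAll();
        props.setProperty(topic + "-" + partition, Long.toString(offset));
        try (OutputStream out = Files.newOutputStream(path)) {
            props.store(out, "kafka offsets");
        }
    }

    // Returns the saved offset, or -1 if nothing was stored for this partition.
    public long load(String topic, int partition) throws IOException {
        String v = loadAll().getProperty(topic + "-" + partition);
        return v == null ? -1L : Long.parseLong(v);
    }

    private Properties loadAll() throws IOException {
        Properties props = new Properties();
        if (Files.exists(path)) {
            try (InputStream in = Files.newInputStream(path)) { props.load(in); }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        FileOffsetStore store = new FileOffsetStore(Files.createTempFile("offsets", ".properties"));
        store.save("orders", 0, 5000L);
        System.out.println(store.load("orders", 0)); // 5000
        System.out.println(store.load("orders", 1)); // -1 (nothing saved)
        // Usage with a consumer:
        // consumer.assign(List.of(partition));
        // long saved = store.load("orders", 0);
        // if (saved >= 0) consumer.seek(partition, saved);
    }
}
```

The point of the sketch is the shape of the API: save after processing, load-and-seek on startup.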
ConsumerRebalanceListener — Only for subscribe()
// subscribe() + ConsumerRebalanceListener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Called on rebalancing
consumer.commitSync();
cleanupState(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Called when receiving partitions
initializeState(partitions);
}
});
// assign() — ConsumerRebalanceListener is NOT CALLED
// No rebalancing → no callbacks
// You are fully responsible for the lifecycle
Trade-offs
| Aspect | subscribe() | assign() |
|---|---|---|
| Complexity | Low (automatic) | High (manual management) |
| Fault tolerance | High | Low |
| Control | Low | Maximum |
| Operational overhead | Low | High |
| Suitable for production | Yes | Rarely |
| Suitable for testing | No (can’t choose partition) | Yes |
Edge Cases
1. assign() and consumer group offsets:
// Scenario: group.id is set, assign() is used
props.put("group.id", "order-service");
consumer.assign(List.of(new TopicPartition("orders", 0)));
// On first poll:
// consumer.position(partition) → checks __consumer_offsets
// If committed offset exists → continues from it
// If not → auto.offset.reset applies (earliest/latest)
// This means assign() CAN use committed offsets!
// Commits also work: with group.id set, enable.auto.commit=true (the default)
// auto-commits on poll, or you can call consumer.commitSync() manually
2. Mixed assign() and subscribe() — undefined behavior:
// DON'T DO THIS:
consumer.subscribe(List.of("orders"));
consumer.assign(List.of(new TopicPartition("orders", 0)));
// Result:
// subscribe() registration may be overwritten
// assign() may conflict with group assignment
// Behavior: undefined, depends on Kafka version
3. Seek on a non-assigned partition:
TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0));
// Throws — seek() requires the partition to be assigned first:
consumer.seek(p1, 1000);
// IllegalStateException: No current assignment for partition orders-1
4. assign() with dynamic partition changes:
// assign() — static assignment
// If new partitions are added to the topic — assign() does NOT see them
// Topic "orders" currently has 3 partitions: 0, 1, 2
TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
TopicPartition p2 = new TopicPartition("orders", 2);
consumer.assign(List.of(p0, p1, p2));
// Admin adds partition 3 to topic "orders"
// consumer continues reading only p0, p1, p2
// partition 3 — not assigned, not read
// Resolution: periodic re-assignment
while (running) {
Set<TopicPartition> currentAssignment = consumer.assignment();
Set<TopicPartition> desiredAssignment = getPartitionsForTopic("orders");
if (!currentAssignment.equals(desiredAssignment)) {
consumer.assign(desiredAssignment);
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// process...
}
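The getPartitionsForTopic() helper above is left undefined. Here is a sketch of the set-diff logic it feeds, using plain strings as stand-ins for TopicPartition so the logic is visible on its own; with the real client the desired set would be built from consumer.partitionsFor("orders"):

```java
import java.util.Set;

public class ReassignCheck {
    // Re-assign only when the desired partition set actually changed,
    // to avoid resetting fetch positions on every loop iteration.
    static boolean needsReassign(Set<String> current, Set<String> desired) {
        return !current.equals(desired);
    }

    public static void main(String[] args) {
        Set<String> current = Set.of("orders-0", "orders-1", "orders-2");
        Set<String> afterAdd = Set.of("orders-0", "orders-1", "orders-2", "orders-3");
        System.out.println(needsReassign(current, current));  // false: nothing changed
        System.out.println(needsReassign(current, afterAdd)); // true: partition 3 appeared
        // With the real client:
        // Set<TopicPartition> desired = consumer.partitionsFor("orders").stream()
        //     .map(i -> new TopicPartition(i.topic(), i.partition()))
        //     .collect(Collectors.toSet());
    }
}
```

Guarding the assign() call this way matters because re-assigning unconditionally would discard seek positions each iteration.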
5. assign() with manual offset commit — exactly-once pattern:
// assign() + manual offset = full control
props.put("group.id", "order-service");
props.put("enable.auto.commit", "false");
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
// Load saved offset
long savedOffset = loadOffsetFromDB(partition);
consumer.seek(partition, savedOffset);
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processWithTransaction(record); // DB write + offset in same transaction
}
// Offset committed in same DB transaction as processing
// Exactly-once semantics!
}
6. Reading from a specific partition for debugging:
// Investigation: partition 2 has higher lag
TopicPartition suspiciousPartition = new TopicPartition("orders", 2);
consumer.assign(List.of(suspiciousPartition));
consumer.seekToBeginning(List.of(suspiciousPartition));
// Read all messages from partition 2
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) break;
for (ConsumerRecord<String, String> record : records) {
count++;
System.out.printf("Offset %d: key=%s, value=%s%n",
record.offset(), record.key(), record.value());
}
if (count > 10_000) break; // safety limit; checked here so it exits the outer while loop
}
System.out.println("Total messages in partition 2: " + count);
Performance (Production Numbers)
| Scenario | Throughput | Latency | Reliability |
|---|---|---|---|
| subscribe() (10 consumers, 30 partitions) | 100K msg/s | 10ms | High |
| assign() (1 consumer, 1 partition) | 10K msg/s | 5ms | Low |
| assign() (1 consumer, 30 partitions) | 100K msg/s | 5ms | Low |
| subscribe() (1 consumer, 1 partition) | 10K msg/s | 10ms | High |
Production War Story
Situation: E-commerce, 10 partitions, 10 consumers (subscribe). One consumer started falling behind — lag grew only on partition 7.
Investigation:
# Check which consumer has partition 7
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-service
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
order-service orders 7 500000 550000 50000 consumer-7-abc123
Root cause: Consumer-7 processed messages slower — it had a slow DB connection.
Debugging with assign():
// assign() to partition 7 for detailed analysis
TopicPartition p7 = new TopicPartition("orders", 7);
consumer.assign(List.of(p7));
consumer.seek(p7, 500000); // start from lag offset
// Analyze messages
Map<String, Integer> keyDistribution = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) break;
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
keyDistribution.merge(key, 1, Integer::sum);
}
}
// Discovered: 80% of messages had key "user-42"
// → All messages for user-42 in one partition (hash of key)
// → One user = hot key = single consumer bottleneck
Resolution: Custom partitioner for better key distribution, not relying on default hash.
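One way such a partitioner can handle a known hot key is to fan it out over several partitions with a rotating suffix (note the trade-off: per-key ordering is lost for that key, which must be acceptable). A stdlib-only sketch of the selection logic; the HOT_KEYS set and FANOUT value are assumptions, and a real implementation would live in a class implementing org.apache.kafka.clients.producer.Partitioner:

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class HotKeySpread {
    static final Set<String> HOT_KEYS = Set.of("user-42"); // assumed known hot keys
    static final int FANOUT = 4;                           // spread each hot key over 4 partitions
    static final AtomicLong counter = new AtomicLong();

    // Normal keys: stable hash -> same partition every time (preserves per-key ordering).
    // Hot keys: rotate over FANOUT partitions (breaks per-key ordering!).
    static int choosePartition(String key, int numPartitions) {
        int base = Math.floorMod(key.hashCode(), numPartitions);
        if (HOT_KEYS.contains(key)) {
            long shift = counter.getAndIncrement() % FANOUT;
            return (int) ((base + shift) % numPartitions);
        }
        return base;
    }

    public static void main(String[] args) {
        // A normal key always lands on the same partition:
        System.out.println(choosePartition("user-7", 10) == choosePartition("user-7", 10)); // true
        // The hot key rotates across FANOUT partitions:
        for (int i = 0; i < 4; i++) {
            System.out.println(choosePartition("user-42", 10));
        }
    }
}
```

The consumer side then needs to aggregate across those partitions, which is why this is a last resort after confirming the hot key via the lag analysis above.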
Monitoring (JMX, Prometheus, Burrow)
JMX metrics (consumer):
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1
- records-lag-max: max lag across assigned partitions
- records-lag-avg: average lag
- bytes-consumed-rate
- records-consumed-rate
Prometheus:
- alert: KafkaConsumerSinglePartitionLag
expr: kafka_consumergroup_lag > 50000
for: 5m
labels:
severity: warning
annotations:
summary: "Single partition lag > 50K — potential hot key"
- alert: KafkaConsumerPartitionImbalance
expr: stddev(kafka_consumergroup_lag) by (group) > 10000
for: 10m
labels:
severity: warning
annotations:
summary: "High variance in partition lag — uneven distribution"
Burrow:
GET /v3/kafka/production/consumer/order-service/status
Shows lag per partition:
partition 0: lag 100 OK
partition 1: lag 200 OK
...
partition 7: lag 50000 ERROR ← hot partition!
Highload Best Practices
✅ subscribe() for production processing
✅ assign() for testing, debugging, replay
✅ ConsumerRebalanceListener for subscribe()
✅ External offset storage for assign()
✅ assign() + seek() for targeted debugging
✅ Monitor lag per partition (hot key detection)
✅ Custom partitioner for even distribution
❌ assign() for production (no fault tolerance)
❌ Mixed subscribe() + assign() (undefined behavior)
❌ assign() without offset management (duplicate processing on restart)
❌ Seek on non-assigned partition (IllegalStateException)
❌ assign() with auto-scaling (no rebalancing)
❌ Ignoring partition lag imbalance (hot key detection)
Architectural Decisions
- subscribe() = production default — auto-scaling, fault tolerance, rebalancing
- assign() = specialist tool — testing, debugging, replay, migration
- assign() + external offset = exactly-once with transactional processing
- Hot key detection — partition lag imbalance → partitioner issue
- Seek + assign = powerful debugging and data investigation tool
Summary for Senior
- assign() — direct control over partition assignment, without consumer group management
- subscribe() — automatic assignment via GroupCoordinator, with rebalancing
- assign() can use committed offsets if group.id is set
- assign() does NOT trigger ConsumerRebalanceListener — you are fully responsible
- Mixed subscribe() + assign() = undefined behavior — don’t do it
- assign() + seek() = powerful tool for debugging and investigation
- Hot key detection through partition lag monitoring
- Production processing: subscribe() — almost always the correct choice
- assign() — specialist tool for testing, replay, debugging, migration
🎯 Interview Cheat Sheet
Must know:
- assign() — manual assignment of specific partitions; subscribe() — automatic via consumer group
- assign() ignores the consumer group protocol: no rebalancing, no fault tolerance
- If a consumer with assign() crashes — its partitions are NOT reassigned, processing stops
- Seek: read from any offset, beginning, end, or by timestamp
- assign() can use committed offsets if group.id is set
- Mixed subscribe() + assign() = undefined behavior — don't do it
- Production: subscribe() (fault tolerance); assign() — testing, debugging, replay
Common follow-up questions:
- Can you commit offsets with assign()? — Yes, if group.id is set: commitSync()/commitAsync(), or auto-commit.
- What happens on seek to a non-assigned partition? — IllegalStateException.
- Does assign() see new partitions when added? — No, needs periodic re-assignment.
- Does ConsumerRebalanceListener work with assign()? — No, no rebalancing → no callbacks.
Red flags (DO NOT say):
- “assign() for production processing” — no fault tolerance
- “assign() automatically rebalances” — no rebalancing at all
- “subscribe() and assign() can be mixed” — undefined behavior
- “assign() automatically manages offsets” — you are fully responsible
Related topics:
- [[5. What is a Consumer Group]]
- [[6. How does consumer load balancing work in a group]]
- [[12. What is offset in Kafka]]
- [[15. What is rebalancing and when does it happen]]