How to handle errors when reading messages in Kafka
Junior Level
Simple Definition
Consumer-side error handling is a set of patterns for situations where a consumer cannot process a message: parsing errors, database unavailability, external service timeouts, etc.
Analogy
Imagine a post office. If a letter cannot be delivered (wrong address):
- Try again in an hour (retry)
- If after 3 attempts it still fails — put it in the “dead letters” department (DLQ)
- Never throw away a letter without logging it (log everything)
Basic Processing Template
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
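} catch (Exception e) {
log.error("Error processing record: {}", record, e);
// NOTE: commitSync() below commits the position of the whole batch, so a record
// that failed here is skipped, not re-read. To re-read it, seek() back to
// record.offset() before committing, or hand it to retry/DLQ handling.
}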
}
consumer.commitSync();
}
} catch (WakeupException e) {
log.info("Shutdown requested");
} finally {
consumer.commitSync(); // final commit
consumer.close();
}
private void processRecord(ConsumerRecord<String, String> record) {
String value = record.value();
Order order = parseOrder(value); // may throw exception
saveToDatabase(order); // may throw exception
}
Graceful Shutdown
// In another thread (e.g., shutdown hook):
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
running = false;
consumer.wakeup(); // interrupts poll()
}));
When This Matters
- Any production application — errors are inevitable
- Financial data — transactions cannot be lost
- External service integrations — timeouts, rate limits
- Data pipelines — corrupt data, schema mismatch
Middle Level
Error Handling Strategies
1. Retry with Exponential Backoff
private static final int MAX_RETRIES = 3;
private static final long BASE_DELAY_MS = 1000;
private void processWithRetry(ConsumerRecord<String, String> record) {
int retries = 0;
while (retries < MAX_RETRIES) {
try {
processRecord(record);
return; // success
} catch (RetryableException e) {
retries++;
long delay = BASE_DELAY_MS * (long) Math.pow(2, retries - 1);
// Exponential backoff: 1s, 2s, 4s
log.warn("Retryable error, attempt {}/{}: {}", retries, MAX_RETRIES, e.getMessage());
// ⚠️ ANTI-PATTERN: Thread.sleep() in the consumer loop blocks poll()
// and may cause max.poll.interval.ms exceeded.
// Use async retry or DLQ instead of sleep.
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
throw new MaxRetriesExceededException("Failed after " + MAX_RETRIES + " retries");
}
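The delay schedule used above (1s, 2s, 4s) can be computed separately from the waiting itself, which makes it easy to cap the delay and add jitter. The following is a minimal sketch of one common variant ("equal jitter"), not a Kafka API: `BackoffSketch`, `delayMillis`, and the cap parameter are hypothetical names introduced for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: capped exponential backoff with "equal jitter". */
final class BackoffSketch {
    private BackoffSketch() {}

    /**
     * @param attempt 1-based retry attempt
     * @param baseMs  un-jittered delay for attempt 1
     * @param capMs   upper bound for the un-jittered delay
     * @param u       random value in [0, 1), injected so the result is reproducible
     */
    static long delayMillis(int attempt, long baseMs, long capMs, double u) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        int shift = Math.min(attempt - 1, 20);            // bound the shift to avoid overflow
        long exponential = Math.min(capMs, baseMs << shift);
        long half = exponential / 2;
        // Keep at least half of the delay, randomize the other half.
        return half + (long) (u * half);
    }

    /** Convenience overload that draws the random value itself. */
    static long delayMillis(int attempt, long baseMs, long capMs) {
        return delayMillis(attempt, baseMs, capMs, ThreadLocalRandom.current().nextDouble());
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                    + delayMillis(attempt, 1000, 30_000) + " ms");
        }
    }
}
```

The jitter spreads out retries from many consumers that failed at the same moment. It does not remove the need to keep the waiting itself off the poll thread.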
2. Dead Letter Queue (DLQ)
KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(dlqProps);
private void processWithDLQ(ConsumerRecord<String, String> record) {
try {
processWithRetry(record);
} catch (Exception e) {
// Send to DLQ
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
"orders-dlq",
record.key(),
record.value()
);
// Enrich with error metadata
dlqRecord.headers().add("x-original-topic", record.topic().getBytes());
dlqRecord.headers().add("x-original-partition", String.valueOf(record.partition()).getBytes());
dlqRecord.headers().add("x-original-offset", String.valueOf(record.offset()).getBytes());
dlqRecord.headers().add("x-exception-message", String.valueOf(e.getMessage()).getBytes()); // getMessage() may be null
dlqRecord.headers().add("x-exception-stacktrace", getStackTrace(e).getBytes());
dlqRecord.headers().add("x-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
dlqProducer.send(dlqRecord);
log.error("Sent to DLQ: {}", record, e);
}
}
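The DLQ code above calls a `getStackTrace(e)` helper that is not shown. One possible shape for it, together with a null-safe header encoder, is sketched below. `DlqHeaderSketch`, `headerValue`, `stackTrace`, and the truncation limit are assumptions made for illustration. Truncating is a design choice here, since headers count toward the size of the produced record.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;

/** Hypothetical helpers for building DLQ header values. */
final class DlqHeaderSketch {
    private DlqHeaderSketch() {}

    /** Null-safe header value: a null input becomes the text "null". */
    static byte[] headerValue(String value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    /** Renders the stack trace as text, truncated to at most maxChars characters. */
    static String stackTrace(Throwable error, int maxChars) {
        StringWriter buffer = new StringWriter();
        error.printStackTrace(new PrintWriter(buffer, true));
        String full = buffer.toString();
        return full.length() <= maxChars ? full : full.substring(0, maxChars);
    }

    public static void main(String[] args) {
        Exception failure = new IllegalStateException("boom");
        System.out.println(stackTrace(failure, 200));
        System.out.println(headerValue(failure.getMessage()).length + " bytes");
    }
}
```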
3. Skip and Continue
// For non-critical data — log and skip
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (NonCriticalException e) {
log.warn("Skipping non-critical error: {}", e.getMessage());
// Continue — offset will be committed
} catch (CriticalException e) {
log.error("Critical error, stopping: {}", e.getMessage());
throw e; // Don't commit → retry on restart
}
}
consumer.commitSync();
Common Errors Table
| Error | Symptoms | Consequences | Solution |
|---|---|---|---|
| Infinite retry | Consumer stuck on one message | Consumer lag grows, pipeline is stalled | Max retries + DLQ |
| Commit before processing | Data lost on crash | Silent data loss | Commit AFTER processing |
| No DLQ | Problematic messages are lost | No way to analyze or replay them | Always use a DLQ |
| Thread.sleep in consumer loop | max.poll.interval.ms exceeded | Rebalancing, duplicate processing | Async retry |
| Catch Exception (too broad) | Hides critical errors | Unpredictable behavior | Catch specific exceptions |
| Processing in the same thread | One failing record blocks the entire partition | Throughput = 0 | Separate error handling thread |
Error Classification
| Error Type | Example | Retry? | DLQ? |
|---|---|---|---|
| Transient | DB timeout, network glitch | Yes (3–5×) | If retries exhausted |
| Poison pill | Corrupt data, schema mismatch | No | Immediately to DLQ |
| External dependency | API down, rate limit | Yes (backoff) | If retries exhausted |
| Business logic | Invalid order amount | No | To DLQ with metadata |
| Fatal | OOM, disk full | No | Alert + stop |
When NOT to Use Retry
- Poison pill — corrupt data, schema violations (will never be fixed by retry)
- Business rule violations — negative amount, invalid status (need manual review)
- Authentication/authorization errors — credential issues won’t be resolved by retry
- Rate limits from an external API are the exception in this list: retry with backoff, but cap the number of attempts
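The classification table and the list above boil down to one decision per exception: retry, dead-letter, or stop. A minimal sketch of such a classifier follows. The class name, the `Action` enum, and the choice of which JDK exceptions count as transient are assumptions for illustration; a real service would use its own exception types.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeoutException;

/** Hypothetical classifier: maps a failure to retry, DLQ, or stop. */
final class ErrorClassifierSketch {
    enum Action { RETRY, DEAD_LETTER, STOP }

    private ErrorClassifierSketch() {}

    /** Assumed allowlist of transient failures. */
    private static boolean isTransient(Throwable t) {
        return t instanceof TimeoutException
                || t instanceof SocketTimeoutException
                || t instanceof ConnectException;
    }

    /** Walks the cause chain, because transient errors are often wrapped. */
    static Action classify(Throwable error) {
        Throwable t = error;
        for (int depth = 0; t != null && depth < 10; depth++, t = t.getCause()) {
            if (t instanceof Error) {
                return Action.STOP;            // e.g. OutOfMemoryError: alert and stop
            }
            if (isTransient(t)) {
                return Action.RETRY;           // bounded retry with backoff
            }
        }
        return Action.DEAD_LETTER;             // poison pill, business rule, auth failure
    }

    public static void main(String[] args) {
        System.out.println(classify(new TimeoutException("db timeout")));
        System.out.println(classify(new IllegalArgumentException("negative amount")));
        System.out.println(classify(new OutOfMemoryError()));
    }
}
```

Defaulting unknown exceptions to the DLQ instead of to retry is deliberate in this sketch: an unclassified error then cannot turn into an infinite retry loop.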
Senior Level
Possible Follow-up Questions from the Interviewer
- “What if the DLQ producer also fails?” — retry limit → manual-review queue
- “How to handle poison pill in Kafka Streams?” — Quarantine Transformer
- “What if a message cannot be deserialized?” — use bytes deserializer → DLQ
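The last answer above relies on receiving the payload as raw bytes (for example with Kafka's `ByteArrayDeserializer`) and parsing it in application code, so that a malformed message cannot fail inside `poll()`. A minimal sketch of the parsing side is shown below. The `orderId,amountInCents` payload format and all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical parser that never throws on a malformed payload. */
final class SafeParseSketch {
    /** Illustrative payload type; the format "orderId,amountInCents" is an assumption. */
    static final class Order {
        final String id;
        final long amountCents;

        Order(String id, long amountCents) {
            this.id = id;
            this.amountCents = amountCents;
        }
    }

    private SafeParseSketch() {}

    /** Returns the parsed order, or empty when the payload should go to the DLQ. */
    static Optional<Order> tryParse(byte[] payload) {
        if (payload == null) {
            return Optional.empty();
        }
        String text = new String(payload, StandardCharsets.UTF_8);
        String[] parts = text.split(",", -1);
        if (parts.length != 2 || parts[0].isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Order(parts[0], Long.parseLong(parts[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] good = "o-1,250".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "{broken".getBytes(StandardCharsets.UTF_8);
        System.out.println(tryParse(good).isPresent());
        System.out.println(tryParse(bad).isPresent());
    }
}
```

When the result is empty, the original bytes can be forwarded to the DLQ unchanged, which keeps the evidence intact for later analysis.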
Deep Internals
Error Handling in Spring Kafka
@KafkaListener(topics = "orders", groupId = "order-service")
public void listen(ConsumerRecord<String, String> record) {
processOrder(record.value());
}
// DefaultErrorHandler configuration
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, String> template) {
// FixedBackOff or ExponentialBackOff
var backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(60000L); // max 60s total retry
backOff.setMaxAttempts(5);
// DLQ producer
var dlqSender = new DeadLetterPublishingRecoverer(template);
var handler = new DefaultErrorHandler(dlqSender, backOff);
// Retryable exceptions: retry only what is listed here,
// everything else goes straight to the recoverer (DLQ)
handler.defaultFalse();
handler.addRetryableExceptions(RetriableException.class);
return handler;
}
Error Handling in Kafka Streams
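StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");
// Typed Processor API (Kafka Streams 3.3+): process() returns the forwarded records as a stream
KStream<String, String> results = stream.process(() -> new ContextualProcessor<String, String, String, String>() {
@Override
public void process(Record<String, String> record) {
try {
processRecord(record);
context().forward(record);
} catch (Exception e) {
// Kafka Streams has no built-in DLQ
// Tag the failed record so it can be routed to a side output downstream
context().forward(record.withValue("ERROR: " + e.getMessage()));
}
}
});
// Side output: tagged records go to the error topic, the rest continue.
// "orders-processed" is a placeholder name for the regular output topic.
results.filter((key, value) -> value != null && value.startsWith("ERROR: ")).to("error-topic");
results.filterNot((key, value) -> value != null && value.startsWith("ERROR: ")).to("orders-processed");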
ConsumerRebalanceListener + Error Handling
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// On rebalance: commit current offset
// If there is in-flight processing — wait for completion
awaitInFlightProcessing();
consumer.commitSync();
cleanupState(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// On receiving partitions: load state
initializeState(partitions);
}
});
Trade-offs
| Strategy | Data Safety | Throughput | Complexity | When to Use |
|---|---|---|---|---|
| Retry + commit after | High | Low | Medium | Critical data |
| Retry + async DLQ | High | Medium | High | Production standard |
| Skip + log | Low | High | Low | Metrics, logs |
| Block until success | Maximum | 0 on error | Low | Single-record processing |
| Side output (Streams) | High | High | Medium | Kafka Streams apps |
Edge Cases
1. Poison Pill — a message that always fails:
Scenario:
Partition 0: offset 1000 — corrupt JSON
Consumer reads offset 1000 → parse error → retry → error → retry...
Consumer loop stuck on one message infinitely
All subsequent messages (1001, 1002, ...) are NOT processed
Detection:
Committed offset for partition 0 stops advancing while lag keeps growing
Log shows repeating error at offset 1000
Resolution:
1. DLQ after max retries
2. Seek past the poison pill:
consumer.seek(new TopicPartition("orders", 0), 1001);
3. Analyze the DLQ message and fix the producer schema
2. max.poll.interval.ms exceeded during retry:
// PROBLEM:
while (retries < MAX_RETRIES) {
try {
process(record);
break;
} catch (Exception e) {
Thread.sleep(10000); // 10s sleep × 5 retries = 50s
retries++;
}
}
// If max.poll.interval.ms = 300000 (5 min) — OK
// If max.poll.interval.ms = 30000 (30s) — consumer is kicked out!
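// SOLUTION: async retry
// KafkaConsumer is not thread-safe: the callback below runs on another thread,
// so commitOffset() must hand the offset to the poll thread (e.g. via a queue)
// instead of calling consumer.commitSync() directly.
// Pause the partition while the retry is in flight, otherwise later records
// can be committed past this one.
CompletableFuture.runAsync(() -> {
retryWithBackoff(record);
}).whenComplete((result, ex) -> {
if (ex != null) {
sendToDLQ(record, ex);
}
commitOffset(record); // success or DLQ: either way the record is handled
});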
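The timing comparison in the PROBLEM comments above is simple arithmetic, and it is worth checking against the configured `max.poll.interval.ms` before choosing retry parameters. A minimal sketch with hypothetical names, assuming a fixed sleep after every failed attempt:

```java
/** Hypothetical helper: does a synchronous retry loop fit into max.poll.interval.ms? */
final class PollBudgetSketch {
    private PollBudgetSketch() {}

    /** Worst case for ONE record: every attempt fails and is followed by a sleep. */
    static long worstCaseBlockMs(int retries, long sleepMs, long processingMs) {
        return (long) retries * (sleepMs + processingMs);
    }

    static boolean exceedsPollInterval(long blockMs, long maxPollIntervalMs) {
        return blockMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long block = worstCaseBlockMs(5, 10_000, 0);   // 5 retries x 10s sleep
        System.out.println("worst case block: " + block + " ms");
        System.out.println("exceeds 5 min limit: " + exceedsPollInterval(block, 300_000));
        System.out.println("exceeds 30 s limit: " + exceedsPollInterval(block, 30_000));
    }
}
```

This covers a single failing record. If several records in the same `poll()` batch fail, their blocking times add up, so the real budget per record is smaller.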
3. Partial Batch Failure:
Scenario:
Consumer got a batch of 500 records
Records 1–499 processed successfully
Record 500 causes OutOfMemoryError
Consumer crashes → no offsets from the batch are committed
On restart:
All 500 records will be read again
Records 1–499 → duplicate processing!
Resolution:
1. Idempotent processing (upsert, dedup keys)
2. Per-record commit (rare, high overhead)
3. Commit at safe points (every 100 records)
4. Transactional processing
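The first resolution, idempotent processing with dedup keys, can be sketched as follows. This is an in-memory illustration only: `IdempotentProcessorSketch` and `processOnce` are hypothetical names, and a production version would keep the keys in durable storage and update them atomically with the side effect (for example a unique constraint or an upsert in the same database transaction).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory dedup: each (topic, partition, offset) is applied at most once. */
final class IdempotentProcessorSketch {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Returns true if the side effect ran now, false if the record was a duplicate. */
    boolean processOnce(String topic, int partition, long offset, Runnable sideEffect) {
        String key = topic + "-" + partition + "@" + offset;
        if (!processedKeys.add(key)) {
            return false;                      // already handled: skip the duplicate
        }
        try {
            sideEffect.run();
            return true;
        } catch (RuntimeException e) {
            processedKeys.remove(key);         // failed: allow a later retry
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentProcessorSketch processor = new IdempotentProcessorSketch();
        Runnable save = () -> System.out.println("saved order");
        System.out.println(processor.processOnce("orders", 0, 42L, save));
        System.out.println(processor.processOnce("orders", 0, 42L, save));
    }
}
```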
4. Error Handling with Manual Partition Assignment:
// With assign() instead of subscribe() — no rebalance
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
consumer.seek(partition, lastProcessedOffset + 1); // resume after the last processed record
// Error handling is simpler — no rebalance interruption
// BUT: you must manually handle:
// - Offset tracking
// - Partition management
// - Consumer lifecycle
5. Concurrent Error Handling:
// Multi-threaded processing with error handling
// Note: records of one partition may complete out of order in the pool
ExecutorService executor = Executors.newFixedThreadPool(10);
Map<TopicPartition, Long> currentOffsets = new ConcurrentHashMap<>();
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<Future<?>> inFlight = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
inFlight.add(executor.submit(() -> {
try {
processRecord(record);
} catch (Exception e) {
sendToDLQ(record, e); // handled via DLQ, so the offset may still advance
}
currentOffsets.merge(
new TopicPartition(record.topic(), record.partition()),
record.offset() + 1,
Math::max
);
}));
}
// Wait for the whole batch before committing: committing the max offset while
// earlier records are still in flight would lose them on a crash.
for (Future<?> f : inFlight) {
f.get(); // checked exceptions omitted for brevity
}
// Commit max offsets
consumer.commitSync(buildOffsetMap(currentOffsets));
}
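Committing the highest finished offset is only safe when every lower offset has finished as well. An alternative to waiting for a whole batch is to track completions per partition and advance the commit position only over a gap-free prefix. The sketch below is a hypothetical helper, not a Kafka class, and it assumes the offsets handed to it are dense; on topics where offsets have gaps (compaction, transaction markers) the position in the polled batch would have to be tracked instead.

```java
import java.util.TreeSet;

/** Hypothetical tracker for ONE partition: commit only over a gap-free prefix. */
final class ContiguousOffsetTracker {
    private final TreeSet<Long> completed = new TreeSet<>();
    private long nextToCommit;

    ContiguousOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Called by worker threads when a record is processed or sent to the DLQ. */
    synchronized void markDone(long offset) {
        if (offset >= nextToCommit) {
            completed.add(offset);
        }
        // Advance while the next expected offset has completed.
        while (!completed.isEmpty() && completed.first() == nextToCommit) {
            completed.pollFirst();
            nextToCommit++;
        }
    }

    /** Offset that is safe to commit: everything below it has completed. */
    synchronized long committableOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(100);
        tracker.markDone(102);
        System.out.println(tracker.committableOffset());
        tracker.markDone(100);
        tracker.markDone(101);
        System.out.println(tracker.committableOffset());
    }
}
```

The poll thread would read `committableOffset()` for each partition and pass it to `commitSync`, so the consumer itself is still only touched from one thread.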
Performance (Production Numbers)
| Strategy | Throughput | P99 Latency | Error Overhead | CPU Impact |
|---|---|---|---|---|
| No error handling | 100K msg/s | 5ms | 0% | baseline |
| Try-catch + skip | 98K msg/s | 5ms | +2% | +1% |
| Retry (3x) + DLQ | 85K msg/s | 15ms | +15% | +5% |
| Retry (5x) + async DLQ | 80K msg/s | 20ms | +20% | +8% |
| Per-record commit | 50K msg/s | 50ms | +100% | +20% |
Production War Story
Situation: Logistics company, processing GPS tracks from 50K couriers. 200K events/min.
Problem: Every 2-3 hours, one courier sends corrupt GPS data (null lat/lon). Consumer crashes with NullPointerException, restarts → crashes again on the same message. Consumer lag grows at 50K/min during downtime.
Investigation:
- Error handling: catch-all + retry without DLQ
- Retry logic: infinite retry on all exceptions
- Poison pill effect: one corrupt record blocks the partition
- 30 min downtime × 50K events/min = 1.5M backlog
- Rebalancing on each crash → cascade failures
Resolution:
// 1. Retry only for transient errors
// 2. DLQ for all others
DefaultErrorHandler handler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(1000L, 3) // 3 retries, 1s apart
);
handler.defaultFalse(); // exceptions not listed below skip retries and go to the DLQ
handler.addRetryableExceptions(
TimeoutException.class,
ConnectException.class,
SocketException.class
);
// 3. Alert on DLQ messages
// 4. DLQ consumer for manual review and replay
// 5. Schema validation on the producer side (prevention)
Result: 0 downtime for 12 months. DLQ avg 2 messages/day (all corrupt GPS).
Monitoring (JMX, Prometheus, Burrow)
JMX metrics:
kafka.consumer:type=consumer-coordinator-metrics,client-id=consumer-1
- failed-rebalance-rate-per-hour
- last-heartbeat-seconds-ago
- commit-latency-avg
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1
- records-lag-max (max lag across partitions)
- fetch-latency-avg
Spring Kafka / Application:
app.kafka.consumer.error.rate
app.kafka.consumer.dlq.size
app.kafka.consumer.retry.count
app.kafka.consumer.processing.latency.p99
Prometheus + Alertmanager:
- alert: KafkaConsumerErrorRateHigh
expr: rate(kafka_consumer_error_total[5m]) / rate(kafka_consumer_poll_total[5m]) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer error rate > 5%"
- alert: KafkaDLQMessagesReceived
expr: increase(kafka_dlq_messages_total[1h]) > 0
for: 1m
labels:
severity: warning
annotations:
summary: "Messages sent to DLQ in the last hour"
- alert: KafkaConsumerLagGrowing
expr: deriv(kafka_consumer_lag[5m]) > 1000  # lag is a gauge: use deriv(), not rate()
for: 10m
labels:
severity: critical
annotations:
summary: "Consumer lag growing rapidly — processing bottleneck"
- alert: KafkaConsumerRebalanceStorm
expr: rate(kafka_consumer_rebalance_total[10m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "Frequent rebalancing detected"
Burrow — consumer lag monitoring with status assessment:
GET /v3/kafka/{cluster}/consumer/{group}/status
The "status" field of the response takes values such as:
"ERR": lag growing, processing stalled
"WARN": lag stable but high
"OK": lag decreasing or zero
Highload Best Practices
✅ Retry only for transient errors (define an allowlist)
✅ Max retries (3–5) + DLQ for non-retryable
✅ DLQ with enriched metadata (original topic, offset, exception)
✅ Async error handling for long retries (don't block poll loop)
✅ Exponential backoff with jitter (prevent thundering herd)
✅ Idempotent processing (protect against duplicates on retry)
✅ Alert on DLQ messages (never ignore DLQ)
✅ Graceful shutdown with committing in-flight offsets
✅ Schema validation on the producer side (prevent poison pills)
✅ Separate error handling thread (don't block main processing)
❌ Retry on all exceptions (infinite loop risk)
❌ Retry in main consumer thread (max.poll.interval exceeded)
❌ Without DLQ (lost poison pills)
❌ Commit before processing (data loss on failure)
❌ Without alerting on errors (silent degradation)
❌ Without idempotent processing (duplicates on retry)
Architectural Decisions
- Error classification — transient vs permanent, determines retry strategy
- DLQ enrichment — metadata for debugging and replay capability
- Async error handling — don’t block poll loop, avoid rebalancing
- Idempotent processing — fundamental for safe retry
- Prevention > cure — schema validation on the producer side
Summary for Senior
- Error handling is a critical component of production Kafka systems
- Poison pill — the most frequent cause of consumer stall
- DLQ — mandatory for poison pill isolation and later analysis
- Retry only transient errors — permanent errors go to DLQ immediately
- max.poll.interval.ms — limit on total processing + retry time
- Async error handling for long retries — avoid blocking the poll loop
- Idempotent processing — prerequisite for safe retry
- Monitoring: error rate, DLQ size, consumer lag — all need alerts
- Prevention: schema validation, producer-side checks — reduce errors at source
🎯 Interview Cheat Sheet
Must know:
- Retry only for transient errors (timeout, network); poison pill → immediately to DLQ
- DLQ (Dead Letter Queue) — isolator for problematic messages after max retries
- Thread.sleep in the consumer loop is an anti-pattern: max.poll.interval exceeded → rebalancing
- Async error handling — don’t block the poll loop, avoid rebalancing
- Idempotent processing — prerequisite for safe retry (protects against duplicates)
- Classification: Transient (retry), Poison pill (DLQ), External dependency (retry + DLQ), Fatal (stop)
- ConsumerRebalanceListener: commitSync in onPartitionsRevoked, await in-flight processing
Common follow-up questions:
- What is a poison pill? — A message that always fails (corrupt data); retry is infinite.
- Why is Thread.sleep bad? — It blocks poll() → max.poll.interval exceeded → rebalancing.
- When NOT to use retry? — Poison pill, business rule violations, auth errors.
- How to handle partial batch failure? — Idempotent processing + commit at safe points.
Red flags (DO NOT say):
- “Retry on all exceptions” — poison pill = infinite loop
- “Commit before processing is standard practice” — data loss on crash
- “DLQ is not needed if retry works” — poison pill bypasses retry
- “Thread.sleep in the consumer loop is normal” — causes rebalancing
Related topics:
- [[25. What is DLQ (Dead Letter Queue)]]
- [[13. How does offset commit work]]
- [[15. What is rebalancing and when does it happen]]
- [[10. What is the difference between at-most-once, at-least-once and exactly-once]]