Question 24 · Section 15

How to handle errors when reading messages in Kafka

Junior Level

Simple Definition

Error handling on the consumer side is a set of patterns for correctly dealing with situations where a consumer cannot process a message: parsing errors, database unavailability, external service timeouts, etc.

Analogy

Imagine a post office. If a letter cannot be delivered (wrong address):

  1. Try again in an hour (retry)
  2. If after 3 attempts it still fails — put it in the “dead letters” department (DLQ)
  3. Never throw away a letter without logging it (log everything)

Basic Processing Template

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
            } catch (Exception e) {
                log.error("Error processing record: {}", record, e);
                // Do NOT commit offset → message will be read again
            }
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    log.info("Shutdown requested");
} finally {
    consumer.commitSync();  // final commit
    consumer.close();
}

private void processRecord(ConsumerRecord<String, String> record) {
    String value = record.value();
    Order order = parseOrder(value);  // may throw exception
    saveToDatabase(order);            // may throw exception
}

Graceful Shutdown

// In another thread (e.g., shutdown hook):
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running = false;
    consumer.wakeup();  // interrupts poll()
}));

When This Matters

  • Any production application — errors are inevitable
  • Financial data — transactions cannot be lost
  • External service integrations — timeouts, rate limits
  • Data pipelines — corrupt data, schema mismatch

Middle Level

Error Handling Strategies

1. Retry with Exponential Backoff

private static final int MAX_RETRIES = 3;
private static final long BASE_DELAY_MS = 1000;

private void processWithRetry(ConsumerRecord<String, String> record) {
    int retries = 0;
    while (retries < MAX_RETRIES) {
        try {
            processRecord(record);
            return;  // success
        } catch (RetryableException e) {
            retries++;
            long delay = BASE_DELAY_MS * (long) Math.pow(2, retries - 1);
            // Exponential backoff: 1s, 2s, 4s
            log.warn("Retryable error, attempt {}/{}: {}", retries, MAX_RETRIES, e.getMessage());
            // ⚠️ ANTI-PATTERN: Thread.sleep() in the consumer loop blocks poll()
            // and may cause max.poll.interval.ms exceeded.
            // Use async retry or DLQ instead of sleep.
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }
    throw new MaxRetriesExceededException("Failed after " + MAX_RETRIES + " retries");
}

2. Dead Letter Queue (DLQ)

KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(dlqProps);

private void processWithDLQ(ConsumerRecord<String, String> record) {
    try {
        processWithRetry(record);
    } catch (Exception e) {
        // Send to DLQ
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            "orders-dlq",
            record.key(),
            record.value()
        );
        // Enrich with error metadata
        dlqRecord.headers().add("x-original-topic", record.topic().getBytes());
        dlqRecord.headers().add("x-original-partition", String.valueOf(record.partition()).getBytes());
        dlqRecord.headers().add("x-original-offset", String.valueOf(record.offset()).getBytes());
        dlqRecord.headers().add("x-exception-message", e.getMessage().getBytes());
        dlqRecord.headers().add("x-exception-stacktrace", getStackTrace(e).getBytes());
        dlqRecord.headers().add("x-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        dlqProducer.send(dlqRecord);
        log.error("Sent to DLQ: {}", record, e);
    }
}
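The getStackTrace(e) helper used above is not part of the JDK; a minimal version built on StringWriter:

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class StackTraces {

    // Render a Throwable's full stack trace as a String,
    // suitable for the x-exception-stacktrace DLQ header
    public static String getStackTrace(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw, true));
        return sw.toString();
    }
}
```

(Apache Commons Lang's ExceptionUtils.getStackTrace does the same if that library is already on the classpath.)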

3. Skip and Continue

// For non-critical data — log and skip
for (ConsumerRecord<String, String> record : records) {
    try {
        processRecord(record);
    } catch (NonCriticalException e) {
        log.warn("Skipping non-critical error: {}", e.getMessage());
        // Continue — offset will be committed
    } catch (CriticalException e) {
        log.error("Critical error, stopping: {}", e.getMessage());
        throw e;  // Don't commit → retry on restart
    }
}
consumer.commitSync();

Common Errors Table

Error | Symptoms | Consequences | Solution
Infinite retry | Consumer stuck on one message | Consumer lag grows, pipeline stalled | Max retries + DLQ
Commit before processing | Data lost on crash | Silent data loss | Commit AFTER processing
No DLQ | Problematic messages are lost | No ability to analyze | Always use DLQ
Thread.sleep in consumer loop | max.poll.interval.ms exceeded | Rebalancing, duplicate processing | Async retry
Catch Exception (too broad) | Hides critical errors | Unpredictable behavior | Catch specific exceptions
Processing in the same thread | One failing record blocks the entire partition | Throughput = 0 | Separate error handling thread

Error Classification

Error Type | Example | Retry? | DLQ?
Transient | DB timeout, network glitch | Yes (3–5×) | If retries exhausted
Poison pill | Corrupt data, schema mismatch | No | Immediately to DLQ
External dependency | API down, rate limit | Yes (backoff) | If retries exhausted
Business logic | Invalid order amount | No | To DLQ with metadata
Fatal | OOM, disk full | No | Alert + stop

When NOT to Use Retry

  • Poison pill — corrupt data, schema violations (will never be fixed by retry)
  • Business rule violations — negative amount, invalid status (need manual review)
  • Authentication/authorization errors — credential issues won’t be resolved by retry
  • Rate limit from external API — retry with backoff, but not indefinitely
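This allowlist idea can be made explicit in code; the exception set below is illustrative, not exhaustive:

```java
import java.net.ConnectException;
import java.net.SocketException;
import java.util.Set;
import java.util.concurrent.TimeoutException;

public class RetryPolicy {

    // Allowlist: only known-transient errors are worth retrying.
    // Everything else (poison pill, business rule, auth) goes straight to the DLQ.
    private static final Set<Class<? extends Throwable>> RETRYABLE = Set.of(
        TimeoutException.class,
        ConnectException.class,
        SocketException.class
    );

    public static boolean isRetryable(Throwable t) {
        // Walk the cause chain: transient errors are often wrapped
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (RETRYABLE.contains(cur.getClass())) {
                return true;
            }
        }
        return false;
    }
}
```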

Senior Level

Possible Follow-up Questions from the Interviewer

  • “What if the DLQ producer also fails?” — retry limit → manual-review queue
  • “How to handle poison pill in Kafka Streams?” — Quarantine Transformer
  • “What if a message cannot be deserialized?” — use bytes deserializer → DLQ

Deep Internals

Error Handling in Spring Kafka

@KafkaListener(topics = "orders", groupId = "order-service")
public void listen(ConsumerRecord<String, String> record) {
    processOrder(record.value());
}

// DefaultErrorHandler configuration
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, String> template) {
    // DLQ producer: by default publishes failed records to "<topic>.DLT"
    var dlqSender = new DeadLetterPublishingRecoverer(template);

    // Exponential backoff: 1s, 2s, 4s, 8s, 16s, then recover (send to DLQ)
    var backOff = new ExponentialBackOffWithMaxRetries(5);
    backOff.setInitialInterval(1000L);
    backOff.setMultiplier(2.0);
    backOff.setMaxInterval(16000L);

    var handler = new DefaultErrorHandler(dlqSender, backOff);
    // Retry only transient errors; everything else goes straight to the DLQ
    handler.addRetryableExceptions(RetriableException.class);
    return handler;
}
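Spring Kafka 2.7+ also offers a declarative, non-blocking alternative: @RetryableTopic re-publishes failures to intermediate retry topics instead of blocking the consumer thread, and @DltHandler receives what ultimately fails (a sketch; topic names follow the default suffix conventions):

```java
@RetryableTopic(
    attempts = "4",                                   // 1 original try + 3 retries
    backoff = @Backoff(delay = 1000, multiplier = 2.0)
)
@KafkaListener(topics = "orders", groupId = "order-service")
public void listen(String value) {
    processOrder(value);  // on failure the record moves to orders-retry-* topics
}

@DltHandler
public void handleDlt(String value) {
    log.error("Message reached the DLT: {}", value);  // alert + manual review
}
```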

Error Handling in Kafka Streams

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");

// Kafka Streams has no built-in DLQ for processing errors.
// A common pattern: catch, mark the failure, and branch it to a side topic.
// (Deserialization errors are handled separately via
// default.deserialization.exception.handler, e.g. LogAndContinueExceptionHandler.)
Map<String, KStream<String, String>> branches = stream
    .mapValues(value -> {
        try {
            return process(value);              // may throw
        } catch (Exception e) {
            return "ERROR: " + e.getMessage();  // error marker
        }
    })
    .split(Named.as("out-"))
    .branch((key, value) -> value.startsWith("ERROR: "), Branched.as("error"))
    .defaultBranch(Branched.as("ok"));

branches.get("out-error").to("orders-dlq");     // side output acts as the DLQ
branches.get("out-ok").to("orders-processed");

ConsumerRebalanceListener + Error Handling

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // On rebalance: commit current offset
        // If there is in-flight processing — wait for completion
        awaitInFlightProcessing();
        consumer.commitSync();
        cleanupState(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // On receiving partitions: load state
        initializeState(partitions);
    }
});

Trade-offs

Strategy | Data Safety | Throughput | Complexity | When to Use
Retry + commit after | High | Low | Medium | Critical data
Retry + async DLQ | High | Medium | High | Production standard
Skip + log | Low | High | Low | Metrics, logs
Block until success | Maximum | 0 on error | Low | Single-record processing
Side output (Streams) | High | High | Medium | Kafka Streams apps

Edge Cases

1. Poison Pill — a message that always fails:

Scenario:
  Partition 0: offset 1000 — corrupt JSON
  Consumer reads offset 1000 → parse error → retry → error → retry...
  Consumer loop stuck on one message infinitely
  All subsequent messages (1001, 1002, ...) are NOT processed

Detection:
  Consumer lag for partition 0 doesn't change
  Log shows repeating error at offset 1000

Resolution:
  1. DLQ after max retries
  2. Seek past the poison pill:
     consumer.seek(new TopicPartition("orders", 0), 1001);
  3. Analyze the DLQ message and fix the producer schema
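The detection step can be automated by counting consecutive failures per offset; a small bookkeeping sketch (class name and threshold are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class PoisonPillDetector {

    private final int maxFailures;
    private final Map<String, Integer> failureCounts = new HashMap<>();

    public PoisonPillDetector(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    // Record a processing failure for (topic, partition, offset).
    // Returns true once the same record has failed maxFailures times:
    // the caller should then DLQ it and seek(partition, offset + 1).
    public boolean recordFailure(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "@" + offset;
        return failureCounts.merge(key, 1, Integer::sum) >= maxFailures;
    }
}
```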

2. max.poll.interval.ms exceeded during retry:

// PROBLEM:
while (retries < MAX_RETRIES) {
    try {
        process(record);
        break;
    } catch (Exception e) {
        Thread.sleep(10000);  // 10s sleep × 5 retries = 50s
        retries++;
    }
}
// If max.poll.interval.ms = 300000 (5 min) — OK
// If max.poll.interval.ms = 30000 (30s) — consumer is kicked out!

// SOLUTION: async retry (commit only after the future completes)
// Note: KafkaConsumer is NOT thread-safe. Pause the partition while the
// record retries off-thread, and hand completed offsets back to the poll
// thread (e.g. via a queue) instead of touching the consumer from the callback.
CompletableFuture.runAsync(() -> retryWithBackoff(record))
    .whenComplete((result, ex) -> {
        if (ex == null) {
            commitOffset(record);      // enqueue offset for the poll thread
        } else {
            sendToDLQ(record, ex);
            commitOffset(record);      // DLQ handled → safe to move past
        }
    });

3. Partial Batch Failure:

Scenario:
  Consumer got a batch of 500 records
  Records 1–499 processed successfully
  Record 500 (the last one) causes OutOfMemoryError
  Consumer crashes → no offsets from the batch are committed

On restart:
  All 500 records will be read again
  Records 1–499 → duplicate processing!

Resolution:
  1. Idempotent processing (upsert, dedup keys)
  2. Per-record commit (rare, high overhead)
  3. Commit at safe points (every 100 records)
  4. Transactional processing
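Option 3 (commit at safe points) bounds the duplicate window to N records on a crash. The bookkeeping is trivial; a sketch (class name is made up for illustration):

```java
public class SafePointTracker {

    private final int interval;
    private int processedSinceCommit = 0;

    public SafePointTracker(int interval) {
        this.interval = interval;
    }

    // Call after each successfully processed record.
    // Returns true when a commit is due: at most `interval` records
    // can be re-processed as duplicates after a crash.
    public boolean recordProcessed() {
        if (++processedSinceCommit >= interval) {
            processedSinceCommit = 0;
            return true;
        }
        return false;
    }
}
```

In the consumer loop: if (tracker.recordProcessed()) consumer.commitSync(); plus one final commitSync() after the batch to cover the tail.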

4. Error Handling with Manual Partition Assignment:

// With assign() instead of subscribe() — no rebalance
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
consumer.seek(partition, lastProcessedOffset);

// Error handling is simpler — no rebalance interruption
// BUT: you must manually handle:
//  - Offset tracking
//  - Partition management
//  - Consumer lifecycle

5. Concurrent Error Handling:

// Multi-threaded processing with error handling
ExecutorService executor = Executors.newFixedThreadPool(10);
Map<TopicPartition, Long> currentOffsets = new ConcurrentHashMap<>();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        executor.submit(() -> {
            try {
                processRecord(record);
                currentOffsets.compute(
                    new TopicPartition(record.topic(), record.partition()),
                    (tp, offset) -> Math.max(offset == null ? 0L : offset, record.offset() + 1)
                );
            } catch (Exception e) {
                sendToDLQ(record, e);
                // Still commit — DLQ handled
                currentOffsets.compute(
                    new TopicPartition(record.topic(), record.partition()),
                    (tp, offset) -> Math.max(offset == null ? 0L : offset, record.offset() + 1)
                );
            }
        });
    }

    // Commit max offsets
    consumer.commitSync(buildOffsetMap(currentOffsets));
}
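The buildOffsetMap helper referenced above is not defined anywhere; a sketch of what it needs to do, using the kafka-clients types already present in this example:

```java
private Map<TopicPartition, OffsetAndMetadata> buildOffsetMap(
        Map<TopicPartition, Long> currentOffsets) {
    Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
    // A committed offset is the NEXT offset to read, which is why the
    // loop above stores record.offset() + 1 per partition
    currentOffsets.forEach((tp, next) -> result.put(tp, new OffsetAndMetadata(next)));
    return result;
}
```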

Performance (Production Numbers)

Strategy | Throughput | P99 Latency | Error Overhead | CPU Impact
No error handling | 100K msg/s | 5ms | 0% | baseline
Try-catch + skip | 98K msg/s | 5ms | +2% | +1%
Retry (3x) + DLQ | 85K msg/s | 15ms | +15% | +5%
Retry (5x) + async DLQ | 80K msg/s | 20ms | +20% | +8%
Per-record commit | 50K msg/s | 50ms | +100% | +20%

Production War Story

Situation: Logistics company, processing GPS tracks from 50K couriers. 200K events/min.

Problem: Every 2-3 hours, one courier sends corrupt GPS data (null lat/lon). Consumer crashes with NullPointerException, restarts → crashes again on the same message. Consumer lag grows at 50K/min during downtime.

Investigation:

  • Error handling: catch-all + retry without DLQ
  • Retry logic: infinite retry on all exceptions
  • Poison pill effect: one corrupt record blocks the partition
  • 30 min downtime × 50K events/min = 1.5M backlog
  • Rebalancing on each crash → cascade failures

Resolution:

// 1. Retry only for transient errors; 2. DLQ for all others
DefaultErrorHandler handler = new DefaultErrorHandler(
    new DeadLetterPublishingRecoverer(kafkaTemplate),
    new FixedBackOff(1000L, 3)  // 3 retries, 1s apart
);
// Everything NOT listed here skips retry and goes straight to the DLQ
handler.addRetryableExceptions(
    TimeoutException.class,
    ConnectException.class,
    SocketException.class
);

// 3. Alert on DLQ messages
// 4. DLQ consumer for manual review and replay
// 5. Schema validation on the producer side (prevention)

Result: 0 downtime for 12 months. DLQ avg 2 messages/day (all corrupt GPS).

Monitoring (JMX, Prometheus, Burrow)

JMX metrics:

kafka.consumer:type=consumer-metrics,client-id=consumer-1
  - failed-rebalance-rate-per-hour
  - last-heartbeat-seconds-ago
  - commit-latency-avg

kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1
  - records-lag-max (max lag across partitions)
  - fetch-latency-avg

Spring Kafka / Application:

app.kafka.consumer.error.rate
app.kafka.consumer.dlq.size
app.kafka.consumer.retry.count
app.kafka.consumer.processing.latency.p99

Prometheus + Alertmanager:

- alert: KafkaConsumerErrorRateHigh
  expr: rate(kafka_consumer_error_total[5m]) / rate(kafka_consumer_poll_total[5m]) > 0.05
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer error rate > 5%"

- alert: KafkaDLQMessagesReceived
  expr: increase(kafka_dlq_messages_total[1h]) > 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: "Messages sent to DLQ in the last hour"

- alert: KafkaConsumerLagGrowing
  expr: rate(kafka_consumer_lag[5m]) > 1000
  for: 10m
  labels:
    severity: critical
  annotations:
    summary: "Consumer lag growing rapidly: processing bottleneck"

- alert: KafkaConsumerRebalanceStorm
  expr: rate(kafka_consumer_rebalance_total[10m]) > 0.1
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Frequent rebalancing detected"

Burrow — consumer lag monitoring with status assessment:

GET /v3/kafka/cluster/consumer/group/status

Possible "status" values in the response:

  "OK"    → lag decreasing or zero
  "WARN"  → lag stable but high
  "ERROR" → lag growing, processing stalled

Highload Best Practices

✅ Retry only for transient errors (define an allowlist)
✅ Max retries (3–5) + DLQ for non-retryable
✅ DLQ with enriched metadata (original topic, offset, exception)
✅ Async error handling for long retries (don't block poll loop)
✅ Exponential backoff with jitter (prevent thundering herd)
✅ Idempotent processing (protect against duplicates on retry)
✅ Alert on DLQ messages (never ignore DLQ)
✅ Graceful shutdown with committing in-flight offsets
✅ Schema validation on the producer side (prevent poison pills)
✅ Separate error handling thread (don't block main processing)

❌ Retry on all exceptions (infinite loop risk)
❌ Retry in main consumer thread (max.poll.interval exceeded)
❌ Without DLQ (lost poison pills)
❌ Commit before processing (data loss on failure)
❌ Without alerting on errors (silent degradation)
❌ Without idempotent processing (duplicates on retry)
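The "exponential backoff with jitter" item deserves a concrete formula. With "full jitter", the delay is drawn uniformly from [0, base · 2^attempt], capped at a maximum, so concurrent retries spread out instead of hammering the dependency in lockstep (class and parameter names are illustrative):

```java
import java.util.concurrent.ThreadLocalRandom;

public class Jitter {

    // Full-jitter exponential backoff:
    // delay ~ Uniform(0, min(maxDelayMs, baseMs * 2^attempt))
    public static long nextDelayMs(int attempt, long baseMs, long maxDelayMs) {
        long exp = baseMs * (1L << Math.min(attempt, 30));  // clamp shift to avoid overflow
        long cap = Math.min(maxDelayMs, exp);
        return ThreadLocalRandom.current().nextLong(cap + 1);
    }
}
```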

Architectural Decisions

  1. Error classification — transient vs permanent, determines retry strategy
  2. DLQ enrichment — metadata for debugging and replay capability
  3. Async error handling — don’t block poll loop, avoid rebalancing
  4. Idempotent processing — fundamental for safe retry
  5. Prevention > cure — schema validation on the producer side

Summary for Senior

  • Error handling is a critical component of production Kafka systems
  • Poison pill — the most frequent cause of consumer stall
  • DLQ — mandatory for poison pill isolation and later analysis
  • Retry only transient errors — permanent errors go to DLQ immediately
  • max.poll.interval.ms — limit on total processing + retry time
  • Async error handling for long retries — avoid blocking the poll loop
  • Idempotent processing — prerequisite for safe retry
  • Monitoring: error rate, DLQ size, consumer lag — all need alerts
  • Prevention: schema validation, producer-side checks — reduce errors at source

🎯 Interview Cheat Sheet

Must know:

  • Retry only for transient errors (timeout, network); poison pill → immediately to DLQ
  • DLQ (Dead Letter Queue) — isolator for problematic messages after max retries
  • Thread.sleep in the consumer loop is an anti-pattern: max.poll.interval exceeded → rebalancing
  • Async error handling — don’t block the poll loop, avoid rebalancing
  • Idempotent processing — prerequisite for safe retry (protects against duplicates)
  • Classification: Transient (retry), Poison pill (DLQ), External dependency (retry + DLQ), Fatal (stop)
  • ConsumerRebalanceListener: commitSync in onPartitionsRevoked, await in-flight processing

Common follow-up questions:

  • What is a poison pill? — A message that always fails (corrupt data); without a retry limit the consumer loops on it forever.
  • Why is Thread.sleep bad? — It blocks poll() → max.poll.interval exceeded → rebalancing.
  • When NOT to use retry? — Poison pill, business rule violations, auth errors.
  • How to handle partial batch failure? — Idempotent processing + commit at safe points.

Red flags (DO NOT say):

  • “Retry on all exceptions” — poison pill = infinite loop
  • “Commit before processing is standard practice” — data loss on crash
  • “DLQ is not needed if retry works” — poison pill bypasses retry
  • “Thread.sleep in the consumer loop is normal” — causes rebalancing

Related topics:

  • [[25. What is DLQ (Dead Letter Queue)]]
  • [[13. How does offset commit work]]
  • [[15. What is rebalancing and when does it happen]]
  • [[10. What is the difference between at-most-once, at-least-once and exactly-once]]