Как обрабатывать ошибки при чтении сообщений в Kafka
Представьте почтовое отделение. Если письмо невозможно доставить (неправильный адрес):
🟢 Junior Level
Простое определение
Обработка ошибок при чтении — это набор паттернов для корректной обработки ситуаций когда консьюмер не может обработать сообщение: ошибка парсинга, недоступность БД, timeout внешнего сервиса и т.д.
Аналогия
Представьте почтовое отделение. Если письмо невозможно доставить (неправильный адрес):
- Попробовать снова через час (retry)
- Если после 3 попыток всё ещё невозможно — положить в “отдел ненайденных писем” (DLQ)
- Никогда не выбрасывать письмо без записи (log everything)
Базовый шаблон обработки
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (Exception e) {
log.error("Error processing record: {}", record, e);
// НЕ коммитим offset → сообщение будет прочитано снова
}
}
consumer.commitSync();
}
} catch (WakeupException e) {
log.info("Shutdown requested");
} finally {
consumer.commitSync(); // финальный коммит
consumer.close();
}
private void processRecord(ConsumerRecord<String, String> record) {
String value = record.value();
Order order = parseOrder(value); // может бросить exception
saveToDatabase(order); // может бросить exception
}
Graceful shutdown
// В другом потоке (например, shutdown hook):
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
running = false;
consumer.wakeup(); // прерывает poll()
}));
Когда это важно
- Любое production приложение — ошибки неизбежны
- Финансовые данные — нельзя терять транзакции
- Интеграции с внешними сервисами — timeouts, rate limits
- Data pipelines — corrupt data, schema mismatch
🟡 Middle Level
Стратегии обработки ошибок
1. Retry с exponential backoff
private static final int MAX_RETRIES = 3;
private static final long BASE_DELAY_MS = 1000;
private void processWithRetry(ConsumerRecord<String, String> record) {
int retries = 0;
while (retries < MAX_RETRIES) {
try {
processRecord(record);
return; // success
} catch (RetryableException e) {
retries++;
long delay = BASE_DELAY_MS * (long) Math.pow(2, retries - 1);
// Exponential backoff: 1s, 2s, 4s
log.warn("Retryable error, attempt {}/{}: {}", retries, MAX_RETRIES, e.getMessage());
// ⚠️ АНТИПАТТЕРН: Thread.sleep() в consumer loop блокирует poll()
// и может вызвать max.poll.interval.ms exceeded.
// Используйте async retry или DLQ вместо sleep.
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
throw new MaxRetriesExceededException("Failed after " + MAX_RETRIES + " retries");
}
2. Dead Letter Queue (DLQ)
KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(dlqProps);
private void processWithDLQ(ConsumerRecord<String, String> record) {
try {
processWithRetry(record);
} catch (Exception e) {
// Отправляем в DLQ
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
"orders-dlq",
record.key(),
record.value()
);
// enrich с метаданными ошибки
dlqRecord.headers().add("x-original-topic", record.topic().getBytes());
dlqRecord.headers().add("x-original-partition", String.valueOf(record.partition()).getBytes());
dlqRecord.headers().add("x-original-offset", String.valueOf(record.offset()).getBytes());
dlqRecord.headers().add("x-exception-message", e.getMessage().getBytes());
dlqRecord.headers().add("x-exception-stacktrace", getStackTrace(e).getBytes());
dlqRecord.headers().add("x-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
dlqProducer.send(dlqRecord);
log.error("Sent to DLQ: {}", record, e);
}
}
3. Skip и продолжить
// Для некритичных данных — log и skip
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (NonCriticalException e) {
log.warn("Skipping non-critical error: {}", e.getMessage());
// Continue — offset будет закоммичен
} catch (CriticalException e) {
log.error("Critical error, stopping: {}", e.getMessage());
throw e; // Не коммитим → retry при рестарте
}
}
consumer.commitSync();
Таблица типичных ошибок
| Ошибка | Симптомы | Последствия | Решение |
|---|---|---|---|
| Бесконечный retry | Консьюмер завис на одном сообщении | Consumer lag растёт, pipeline остановлен | Max retries + DLQ |
| Коммит до обработки | Данные потеряны при падении | Silent data loss | Коммит ПОСЛЕ обработки |
| Без DLQ | Проблемные сообщения теряются | Нет возможности анализа | Всегда использовать DLQ |
| Thread.sleep в consumer loop | max.poll.interval.ms exceeded | Rebalancing, duplicate processing | Асинхронный retry |
| Catch Exception (слишком широко) | Скрывает критичные ошибки | Непредсказуемое поведение | Catch specific exceptions |
| Обработка в том же потоке | Один failing record блокирует весь partition | Throughput = 0 | Separate error handling thread |
Классификация ошибок
| Тип ошибки | Пример | Retry? | DLQ? |
|---|---|---|---|
| Transient | Timeout БД, network glitch | Да (3–5 раз) | Если retries exhausted |
| Poison pill | Corrupt data, schema mismatch | Нет | Сразу в DLQ |
| External dependency | API down, rate limit | Да (с backoff) | Если retries exhausted |
| Business logic | Invalid order amount | Нет | В DLQ с метаданными |
| Fatal | OOM, disk full | Нет | Alert + stop |
Когда НЕ использовать retry
- Poison pill — corrupt data, schema violations (никогда не исправится retry)
- Business rule violations — negative amount, invalid status (нужна ручная проверка)
- Authentication/authorization errors — credential issues не решатся retry
- Rate limit от внешнего API — retry через backoff, НО не бесконечно
🔴 Senior Level
Возможные follow-up вопросы от интервьюера
- «Что если DLQ продюсер тоже упадёт?» — retry limit → manual-review queue
- «Как обработать poison pill в Kafka Streams?» — Quarantine Transformer
- «Что если сообщение не десериализуется?» — bytes десериализатор → DLQ
Глубокие внутренности
Error handling в Spring Kafka
@KafkaListener(topics = "orders", groupId = "order-service")
public void listen(ConsumerRecord<String, String> record) {
processOrder(record.value());
}
// DefaultErrorHandler configuration
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, String> template) {
// Retryable exceptions
var retryableExceptions = Map.of(
RetriableException.class, RetryState.ALWAYS_RETRY
);
// FixedBackOff или ExponentialBackOff
var backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(60000L); // max 60s total retry
backOff.setMaxAttempts(5);
// DLQ producer
var dlqSender = new DeadLetterPublishingRecoverer(template);
return new DefaultErrorHandler(dlqSender, backOff);
}
Error handling в Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");
stream.process(() -> new Processor<String, String>() {
@Override
public void process(Record<String, String> record) {
try {
processRecord(record);
} catch (Exception e) {
// В Kafka Streams нет DLQ out-of-the-box
// Нужно использовать side output
context().forward(record.withValue("ERROR: " + e.getMessage()),
To.child("error-topic"));
}
}
});
ConsumerRebalanceListener + error handling
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// При ребалансе: коммитим текущий offset
// Если есть in-flight processing — ждём completion
awaitInFlightProcessing();
consumer.commitSync();
cleanupState(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// При получении партиций: загружаем state
initializeState(partitions);
}
});
Trade-offs
| Стратегия | Data safety | Throughput | Complexity | When to use |
|---|---|---|---|---|
| Retry + commit after | Высокая | Низкая | Средняя | Критичные данные |
| Retry + async DLQ | Высокая | Средняя | Высокая | Production standard |
| Skip + log | Низкая | Высокая | Низкая | Метрики, логи |
| Block until success | Максимальная | 0 при error | Низкая | Single-record processing |
| Side output (Streams) | Высокая | Высокая | Средняя | Kafka Streams apps |
Edge Cases
1. Poison Pill — сообщение которое всегда падает:
Сценарий:
Partition 0: offset 1000 — corrupt JSON
Consumer читает offset 1000 → parse error → retry → error → retry...
Consumer loop бесконечно на одном сообщении
Все последующие сообщения (1001, 1002, ...) НЕ обрабатываются
Обнаружение:
Consumer lag для partition 0 не меняется
Log показывает повторяющуюся ошибку на offset 1000
Решение:
1. DLQ после max retries
2. Seek past the poison pill:
consumer.seek(new TopicPartition("orders", 0), 1001);
3. Анализ DLQ сообщения и fix producer schema
2. max.poll.interval.ms exceeded при retry:
// ПРОБЛЕМА:
while (retries < MAX_RETRIES) {
try {
process(record);
break;
} catch (Exception e) {
Thread.sleep(10000); // 10s sleep × 5 retries = 50s
retries++;
}
}
// Если max.poll.interval.ms = 300000 (5 min) — OK
// Если max.poll.interval.ms = 30000 (30s) — consumer kicked out!
// РЕШЕНИЕ: async retry
CompletableFuture.runAsync(() -> {
retryWithBackoff(record);
}).whenComplete((result, ex) -> {
if (ex == null) {
commitOffset(record);
} else {
sendToDLQ(record, ex);
commitOffset(record);
}
});
3. Partial batch failure:
Сценарий:
Consumer получил batch из 500 records
Records 0-499 обработаны успешно
Record 500 вызывает OutOfMemoryError
Consumer crash → ни один offset из batch не закоммичен
При рестарте:
Все 500 records будут прочитаны снова
Records 0-499 → duplicate processing!
Решение:
1. Idempotent processing (upsert, dedup keys)
2. Per-record commit (rare, high overhead)
3. Commit at safe points (every 100 records)
4. Transactional processing
4. Error handling с manual partition assignment:
// При assign() вместо subscribe() — нет rebalance
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
consumer.seek(partition, lastProcessedOffset);
// Error handling simpler — no rebalance interruption
// НО: нужно manually handle:
// - Offset tracking
// - Partition management
// - Consumer lifecycle
5. Concurrent error handling:
// Multi-threaded processing с error handling
ExecutorService executor = Executors.newFixedThreadPool(10);
Map<TopicPartition, Long> currentOffsets = new ConcurrentHashMap<>();
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> {
try {
processRecord(record);
currentOffsets.compute(
new TopicPartition(record.topic(), record.partition()),
(tp, offset) -> Math.max(offset == null ? 0L : offset, record.offset() + 1)
);
} catch (Exception e) {
sendToDLQ(record, e);
// Still commit — DLQ handled
currentOffsets.compute(
new TopicPartition(record.topic(), record.partition()),
(tp, offset) -> Math.max(offset == null ? 0L : offset, record.offset() + 1)
);
}
});
}
// Commit max offsets
consumer.commitSync(buildOffsetMap(currentOffsets));
}
Производительность (production numbers)
| Стратегия | Throughput | P99 Latency | Error overhead | CPU impact |
|---|---|---|---|---|
| No error handling | 100K msg/s | 5ms | 0% | baseline |
| Try-catch + skip | 98K msg/s | 5ms | +2% | +1% |
| Retry (3×) + DLQ | 85K msg/s | 15ms | +15% | +5% |
| Retry (5×) + async DLQ | 80K msg/s | 20ms | +20% | +8% |
| Per-record commit | 50K msg/s | 50ms | +100% | +20% |
Production War Story
Ситуация: Логистическая компания, обработка GPS треков от 50K курьеров. 200K events/min.
Проблема: Каждые 2–3 часа один курьер присылал corrupt GPS данные (null lat/lon). Консьюмер падал с NullPointerException, restart processing → снова падал на том же сообщении. Consumer lag рос на 50K/min во время downtime.
Расследование:
- Error handling: catch-all + retry без DLQ
- Retry logic: бесконечный retry на все exceptions
- Poison pill эффект: один corrupt record блокировал partition
- 30 min downtime × 50K events/min = 1.5M backlog
- Rebalancing при каждом crash → cascade failures
Решение:
// 1. Retry только для transient errors
Set<Class<? extends Throwable>> retryable = Set.of(
TimeoutException.class,
ConnectException.class,
SocketException.class
);
// 2. DLQ для всех остальных
DefaultErrorHandler handler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(1000L, 3) // 3 retries, 1s apart
);
handler.setRetryableExceptions(retryable);
// 3. Alert on DLQ messages
// 4. DLQ consumer для manual review и replay
// 5. Schema validation на producer side (prevention)
Результат: 0 downtime за 12 месяцев. DLQ avg 2 messages/day (все corrupt GPS).
Monitoring (JMX, Prometheus, Burrow)
JMX метрики:
kafka.consumer:type=consumer-metrics,client-id=consumer-1
- failed-rebalance-rate-per-hour
- last-heartbeat-seconds-ago
- commit-latency-avg
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1
- records-lag-max (max lag across partitions)
- fetch-latency-avg
Spring Kafka / Application:
app.kafka.consumer.error.rate
app.kafka.consumer.dlq.size
app.kafka.consumer.retry.count
app.kafka.consumer.processing.latency.p99
Prometheus + Alertmanager:
- alert: KafkaConsumerErrorRateHigh
expr: rate(kafka_consumer_error_total[5m]) / rate(kafka_consumer_poll_total[5m]) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer error rate > 5%"
- alert: KafkaDLQMessagesReceived
expr: increase(kafka_dlq_messages_total[1h]) > 0
for: 1m
labels:
severity: warning
annotations:
summary: "Messages sent to DLQ in the last hour"
- alert: KafkaConsumerLagGrowing
expr: rate(kafka_consumer_lag[5m]) > 1000
for: 10m
labels:
severity: critical
annotations:
summary: "Consumer lag growing rapidly — processing bottleneck"
- alert: KafkaConsumerRebalanceStorm
expr: rate(kafka_consumer_rebalance_total[10m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "Frequent rebalancing detected"
Burrow — consumer lag monitoring с оценкой status:
GET /v3/kafka/cluster/consumer/group/status
{
"status": "ERROR", // lag growing, processing stalled
"status": "WARN", // lag stable but high
"status": "OK" // lag decreasing or zero
}
Highload Best Practices
✅ Retry только для transient errors (определить allowlist)
✅ Max retries (3–5) + DLQ для non-retryable
✅ DLQ с enriched metadata (original topic, offset, exception)
✅ Async error handling для long retries (не block poll loop)
✅ Exponential backoff с jitter (prevent thundering herd)
✅ Idempotent processing (защита от duplicates при retry)
✅ Alert на DLQ messages (never ignore DLQ)
✅ Graceful shutdown с commit in-flight offsets
✅ Schema validation на producer side (prevent poison pills)
✅ Separate error handling thread (не block main processing)
❌ Retry на все exceptions (infinite loop risk)
❌ Retry в main consumer thread (max.poll.interval exceeded)
❌ Без DLQ (lost poison pills)
❌ Commit до обработки (data loss on failure)
❌ Без alerting на errors (silent degradation)
❌ Без idempotent processing (duplicates on retry)
Архитектурные решения
- Error classification — transient vs permanent, определяет retry стратегию
- DLQ enrichment — metadata для debugging и replay capability
- Async error handling — не block poll loop,避免 rebalancing
- Idempotent processing — fundamental для safe retry
- Prevention > cure — schema validation on producer side
Резюме для Senior
- Error handling — критический component production Kafka systems
- Poison pill — самый частый cause of consumer stall
- DLQ — обязательна для poison pill isolation и later analysis
- Retry только transient errors — permanent errors go to DLQ immediately
- max.poll.interval.ms — лимит на total processing + retry time
- Async error handling для long retries —避免 block poll loop
- Idempotent processing — prerequisite для safe retry
- Monitoring: error rate, DLQ size, consumer lag — all need alerts
- Prevention: schema validation, producer-side checks — reduce errors at source
🎯 Шпаргалка для интервью
Обязательно знать:
- Retry только для transient errors (timeout, network); poison pill → сразу в DLQ
- DLQ (Dead Letter Queue) — изолятор проблемных сообщений после max retries
- Thread.sleep в consumer loop — антипаттерн: max.poll.interval exceeded → rebalancing
- Async error handling — не block poll loop, avoid rebalancing
- Idempotent processing — prerequisite для safe retry (защита от дубликатов)
- Классификация: Transient (retry), Poison pill (DLQ), External dependency (retry + DLQ), Fatal (stop)
- ConsumerRebalanceListener: commitSync in onPartitionsRevoked, await in-flight processing
Частые уточняющие вопросы:
- Что такое poison pill? — Сообщение которое всегда падает (corrupt data); retry бесконечный.
- Почему Thread.sleep плох? — Блокирует poll() → max.poll.interval exceeded → rebalancing.
- Когда НЕ использовать retry? — Poison pill, business rule violations, auth errors.
- Как обработать partial batch failure? — Idempotent processing + commit at safe points.
Красные флаги (НЕ говорить):
- «Retry на все exceptions» — poison pill = infinite loop
- «Коммит до обработки — стандартная практика» — data loss при crash
- «DLQ не нужен если retry работает» — poison pill bypasses retry
- «Thread.sleep в consumer loop — норма» — вызывает rebalancing
Связанные темы:
- [[25. Что такое DLQ (Dead Letter Queue)]]
- [[13. Как работает commit offset]]
- [[15. Что такое rebalancing и когда он происходит]]
- [[10. В чём разница между at-most-once, at-least-once и exactly-once]]