Як обробляти помилки при читанні повідомлень в Kafka
Уявіть поштове відділення. Якщо лист неможливо доставити (неправильна адреса):
🟢 Junior Level
Просте визначення
Обробка помилок при читанні — це набір паттернів для коректної обробки ситуацій коли консьюмер не може обробити повідомлення: помилка парсингу, недоступність БД, timeout зовнішнього сервісу і т.д.
Аналогія
Уявіть поштове відділення. Якщо лист неможливо доставити (неправильна адреса):
- Спробувати знову через годину (retry)
- Якщо після 3 спроб все ще неможливо — покласти в “відділ ненайдених листів” (DLQ)
- Ніколи не викидати лист без запису (log everything)
Базовий шаблон обробки
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (Exception e) {
log.error("Error processing record: {}", record, e);
// НЕ коммітимо offset → повідомлення буде прочитано знову
}
}
consumer.commitSync();
}
} catch (WakeupException e) {
log.info("Shutdown requested");
} finally {
consumer.commitSync(); // фінальний комміт
consumer.close();
}
private void processRecord(ConsumerRecord<String, String> record) {
String value = record.value();
Order order = parseOrder(value); // може кинути exception
saveToDatabase(order); // може кинути exception
}
Graceful shutdown
// В іншому потоці (наприклад, shutdown hook):
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
running = false;
consumer.wakeup(); // перериває poll()
}));
Коли це важливо
- Будь-який production додаток — помилки неминучі
- Фінансові дані — не можна втрачати транзакції
- Інтеграції із зовнішніми сервісами — timeouts, rate limits
- Data pipelines — corrupt data, schema mismatch
🟡 Middle Level
Стратегії обробки помилок
1. Retry з exponential backoff
private static final int MAX_RETRIES = 3;
private static final long BASE_DELAY_MS = 1000;
private void processWithRetry(ConsumerRecord<String, String> record) {
int retries = 0;
while (retries < MAX_RETRIES) {
try {
processRecord(record);
return; // success
} catch (RetryableException e) {
retries++;
long delay = BASE_DELAY_MS * (long) Math.pow(2, retries - 1);
// Exponential backoff: 1s, 2s, 4s
log.warn("Retryable error, attempt {}/{}: {}", retries, MAX_RETRIES, e.getMessage());
// ⚠️ АНТИПАТТЕРН: Thread.sleep() в consumer loop блокує poll()
// і може викликати max.poll.interval.ms exceeded.
// Використовуйте async retry або DLQ замість sleep.
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
throw new MaxRetriesExceededException("Failed after " + MAX_RETRIES + " retries");
}
2. Dead Letter Queue (DLQ)
KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(dlqProps);
private void processWithDLQ(ConsumerRecord<String, String> record) {
try {
processWithRetry(record);
} catch (Exception e) {
// Відправляємо в DLQ
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
"orders-dlq",
record.key(),
record.value()
);
// enrich з метаданими помилки
dlqRecord.headers().add("x-original-topic", record.topic().getBytes());
dlqRecord.headers().add("x-original-partition", String.valueOf(record.partition()).getBytes());
dlqRecord.headers().add("x-original-offset", String.valueOf(record.offset()).getBytes());
dlqRecord.headers().add("x-exception-message", e.getMessage().getBytes());
dlqRecord.headers().add("x-exception-stacktrace", getStackTrace(e).getBytes());
dlqRecord.headers().add("x-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
dlqProducer.send(dlqRecord);
log.error("Sent to DLQ: {}", record, e);
}
}
3. Skip і продовжити
// Для некритичних даних — log і skip
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (NonCriticalException e) {
log.warn("Skipping non-critical error: {}", e.getMessage());
// Continue — offset буде закоммічений
} catch (CriticalException e) {
log.error("Critical error, stopping: {}", e.getMessage());
throw e; // Не коммітимо → retry при перезапуску
}
}
consumer.commitSync();
Таблиця типових помилок
| Помилка | Симптоми | Наслідки | Рішення |
|---|---|---|---|
| Нескінченний retry | Консьюмер завис на одному повідомленні | Consumer lag росте, pipeline зупинений | Max retries + DLQ |
| Комміт до обробки | Дані втрачені при падінні | Silent data loss | Комміт ПІСЛЯ обробки |
| Без DLQ | Проблемні повідомлення втрачаються | Немає можливості аналізу | Завжди використовувати DLQ |
| Thread.sleep в consumer loop | max.poll.interval.ms exceeded | Rebalancing, duplicate processing | Асинхронний retry |
| Catch Exception (занадто широко) | Приховує критичні помилки | Непередбачувана поведінка | Catch specific exceptions |
| Обробка в тому ж потоці | Один failing record блокує весь partition | Throughput = 0 | Separate error handling thread |
Класифікація помилок
| Тип помилки | Приклад | Retry? | DLQ? |
|---|---|---|---|
| Transient | Timeout БД, network glitch | Так (3–5 раз) | Якщо retries exhausted |
| Poison pill | Corrupt data, schema mismatch | Ні | Одразу в DLQ |
| External dependency | API down, rate limit | Так (з backoff) | Якщо retries exhausted |
| Business logic | Invalid order amount | Ні | В DLQ з метаданими |
| Fatal | OOM, disk full | Ні | Alert + stop |
Коли НЕ використовувати retry
- Poison pill — corrupt data, schema violations (ніколи не виправиться retry)
- Business rule violations — negative amount, invalid status (потрібна ручна перевірка)
- Authentication/authorization errors — credential issues не вирішаться retry
- Rate limit від зовнішнього API — retry через backoff, АЛЕ не нескінченно
🔴 Senior Level
Глибокі внутрішності
Error handling в Spring Kafka
@KafkaListener(topics = "orders", groupId = "order-service")
public void listen(ConsumerRecord<String, String> record) {
processOrder(record.value());
}
// DefaultErrorHandler configuration
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, String> template) {
// Retryable exceptions
var retryableExceptions = Map.of(
RetriableException.class, RetryState.ALWAYS_RETRY
);
// FixedBackOff або ExponentialBackOff
var backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(60000L); // max 60s total retry
backOff.setMaxAttempts(5);
// DLQ producer
var dlqSender = new DeadLetterPublishingRecoverer(template);
return new DefaultErrorHandler(dlqSender, backOff);
}
Error handling в Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");
stream.process(() -> new Processor<String, String>() {
@Override
public void process(Record<String, String> record) {
try {
processRecord(record);
} catch (Exception e) {
// В Kafka Streams немає DLQ out-of-the-box
// Потрібно використовувати side output
context().forward(record.withValue("ERROR: " + e.getMessage()),
To.child("error-topic"));
}
}
});
ConsumerRebalanceListener + error handling
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// При ребалансі: коммітимо поточний offset
// Якщо є in-flight processing — чекаємо completion
awaitInFlightProcessing();
consumer.commitSync();
cleanupState(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// При отриманні партицій: завантажуємо state
initializeState(partitions);
}
});
Trade-offs
| Стратегія | Data safety | Throughput | Complexity | When to use |
|---|---|---|---|---|
| Retry + commit after | Висока | Низька | Середня | Критичні дані |
| Retry + async DLQ | Висока | Середня | Висока | Production standard |
| Skip + log | Низька | Висока | Низька | Метрики, логи |
| Block until success | Максимальна | 0 при error | Низька | Single-record processing |
| Side output (Streams) | Висока | Висока | Середня | Kafka Streams apps |
Edge Cases
1. Poison Pill — повідомлення яке завжди падає:
Сценарій:
Partition 0: offset 1000 — corrupt JSON
Consumer читає offset 1000 → parse error → retry → error → retry...
Consumer loop нескінченно на одному повідомленні
Усі наступні повідомлення (1001, 1002, ...) НЕ обробляються
Виявлення:
Consumer lag для partition 0 не змінюється
Log показує повторювану помилку на offset 1000
Рішення:
1. DLQ після max retries
2. Seek past the poison pill:
consumer.seek(new TopicPartition("orders", 0), 1001);
3. Аналіз DLQ повідомлення і fix producer schema
2. max.poll.interval.ms exceeded при retry:
// ПРОБЛЕМА:
while (retries < MAX_RETRIES) {
try {
process(record);
break;
} catch (Exception e) {
Thread.sleep(10000); // 10s sleep × 5 retries = 50s
retries++;
}
}
// Якщо max.poll.interval.ms = 300000 (5 min) — OK
// Якщо max.poll.interval.ms = 30000 (30s) — consumer kicked out!
// РІШЕННЯ: async retry
CompletableFuture.runAsync(() -> {
retryWithBackoff(record);
}).whenComplete((result, ex) -> {
if (ex == null) {
commitOffset(record);
} else {
sendToDLQ(record, ex);
commitOffset(record);
}
});
3. Partial batch failure:
Сценарій:
Consumer отримав батч з 500 records
Records 0-499 оброблені успішно
Record 500 викликає OutOfMemoryError
Consumer crash → жоден offset з batch не закоммічений
При перезапуску:
Усі 500 records будуть прочитані знову
Records 0-499 → duplicate processing!
Рішення:
1. Idempotent processing (upsert, dedup keys)
2. Per-record commit (рідко, high overhead)
3. Commit at safe points (кожні 100 records)
4. Transactional processing
Highload Best Practices
✅ Retry тільки для transient errors (визначити allowlist)
✅ Max retries (3–5) + DLQ для non-retryable
✅ DLQ з enriched metadata (original topic, offset, exception)
✅ Async error handling для long retries (не block poll loop)
✅ Exponential backoff з jitter (prevent thundering herd)
✅ Idempotent processing (захист від дублікатів при retry)
✅ Alert на DLQ messages (never ignore DLQ)
✅ Graceful shutdown з commit in-flight offsets
✅ Schema validation на producer side (prevent poison pills)
✅ Separate error handling thread (не block main processing)
❌ Retry на всі exceptions (infinite loop risk)
❌ Retry в main consumer thread (max.poll.interval exceeded)
❌ Без DLQ (lost poison pills)
❌ Комміт до обробки (data loss on failure)
❌ Без alerting на errors (silent degradation)
❌ Без idempotent processing (duplicates on retry)
Архітектурні рішення
- Error classification — transient vs permanent, визначає retry стратегію
- DLQ enrichment — metadata для debugging і replay capability
- Async error handling — не block poll loop, уникати rebalancing
- Idempotent processing — фундаментальний для safe retry
- Prevention > cure — schema validation на producer side
Резюме для Senior
- Error handling — критичний компонент production Kafka systems
- Poison pill — найчастіша причина consumer stall
- DLQ — обов’язкова для poison pill isolation і подальшого аналізу
- Retry тільки transient errors — permanent errors йдуть в DLQ одразу
- max.poll.interval.ms — ліміт на total processing + retry time
- Async error handling для long retries — уникати block poll loop
- Idempotent processing — prerequisite для safe retry
- Monitoring: error rate, DLQ size, consumer lag — все потребує alerts
- Prevention: schema validation, producer-side checks — зменшують помилки в джерелі
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
- Retry тільки для transient errors (timeout, network); poison pill → одразу в DLQ
- DLQ (Dead Letter Queue) — ізолятор проблемних повідомлень після max retries
- Thread.sleep в consumer loop — антипаттерн: max.poll.interval exceeded → rebalancing
- Async error handling — не block poll loop, уникати rebalancing
- Idempotent processing — prerequisite для safe retry (захист від дублікатів)
- Класифікація: Transient (retry), Poison pill (DLQ), External dependency (retry + DLQ), Fatal (stop)
- ConsumerRebalanceListener: commitSync в onPartitionsRevoked, await in-flight processing
Часті уточнюючі запитання:
- Що таке poison pill? — Повідомлення яке завжди падає (corrupt data); retry нескінченний.
- Чому Thread.sleep поганий? — Блокує poll() → max.poll.interval exceeded → rebalancing.
- Коли НЕ використовувати retry? — Poison pill, business rule violations, auth errors.
- Як обробити partial batch failure? — Idempotent processing + commit at safe points.
Червоні прапорці (НЕ говорити):
- «Retry на всі exceptions» — poison pill = infinite loop
- «Комміт до обробки — стандартна практика» — data loss при crash
- «DLQ не потрібен якщо retry працює» — poison pill bypasses retry
- «Thread.sleep в consumer loop — норма» — викликає rebalancing
Пов’язані теми:
- [[25. Що таке DLQ (Dead Letter Queue)]]
- [[13. Як працює commit offset]]
- [[15. Що таке ребаланс і коли він відбувається]]
- [[10. У чому різниця між at-most-once, at-least-once і exactly-once]]