Питання 24 · Розділ 15

Як обробляти помилки при читанні повідомлень в Kafka

Уявіть поштове відділення. Якщо лист неможливо доставити (неправильна адреса):

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Просте визначення

Обробка помилок при читанні — це набір паттернів для коректної обробки ситуацій коли консьюмер не може обробити повідомлення: помилка парсингу, недоступність БД, timeout зовнішнього сервісу і т.д.

Аналогія

Уявіть поштове відділення. Якщо лист неможливо доставити (неправильна адреса):

  1. Спробувати знову через годину (retry)
  2. Якщо після 3 спроб все ще неможливо — покласти в “відділ ненайдених листів” (DLQ)
  3. Ніколи не викидати лист без запису (log everything)

Базовий шаблон обробки

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
            } catch (Exception e) {
                log.error("Error processing record: {}", record, e);
                // НЕ коммітимо offset → повідомлення буде прочитано знову
            }
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    log.info("Shutdown requested");
} finally {
    consumer.commitSync();  // фінальний комміт
    consumer.close();
}

private void processRecord(ConsumerRecord<String, String> record) {
    String value = record.value();
    Order order = parseOrder(value);  // може кинути exception
    saveToDatabase(order);            // може кинути exception
}

Graceful shutdown

// В іншому потоці (наприклад, shutdown hook):
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running = false;
    consumer.wakeup();  // перериває poll()
}));

Коли це важливо

  • Будь-який production додаток — помилки неминучі
  • Фінансові дані — не можна втрачати транзакції
  • Інтеграції із зовнішніми сервісами — timeouts, rate limits
  • Data pipelines — corrupt data, schema mismatch

🟡 Middle Level

Стратегії обробки помилок

1. Retry з exponential backoff

private static final int MAX_RETRIES = 3;
private static final long BASE_DELAY_MS = 1000;

private void processWithRetry(ConsumerRecord<String, String> record) {
    int retries = 0;
    while (retries < MAX_RETRIES) {
        try {
            processRecord(record);
            return;  // success
        } catch (RetryableException e) {
            retries++;
            long delay = BASE_DELAY_MS * (long) Math.pow(2, retries - 1);
            // Exponential backoff: 1s, 2s, 4s
            log.warn("Retryable error, attempt {}/{}: {}", retries, MAX_RETRIES, e.getMessage());
            // ⚠️ АНТИПАТТЕРН: Thread.sleep() в consumer loop блокує poll()
            // і може викликати max.poll.interval.ms exceeded.
            // Використовуйте async retry або DLQ замість sleep.
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }
    throw new MaxRetriesExceededException("Failed after " + MAX_RETRIES + " retries");
}

2. Dead Letter Queue (DLQ)

KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(dlqProps);

private void processWithDLQ(ConsumerRecord<String, String> record) {
    try {
        processWithRetry(record);
    } catch (Exception e) {
        // Відправляємо в DLQ
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            "orders-dlq",
            record.key(),
            record.value()
        );
        // enrich з метаданими помилки
        dlqRecord.headers().add("x-original-topic", record.topic().getBytes());
        dlqRecord.headers().add("x-original-partition", String.valueOf(record.partition()).getBytes());
        dlqRecord.headers().add("x-original-offset", String.valueOf(record.offset()).getBytes());
        dlqRecord.headers().add("x-exception-message", e.getMessage().getBytes());
        dlqRecord.headers().add("x-exception-stacktrace", getStackTrace(e).getBytes());
        dlqRecord.headers().add("x-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        dlqProducer.send(dlqRecord);
        log.error("Sent to DLQ: {}", record, e);
    }
}

3. Skip і продовжити

// Для некритичних даних — log і skip
for (ConsumerRecord<String, String> record : records) {
    try {
        processRecord(record);
    } catch (NonCriticalException e) {
        log.warn("Skipping non-critical error: {}", e.getMessage());
        // Continue — offset буде закоммічений
    } catch (CriticalException e) {
        log.error("Critical error, stopping: {}", e.getMessage());
        throw e;  // Не коммітимо → retry при перезапуску
    }
}
consumer.commitSync();

Таблиця типових помилок

Помилка Симптоми Наслідки Рішення
Нескінченний retry Консьюмер завис на одному повідомленні Consumer lag росте, pipeline зупинений Max retries + DLQ
Комміт до обробки Дані втрачені при падінні Silent data loss Комміт ПІСЛЯ обробки
Без DLQ Проблемні повідомлення втрачаються Немає можливості аналізу Завжди використовувати DLQ
Thread.sleep в consumer loop max.poll.interval.ms exceeded Rebalancing, duplicate processing Асинхронний retry
Catch Exception (занадто широко) Приховує критичні помилки Непередбачувана поведінка Catch specific exceptions
Обробка в тому ж потоці Один failing record блокує весь partition Throughput = 0 Separate error handling thread

Класифікація помилок

Тип помилки Приклад Retry? DLQ?
Transient Timeout БД, network glitch Так (3–5 раз) Якщо retries exhausted
Poison pill Corrupt data, schema mismatch Ні Одразу в DLQ
External dependency API down, rate limit Так (з backoff) Якщо retries exhausted
Business logic Invalid order amount Ні В DLQ з метаданими
Fatal OOM, disk full Ні Alert + stop

Коли НЕ використовувати retry

  • Poison pill — corrupt data, schema violations (ніколи не виправиться retry)
  • Business rule violations — negative amount, invalid status (потрібна ручна перевірка)
  • Authentication/authorization errors — credential issues не вирішаться retry
  • Rate limit від зовнішнього API — retry через backoff, АЛЕ не нескінченно

🔴 Senior Level

Глибокі внутрішності

Error handling в Spring Kafka

@KafkaListener(topics = "orders", groupId = "order-service")
public void listen(ConsumerRecord<String, String> record) {
    processOrder(record.value());
}

// DefaultErrorHandler configuration
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, String> template) {
    // Retryable exceptions
    var retryableExceptions = Map.of(
        RetriableException.class, RetryState.ALWAYS_RETRY
    );

    // FixedBackOff або ExponentialBackOff
    var backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxElapsedTime(60000L);  // max 60s total retry
    backOff.setMaxAttempts(5);

    // DLQ producer
    var dlqSender = new DeadLetterPublishingRecoverer(template);

    return new DefaultErrorHandler(dlqSender, backOff);
}

Error handling в Kafka Streams

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");

stream.process(() -> new Processor<String, String>() {
    @Override
    public void process(Record<String, String> record) {
        try {
            processRecord(record);
        } catch (Exception e) {
            // В Kafka Streams немає DLQ out-of-the-box
            // Потрібно використовувати side output
            context().forward(record.withValue("ERROR: " + e.getMessage()),
                             To.child("error-topic"));
        }
    }
});

ConsumerRebalanceListener + error handling

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // При ребалансі: коммітимо поточний offset
        // Якщо є in-flight processing — чекаємо completion
        awaitInFlightProcessing();
        consumer.commitSync();
        cleanupState(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // При отриманні партицій: завантажуємо state
        initializeState(partitions);
    }
});

Trade-offs

Стратегія Data safety Throughput Complexity When to use
Retry + commit after Висока Низька Середня Критичні дані
Retry + async DLQ Висока Середня Висока Production standard
Skip + log Низька Висока Низька Метрики, логи
Block until success Максимальна 0 при error Низька Single-record processing
Side output (Streams) Висока Висока Середня Kafka Streams apps

Edge Cases

1. Poison Pill — повідомлення яке завжди падає:

Сценарій:
  Partition 0: offset 1000 — corrupt JSON
  Consumer читає offset 1000 → parse error → retry → error → retry...
  Consumer loop нескінченно на одному повідомленні
  Усі наступні повідомлення (1001, 1002, ...) НЕ обробляються

Виявлення:
  Consumer lag для partition 0 не змінюється
  Log показує повторювану помилку на offset 1000

Рішення:
  1. DLQ після max retries
  2. Seek past the poison pill:
     consumer.seek(new TopicPartition("orders", 0), 1001);
  3. Аналіз DLQ повідомлення і fix producer schema

2. max.poll.interval.ms exceeded при retry:

// ПРОБЛЕМА:
while (retries < MAX_RETRIES) {
    try {
        process(record);
        break;
    } catch (Exception e) {
        Thread.sleep(10000);  // 10s sleep × 5 retries = 50s
        retries++;
    }
}
// Якщо max.poll.interval.ms = 300000 (5 min) — OK
// Якщо max.poll.interval.ms = 30000 (30s) — consumer kicked out!

// РІШЕННЯ: async retry
CompletableFuture.runAsync(() -> {
    retryWithBackoff(record);
}).whenComplete((result, ex) -> {
    if (ex == null) {
        commitOffset(record);
    } else {
        sendToDLQ(record, ex);
        commitOffset(record);
    }
});

3. Partial batch failure:

Сценарій:
  Consumer отримав батч з 500 records
  Records 0-499 оброблені успішно
  Record 500 викликає OutOfMemoryError
  Consumer crash → жоден offset з batch не закоммічений

При перезапуску:
  Усі 500 records будуть прочитані знову
  Records 0-499 → duplicate processing!

Рішення:
  1. Idempotent processing (upsert, dedup keys)
  2. Per-record commit (рідко, high overhead)
  3. Commit at safe points (кожні 100 records)
  4. Transactional processing

Highload Best Practices

✅ Retry тільки для transient errors (визначити allowlist)
✅ Max retries (3–5) + DLQ для non-retryable
✅ DLQ з enriched metadata (original topic, offset, exception)
✅ Async error handling для long retries (не block poll loop)
✅ Exponential backoff з jitter (prevent thundering herd)
✅ Idempotent processing (захист від дублікатів при retry)
✅ Alert на DLQ messages (never ignore DLQ)
✅ Graceful shutdown з commit in-flight offsets
✅ Schema validation на producer side (prevent poison pills)
✅ Separate error handling thread (не block main processing)

❌ Retry на всі exceptions (infinite loop risk)
❌ Retry в main consumer thread (max.poll.interval exceeded)
❌ Без DLQ (lost poison pills)
❌ Комміт до обробки (data loss on failure)
❌ Без alerting на errors (silent degradation)
❌ Без idempotent processing (duplicates on retry)

Архітектурні рішення

  1. Error classification — transient vs permanent, визначає retry стратегію
  2. DLQ enrichment — metadata для debugging і replay capability
  3. Async error handling — не block poll loop, уникати rebalancing
  4. Idempotent processing — фундаментальний для safe retry
  5. Prevention > cure — schema validation на producer side

Резюме для Senior

  • Error handling — критичний компонент production Kafka systems
  • Poison pill — найчастіша причина consumer stall
  • DLQ — обов’язкова для poison pill isolation і подальшого аналізу
  • Retry тільки transient errors — permanent errors йдуть в DLQ одразу
  • max.poll.interval.ms — ліміт на total processing + retry time
  • Async error handling для long retries — уникати block poll loop
  • Idempotent processing — prerequisite для safe retry
  • Monitoring: error rate, DLQ size, consumer lag — все потребує alerts
  • Prevention: schema validation, producer-side checks — зменшують помилки в джерелі

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Retry тільки для transient errors (timeout, network); poison pill → одразу в DLQ
  • DLQ (Dead Letter Queue) — ізолятор проблемних повідомлень після max retries
  • Thread.sleep в consumer loop — антипаттерн: max.poll.interval exceeded → rebalancing
  • Async error handling — не block poll loop, уникати rebalancing
  • Idempotent processing — prerequisite для safe retry (захист від дублікатів)
  • Класифікація: Transient (retry), Poison pill (DLQ), External dependency (retry + DLQ), Fatal (stop)
  • ConsumerRebalanceListener: commitSync в onPartitionsRevoked, await in-flight processing

Часті уточнюючі запитання:

  • Що таке poison pill? — Повідомлення яке завжди падає (corrupt data); retry нескінченний.
  • Чому Thread.sleep поганий? — Блокує poll() → max.poll.interval exceeded → rebalancing.
  • Коли НЕ використовувати retry? — Poison pill, business rule violations, auth errors.
  • Як обробити partial batch failure? — Idempotent processing + commit at safe points.

Червоні прапорці (НЕ говорити):

  • «Retry на всі exceptions» — poison pill = infinite loop
  • «Комміт до обробки — стандартна практика» — data loss при crash
  • «DLQ не потрібен якщо retry працює» — poison pill bypasses retry
  • «Thread.sleep в consumer loop — норма» — викликає rebalancing

Пов’язані теми:

  • [[25. Що таке DLQ (Dead Letter Queue)]]
  • [[13. Як працює commit offset]]
  • [[15. Що таке ребаланс і коли він відбувається]]
  • [[10. У чому різниця між at-most-once, at-least-once і exactly-once]]