Вопрос 24 · Раздел 15

Как обрабатывать ошибки при чтении сообщений в Kafka

Представьте почтовое отделение. Если письмо невозможно доставить (неправильный адрес):

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Простое определение

Обработка ошибок при чтении — это набор паттернов для корректной обработки ситуаций когда консьюмер не может обработать сообщение: ошибка парсинга, недоступность БД, timeout внешнего сервиса и т.д.

Аналогия

Представьте почтовое отделение. Если письмо невозможно доставить (неправильный адрес):

  1. Попробовать снова через час (retry)
  2. Если после 3 попыток всё ещё невозможно — положить в “отдел ненайденных писем” (DLQ)
  3. Никогда не выбрасывать письмо без записи (log everything)

Базовый шаблон обработки

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
            } catch (Exception e) {
                log.error("Error processing record: {}", record, e);
                // НЕ коммитим offset → сообщение будет прочитано снова
            }
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    log.info("Shutdown requested");
} finally {
    consumer.commitSync();  // финальный коммит
    consumer.close();
}

private void processRecord(ConsumerRecord<String, String> record) {
    String value = record.value();
    Order order = parseOrder(value);  // может бросить exception
    saveToDatabase(order);            // может бросить exception
}

Graceful shutdown

// В другом потоке (например, shutdown hook):
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running = false;
    consumer.wakeup();  // прерывает poll()
}));

Когда это важно

  • Любое production приложение — ошибки неизбежны
  • Финансовые данные — нельзя терять транзакции
  • Интеграции с внешними сервисами — timeouts, rate limits
  • Data pipelines — corrupt data, schema mismatch

🟡 Middle Level

Стратегии обработки ошибок

1. Retry с exponential backoff

private static final int MAX_RETRIES = 3;
private static final long BASE_DELAY_MS = 1000;

private void processWithRetry(ConsumerRecord<String, String> record) {
    int retries = 0;
    while (retries < MAX_RETRIES) {
        try {
            processRecord(record);
            return;  // success
        } catch (RetryableException e) {
            retries++;
            long delay = BASE_DELAY_MS * (long) Math.pow(2, retries - 1);
            // Exponential backoff: 1s, 2s, 4s
            log.warn("Retryable error, attempt {}/{}: {}", retries, MAX_RETRIES, e.getMessage());
            // ⚠️ АНТИПАТТЕРН: Thread.sleep() в consumer loop блокирует poll()
            // и может вызвать max.poll.interval.ms exceeded.
            // Используйте async retry или DLQ вместо sleep.
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }
    throw new MaxRetriesExceededException("Failed after " + MAX_RETRIES + " retries");
}

2. Dead Letter Queue (DLQ)

KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(dlqProps);

private void processWithDLQ(ConsumerRecord<String, String> record) {
    try {
        processWithRetry(record);
    } catch (Exception e) {
        // Отправляем в DLQ
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            "orders-dlq",
            record.key(),
            record.value()
        );
        // enrich с метаданными ошибки
        dlqRecord.headers().add("x-original-topic", record.topic().getBytes());
        dlqRecord.headers().add("x-original-partition", String.valueOf(record.partition()).getBytes());
        dlqRecord.headers().add("x-original-offset", String.valueOf(record.offset()).getBytes());
        dlqRecord.headers().add("x-exception-message", e.getMessage().getBytes());
        dlqRecord.headers().add("x-exception-stacktrace", getStackTrace(e).getBytes());
        dlqRecord.headers().add("x-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        dlqProducer.send(dlqRecord);
        log.error("Sent to DLQ: {}", record, e);
    }
}

3. Skip и продолжить

// Для некритичных данных — log и skip
for (ConsumerRecord<String, String> record : records) {
    try {
        processRecord(record);
    } catch (NonCriticalException e) {
        log.warn("Skipping non-critical error: {}", e.getMessage());
        // Continue — offset будет закоммичен
    } catch (CriticalException e) {
        log.error("Critical error, stopping: {}", e.getMessage());
        throw e;  // Не коммитим → retry при рестарте
    }
}
consumer.commitSync();

Таблица типичных ошибок

Ошибка Симптомы Последствия Решение
Бесконечный retry Консьюмер завис на одном сообщении Consumer lag растёт, pipeline остановлен Max retries + DLQ
Коммит до обработки Данные потеряны при падении Silent data loss Коммит ПОСЛЕ обработки
Без DLQ Проблемные сообщения теряются Нет возможности анализа Всегда использовать DLQ
Thread.sleep в consumer loop max.poll.interval.ms exceeded Rebalancing, duplicate processing Асинхронный retry
Catch Exception (слишком широко) Скрывает критичные ошибки Непредсказуемое поведение Catch specific exceptions
Обработка в том же потоке Один failing record блокирует весь partition Throughput = 0 Separate error handling thread

Классификация ошибок

Тип ошибки Пример Retry? DLQ?
Transient Timeout БД, network glitch Да (3–5 раз) Если retries exhausted
Poison pill Corrupt data, schema mismatch Нет Сразу в DLQ
External dependency API down, rate limit Да (с backoff) Если retries exhausted
Business logic Invalid order amount Нет В DLQ с метаданными
Fatal OOM, disk full Нет Alert + stop

Когда НЕ использовать retry

  • Poison pill — corrupt data, schema violations (никогда не исправится retry)
  • Business rule violations — negative amount, invalid status (нужна ручная проверка)
  • Authentication/authorization errors — credential issues не решатся retry
  • Rate limit от внешнего API — retry через backoff, НО не бесконечно

🔴 Senior Level

Возможные follow-up вопросы от интервьюера

  • «Что если DLQ продюсер тоже упадёт?» — retry limit → manual-review queue
  • «Как обработать poison pill в Kafka Streams?» — Quarantine Transformer
  • «Что если сообщение не десериализуется?» — bytes десериализатор → DLQ

Глубокие внутренности

Error handling в Spring Kafka

@KafkaListener(topics = "orders", groupId = "order-service")
public void listen(ConsumerRecord<String, String> record) {
    processOrder(record.value());
}

// DefaultErrorHandler configuration
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, String> template) {
    // Retryable exceptions
    var retryableExceptions = Map.of(
        RetriableException.class, RetryState.ALWAYS_RETRY
    );

    // FixedBackOff или ExponentialBackOff
    var backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxElapsedTime(60000L);  // max 60s total retry
    backOff.setMaxAttempts(5);

    // DLQ producer
    var dlqSender = new DeadLetterPublishingRecoverer(template);

    return new DefaultErrorHandler(dlqSender, backOff);
}

Error handling в Kafka Streams

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");

stream.process(() -> new Processor<String, String>() {
    @Override
    public void process(Record<String, String> record) {
        try {
            processRecord(record);
        } catch (Exception e) {
            // В Kafka Streams нет DLQ out-of-the-box
            // Нужно использовать side output
            context().forward(record.withValue("ERROR: " + e.getMessage()),
                             To.child("error-topic"));
        }
    }
});

ConsumerRebalanceListener + error handling

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // При ребалансе: коммитим текущий offset
        // Если есть in-flight processing — ждём completion
        awaitInFlightProcessing();
        consumer.commitSync();
        cleanupState(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // При получении партиций: загружаем state
        initializeState(partitions);
    }
});

Trade-offs

Стратегия Data safety Throughput Complexity When to use
Retry + commit after Высокая Низкая Средняя Критичные данные
Retry + async DLQ Высокая Средняя Высокая Production standard
Skip + log Низкая Высокая Низкая Метрики, логи
Block until success Максимальная 0 при error Низкая Single-record processing
Side output (Streams) Высокая Высокая Средняя Kafka Streams apps

Edge Cases

1. Poison Pill — сообщение которое всегда падает:

Сценарий:
  Partition 0: offset 1000 — corrupt JSON
  Consumer читает offset 1000 → parse error → retry → error → retry...
  Consumer loop бесконечно на одном сообщении
  Все последующие сообщения (1001, 1002, ...) НЕ обрабатываются

Обнаружение:
  Consumer lag для partition 0 не меняется
  Log показывает повторяющуюся ошибку на offset 1000

Решение:
  1. DLQ после max retries
  2. Seek past the poison pill:
     consumer.seek(new TopicPartition("orders", 0), 1001);
  3. Анализ DLQ сообщения и fix producer schema

2. max.poll.interval.ms exceeded при retry:

// ПРОБЛЕМА:
while (retries < MAX_RETRIES) {
    try {
        process(record);
        break;
    } catch (Exception e) {
        Thread.sleep(10000);  // 10s sleep × 5 retries = 50s
        retries++;
    }
}
// Если max.poll.interval.ms = 300000 (5 min) — OK
// Если max.poll.interval.ms = 30000 (30s) — consumer kicked out!

// РЕШЕНИЕ: async retry
CompletableFuture.runAsync(() -> {
    retryWithBackoff(record);
}).whenComplete((result, ex) -> {
    if (ex == null) {
        commitOffset(record);
    } else {
        sendToDLQ(record, ex);
        commitOffset(record);
    }
});

3. Partial batch failure:

Сценарий:
  Consumer получил batch из 500 records
  Records 0-499 обработаны успешно
  Record 500 вызывает OutOfMemoryError
  Consumer crash → ни один offset из batch не закоммичен

При рестарте:
  Все 500 records будут прочитаны снова
  Records 0-499 → duplicate processing!

Решение:
  1. Idempotent processing (upsert, dedup keys)
  2. Per-record commit (rare, high overhead)
  3. Commit at safe points (every 100 records)
  4. Transactional processing

4. Error handling с manual partition assignment:

// При assign() вместо subscribe() — нет rebalance
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
consumer.seek(partition, lastProcessedOffset);

// Error handling simpler — no rebalance interruption
// НО: нужно manually handle:
//  - Offset tracking
//  - Partition management
//  - Consumer lifecycle

5. Concurrent error handling:

// Multi-threaded processing с error handling
ExecutorService executor = Executors.newFixedThreadPool(10);
Map<TopicPartition, Long> currentOffsets = new ConcurrentHashMap<>();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        executor.submit(() -> {
            try {
                processRecord(record);
                currentOffsets.compute(
                    new TopicPartition(record.topic(), record.partition()),
                    (tp, offset) -> Math.max(offset == null ? 0L : offset, record.offset() + 1)
                );
            } catch (Exception e) {
                sendToDLQ(record, e);
                // Still commit — DLQ handled
                currentOffsets.compute(
                    new TopicPartition(record.topic(), record.partition()),
                    (tp, offset) -> Math.max(offset == null ? 0L : offset, record.offset() + 1)
                );
            }
        });
    }

    // Commit max offsets
    consumer.commitSync(buildOffsetMap(currentOffsets));
}

Производительность (production numbers)

Стратегия Throughput P99 Latency Error overhead CPU impact
No error handling 100K msg/s 5ms 0% baseline
Try-catch + skip 98K msg/s 5ms +2% +1%
Retry (3×) + DLQ 85K msg/s 15ms +15% +5%
Retry (5×) + async DLQ 80K msg/s 20ms +20% +8%
Per-record commit 50K msg/s 50ms +100% +20%

Production War Story

Ситуация: Логистическая компания, обработка GPS треков от 50K курьеров. 200K events/min.

Проблема: Каждые 2–3 часа один курьер присылал corrupt GPS данные (null lat/lon). Консьюмер падал с NullPointerException, restart processing → снова падал на том же сообщении. Consumer lag рос на 50K/min во время downtime.

Расследование:

  • Error handling: catch-all + retry без DLQ
  • Retry logic: бесконечный retry на все exceptions
  • Poison pill эффект: один corrupt record блокировал partition
  • 30 min downtime × 50K events/min = 1.5M backlog
  • Rebalancing при каждом crash → cascade failures

Решение:

// 1. Retry только для transient errors
Set<Class<? extends Throwable>> retryable = Set.of(
    TimeoutException.class,
    ConnectException.class,
    SocketException.class
);

// 2. DLQ для всех остальных
DefaultErrorHandler handler = new DefaultErrorHandler(
    new DeadLetterPublishingRecoverer(kafkaTemplate),
    new FixedBackOff(1000L, 3)  // 3 retries, 1s apart
);
handler.setRetryableExceptions(retryable);

// 3. Alert on DLQ messages
// 4. DLQ consumer для manual review и replay
// 5. Schema validation на producer side (prevention)

Результат: 0 downtime за 12 месяцев. DLQ avg 2 messages/day (все corrupt GPS).

Monitoring (JMX, Prometheus, Burrow)

JMX метрики:

kafka.consumer:type=consumer-metrics,client-id=consumer-1
  - failed-rebalance-rate-per-hour
  - last-heartbeat-seconds-ago
  - commit-latency-avg

kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1
  - records-lag-max (max lag across partitions)
  - fetch-latency-avg

Spring Kafka / Application:

app.kafka.consumer.error.rate
app.kafka.consumer.dlq.size
app.kafka.consumer.retry.count
app.kafka.consumer.processing.latency.p99

Prometheus + Alertmanager:

- alert: KafkaConsumerErrorRateHigh
  expr: rate(kafka_consumer_error_total[5m]) / rate(kafka_consumer_poll_total[5m]) > 0.05
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer error rate > 5%"

- alert: KafkaDLQMessagesReceived
  expr: increase(kafka_dlq_messages_total[1h]) > 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: "Messages sent to DLQ in the last hour"

- alert: KafkaConsumerLagGrowing
  expr: rate(kafka_consumer_lag[5m]) > 1000
  for: 10m
  labels:
    severity: critical
  annotations:
    summary: "Consumer lag growing rapidly  processing bottleneck"

- alert: KafkaConsumerRebalanceStorm
  expr: rate(kafka_consumer_rebalance_total[10m]) > 0.1
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Frequent rebalancing detected"

Burrow — consumer lag monitoring с оценкой status:

GET /v3/kafka/cluster/consumer/group/status

{
  "status": "ERROR",     // lag growing, processing stalled
  "status": "WARN",      // lag stable but high
  "status": "OK"         // lag decreasing or zero
}

Highload Best Practices

✅ Retry только для transient errors (определить allowlist)
✅ Max retries (3–5) + DLQ для non-retryable
✅ DLQ с enriched metadata (original topic, offset, exception)
✅ Async error handling для long retries (не block poll loop)
✅ Exponential backoff с jitter (prevent thundering herd)
✅ Idempotent processing (защита от duplicates при retry)
✅ Alert на DLQ messages (never ignore DLQ)
✅ Graceful shutdown с commit in-flight offsets
✅ Schema validation на producer side (prevent poison pills)
✅ Separate error handling thread (не block main processing)

❌ Retry на все exceptions (infinite loop risk)
❌ Retry в main consumer thread (max.poll.interval exceeded)
❌ Без DLQ (lost poison pills)
❌ Commit до обработки (data loss on failure)
❌ Без alerting на errors (silent degradation)
❌ Без idempotent processing (duplicates on retry)

Архитектурные решения

  1. Error classification — transient vs permanent, определяет retry стратегию
  2. DLQ enrichment — metadata для debugging и replay capability
  3. Async error handling — не block poll loop,避免 rebalancing
  4. Idempotent processing — fundamental для safe retry
  5. Prevention > cure — schema validation on producer side

Резюме для Senior

  • Error handling — критический component production Kafka systems
  • Poison pill — самый частый cause of consumer stall
  • DLQ — обязательна для poison pill isolation и later analysis
  • Retry только transient errors — permanent errors go to DLQ immediately
  • max.poll.interval.ms — лимит на total processing + retry time
  • Async error handling для long retries —避免 block poll loop
  • Idempotent processing — prerequisite для safe retry
  • Monitoring: error rate, DLQ size, consumer lag — all need alerts
  • Prevention: schema validation, producer-side checks — reduce errors at source

🎯 Шпаргалка для интервью

Обязательно знать:

  • Retry только для transient errors (timeout, network); poison pill → сразу в DLQ
  • DLQ (Dead Letter Queue) — изолятор проблемных сообщений после max retries
  • Thread.sleep в consumer loop — антипаттерн: max.poll.interval exceeded → rebalancing
  • Async error handling — не block poll loop, avoid rebalancing
  • Idempotent processing — prerequisite для safe retry (защита от дубликатов)
  • Классификация: Transient (retry), Poison pill (DLQ), External dependency (retry + DLQ), Fatal (stop)
  • ConsumerRebalanceListener: commitSync in onPartitionsRevoked, await in-flight processing

Частые уточняющие вопросы:

  • Что такое poison pill? — Сообщение которое всегда падает (corrupt data); retry бесконечный.
  • Почему Thread.sleep плох? — Блокирует poll() → max.poll.interval exceeded → rebalancing.
  • Когда НЕ использовать retry? — Poison pill, business rule violations, auth errors.
  • Как обработать partial batch failure? — Idempotent processing + commit at safe points.

Красные флаги (НЕ говорить):

  • «Retry на все exceptions» — poison pill = infinite loop
  • «Коммит до обработки — стандартная практика» — data loss при crash
  • «DLQ не нужен если retry работает» — poison pill bypasses retry
  • «Thread.sleep в consumer loop — норма» — вызывает rebalancing

Связанные темы:

  • [[25. Что такое DLQ (Dead Letter Queue)]]
  • [[13. Как работает commit offset]]
  • [[15. Что такое rebalancing и когда он происходит]]
  • [[10. В чём разница между at-most-once, at-least-once и exactly-once]]