Вопрос 25 · Раздел 15

Что такое DLQ (Dead Letter Queue) в Kafka

Представьте фабрику по упаковке подарков. Рабочий берёт подарок, пытается упаковать. Если подарк повреждён (сломан, разбит):

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Простое определение

DLQ (Dead Letter Queue) — это специальный Kafka топик, куда отправляются сообщения, которые не удалось обработать после нескольких попыток. Это “изолятор” для проблемных данных.

Аналогия

Представьте фабрику по упаковке подарков. Рабочий берёт подарок, пытается упаковать. Если подарк повреждён (сломан, разбит):

  1. Попробовать ещё раз (retry)
  2. Если после 3 попыток всё ещё сломан — положить в корзину “брак” (DLQ)
  3. Продолжить упаковывать следующие подарки

Конвейер не останавливается, а бракованные подарки можно позже изучить и починить.

Пример с Spring Kafka

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> kafkaTemplate) {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);

        // Настройка DLQ
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            new FixedBackOff(1000L, 3)  // 3 retry, 1 секунда между попытками
        ));

        return factory;
    }
}

Что попадает в DLQ

  • Corrupt данные (невалидный JSON, schema mismatch)
  • Невозможно обработать (БД недоступна > max retries)
  • Business rule violations (negative price, invalid email)
  • External service errors (API rate limited, timeout)

Когда использовать

  • Критичные данные — нельзя терять сообщения
  • Необходимость анализа — нужно понять почему обработка失敗
  • Compliance — аудит и traceability
  • Production — всегда иметь DLQ plan

🟡 Middle Level

Архитектура DLQ

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Producer  │────>│  Main Topic  │────>│  Consumer   │
└─────────────┘     └──────────────┘     └──────┬──────┘
                                                │
                                    ┌───────────┼───────────┐
                                    │           │           │
                               Success    Error → Retry   │
                                                  │        │
                                          Max retries?    │
                                             Yes    │     │
                                                  ▼     │
                                          ┌──────────────┴──┐
                                          │    DLQ Topic     │
                                          │  (orders-dlq)    │
                                          └──────────────────┘

DLQ Message Enrichment

Просто отправить оригинальное сообщение в DLQ — недостаточно. Нужно добавить контекст:

public class DLQEnricher {

    public ProducerRecord<String, String> enrichForDLQ(
            ConsumerRecord<String, String> original,
            Exception error) {

        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            original.topic() + "-dlq",  // orders → orders-dlq
            original.key(),
            original.value()
        );

        // Метаданные для debugging
        dlqRecord.headers().add("x-original-topic", original.topic().getBytes(StandardCharsets.UTF_8));
        dlqRecord.headers().add("x-original-partition", String.valueOf(original.partition()).getBytes());
        dlqRecord.headers().add("x-original-offset", String.valueOf(original.offset()).getBytes());
        dlqRecord.headers().add("x-original-timestamp", String.valueOf(original.timestamp()).getBytes());
        dlqRecord.headers().add("x-consumer-group", "order-service".getBytes());
        dlqRecord.headers().add("x-error-message", error.getMessage().getBytes(StandardCharsets.UTF_8));
        dlqRecord.headers().add("x-error-stacktrace", getStackTrace(error).getBytes(StandardCharsets.UTF_8));
        dlqRecord.headers().add("x-retry-count", "3".getBytes());
        dlqRecord.headers().add("x-dlq-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        return dlqRecord;
    }
}

DLQ Patterns

Паттерн Описание Когда использовать
Single DLQ Один DLQ топик для всех ошибок Простые системы
Per-topic DLQ topic-dlq для каждого topic Разные команды ответственности
Per-error DLQ topic-dlq-validation, topic-dlq-timeout Тонкий analysis
Multi-level DLQ DLQ → DLQ2 → Manual review High-compliance системы

DLQ Consumer — обработка DLQ

// DLQ Consumer — читает DLQ и решает что делать
@KafkaListener(topics = "orders-dlq", groupId = "dlq-handler")
public void handleDLQ(ConsumerRecord<String, String> record) {
    String originalTopic = getHeader(record, "x-original-topic");
    String errorMessage = getHeader(record, "x-error-message");
    String stacktrace = getHeader(record, "x-error-stacktrace");

    // Классификация ошибки
    if (errorMessage.contains("Timeout")) {
        // Временная ошибка — retry
        retryMessage(record);
    } else if (errorMessage.contains("ConstraintViolation")) {
        // Data issue — manual review
        sendToManualReview(record);
    } else if (errorMessage.contains("Schema")) {
        // Schema mismatch — fix producer
        alertProducerTeam(record);
    } else {
        // Unknown — manual review
        sendToManualReview(record);
    }
}

Таблица типичных ошибок

Ошибка Симптомы Последствия Решение
Без DLQ Poison pill блокирует consumer Consumer lag ∞, pipeline stall Добавить DLQ
DLQ без monitoring DLQ растёт до бесконечности Disk fill, missed errors Alert на DLQ size
DLQ без metadata Невозможно debug Часы на investigation Enrich headers
DLQ = оригинальный topic Infinite loop Message bounce между topics Отдельный DLQ topic
DLQ без retention DLQ занимает диск вечно Disk fill Настроить retention
Без DLQ consumer DLQ messages накапливаются Без автоматического recovery DLQ consumer для auto-retry

Когда НЕ использовать DLQ

  • Non-recoverable данные — логи, метрики которые можно потерять
  • Идемпотентные операции — retry безопасно повторять бесконечно
  • Тестовые environments — можно просто перезапустить
  • Очень low-volume (< 10 msg/min) — проще alert и manual fix
  • Для полностью идемпотентных операций с автоматическим retry DLQ может быть избыточен — достаточно alerting на retry exhaustion.

🔴 Senior Level

Глубокие внутренности

DLQ в Spring Kafka — DeadLetterPublishingRecoverer

// Internal implementation
public class DeadLetterPublishingRecoverer implements ConsumerRecordRecoverer {

    private final Function<ConsumerRecord<?, ?>, TopicPartition> destinationResolver;
    private final KafkaTemplate<Object, Object> template;

    @Override
    public void accept(ConsumerRecord<?, ?> record, Exception exception) {
        // 1. Determine DLQ topic
        TopicPartition dest = destinationResolver.apply(record);

        // 2. Create enriched record
        ProducerRecord<Object, Object> dlqRecord = createProducerRecord(record, exception);

        // 3. Send to DLQ (sync to guarantee delivery)
        template.send(dlqRecord).get();  // Blocking!

        // 4. Log
        log.error("Sent to DLQ: topic={}, partition={}, offset={}",
                  record.topic(), record.partition(), record.offset());
    }

    // Header enrichment
    private ProducerRecord<Object, Object> createProducerRecord(
            ConsumerRecord<?, ?> record, Exception exception) {
        // Копирует key, value
        // Добавляет headers:
        //   - KafkaHeaders.DLT_EXCEPTION_FQCN (exception class name)
        //   - KafkaHeaders.DLT_EXCEPTION_MESSAGE (exception message)
        //   - KafkaHeaders.DLT_ORIGINAL_TOPIC
        //   - KafkaHeaders.DLT_ORIGINAL_PARTITION
        //   - KafkaHeaders.DLT_ORIGINAL_OFFSET
        //   - KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
    }
}

Kafka Streams — Branching для DLQ

// Kafka Streams не имеет built-in DLQ
// Реализуем через branching

KStream<String, String> orders = builder.stream("orders");

// Split: valid vs invalid
KStream<String, String>[] branches = orders.branch(
    (key, value) -> isValid(value),   // valid stream
    (key, value) -> !isValid(value)   // invalid → DLQ
);

// Valid → process
branches[0].process(() -> orderProcessor);

// Invalid → DLQ topic
branches[1].mapValues(value -> enrichWithMetadata(value, "validation-error"))
            .to("orders-dlq");

Apache Camel / Kafka Connect DLQ

# Kafka Connect — built-in DLQ support
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true

# Connect automatically sends failed records to DLQ
# with context headers

Trade-offs

Аспект Без DLQ С DLQ
Data safety Низкая (poison pill data loss) Высокая
System availability Низкая (stall on error) Высокая
Complexity Низкая Средняя
Debugging time Часы-дни Минуты
Storage overhead 0% 0.1–1% (DLQ messages)
Operational overhead High (manual investigation) Low (automated)

Edge Cases

1. DLQ overflow — когда DLQ сам становится проблемой:

Сценарий:
  Producer bug → все messages invalid
  100K messages/min → DLQ
  DLQ topic: replication factor 3, retention 7 дней
  DLQ storage: 100K × 60 × 24 × 7 × 1KB = 10GB/day

Проблема: DLQ fills disk faster than main topic!

Решение:
  1. DLQ retention.ms = 86400000 (1 день, не 7)
  2. DLQ consumer для быстрого drain
  3. Alert on DLQ rate > threshold
  4. Circuit breaker: stop sending to DLQ, alert only

2. DLQ ordering и causality:

Сценарий:
  Partition 0: order-1 (success), order-2 (DLQ), order-3 (success)
  order-3 depends on order-2 (update после create)
  order-2 в DLQ → order-3 processed but fails (no order-2)

Проблема: Causality broken — downstream processing depends on DLQ'd message

Решение:
  1. Saga pattern: order-3 checks prerequisite
  2. Compensating action: если order-2 в DLQ → rollback order-1
  3. Partition by correlation key — related messages в same partition

3. DLQ replay — переотправка из DLQ:

// DLQ replay consumer
@KafkaListener(topics = "orders-dlq", groupId = "dlq-replayer")
public void replay(ConsumerRecord<String, String> record) {
    String errorMessage = getHeader(record, "x-error-message");
    String originalTopic = getHeader(record, "x-original-topic");

    if (errorMessage.contains("Timeout") && isDatabaseHealthy()) {
        // Retry — ошибка была временной
        originalProducer.send(new ProducerRecord<>(
            originalTopic, record.key(), record.value()
        ));
        log.info("Replayed from DLQ: {}", record.offset());
    } else {
        // Не retryable — manual review
        log.warn("Non-replayable DLQ message: {}", record.offset());
    }
}
// При повторной ошибке — отправить в manual-review queue, НЕ бесконечный retry.
// DLQ-for-DLQ pattern: после N retry → вторая DLQ → manual intervention.

4. DLQ и exactly-once semantics:

Проблема:
  Consumer читает message → error → send to DLQ → commit offset
  Consumer crash AFTER DLQ send, BEFORE commit
  При restart: message прочитан снова → снова в DLQ → duplicate in DLQ

Решение:
  1. Transactional DLQ producer:
     transaction {
       process(record);
       if (error) dlqProducer.send(dlqRecord);
       consumer.commitSync();
     }
  2. DLQ deduplication key = original topic + partition + offset
  3. Idempotent DLQ producer

5. Multi-tenant DLQ isolation:

Сценарий:
  SaaS platform, 100 tenants, shared Kafka topics
  Tenant A errors → DLQ
  Tenant B errors → тот же DLQ
  Tenant C видит DLQ messages от A и B (data leak!)

Решение:
  Per-tenant DLQ topics:
    orders-tenant-a-dlq
    orders-tenant-b-dlq
  Или DLQ с tenant-id header + ACLs

Производительность (production numbers)

DLQ Pattern Overhead Throughput impact Latency impact Storage
No DLQ 0% 0% 0ms 0
Sync DLQ send +10–30ms/msg -5% +20ms 0.1–1%
Async DLQ send +1ms -1% +2ms 0.1–1%
DLQ + enrichment +15–40ms/msg -8% +30ms 0.5–2%
Transactional DLQ +50ms -15% +50ms 0.1–1%

Production War Story

Ситуация: FinTech, обработка транзакций. DLQ существовал, но не мониторился.

Проблема: Обнаружили что DLQ topic вырос до 50M messages за 3 месяца. 50M транзакций по $10–500 каждая не были обработаны. Потенциальный financial exposure: $5M+.

Расследование:

  • DLQ consumer существовал, но упал 3 месяца назад
  • Никто не заметил (no alerting)
  • DLQ messages накапливались без обработки
  • Retention = -1 (бесконечный) → disk fill risk
  • Root cause: producer schema change, validation messages в DLQ
  • 50M messages = schema violation errors

Решение:

  1. Immediate: Alert on DLQ size > 1000 messages
  2. Short-term: DLQ consumer restart + replay for retriable errors
  3. Medium: Schema validation на producer side (prevention)
  4. Long: DLQ auto-classification + auto-retry for transient errors
  5. Operational: DLQ dashboard, on-call rotation для DLQ review

Post-mortem:

  • DLQ без monitoring = no DLQ (false sense of security)
  • DLQ retention policy обязательна
  • Schema registry для producer validation
  • Regular DLQ review process (daily)

Monitoring (JMX, Prometheus, Burrow)

JMX метрики (Spring Kafka):

spring.kafka.consumer.dlq.messages.sent.total
spring.kafka.consumer.dlq.messages.sent.rate
spring.kafka.consumer.retry.attempts.total
spring.kafka.consumer.retry.success.total

Prometheus + Grafana:

- record: kafka_dlq_message_rate
  expr: rate(kafka_topic_messages_total{topic=".*-dlq"}[5m])

- alert: KafkaDLQSizeExceeded
  expr: kafka_topic_partition_log_size{topic=".*-dlq"} > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "DLQ size exceeded 10,000 messages"

- alert: KafkaDLQMessageRateHigh
  expr: rate(kafka_dlq_message_total[5m]) > 10
  for: 2m
  labels:
    severity: critical
  annotations:
    summary: "More than 10 messages/minute going to DLQ"

- alert: KafkaDLQConsumerDown
  expr: kafka_consumer_lag{consumer_group="dlq-handler"} > 1000
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "DLQ consumer is falling behind"

- alert: KafkaDLQDiskUsageHigh
  expr: kafka_log_size_bytes{topic=".*-dlq"} / kafka_disk_capacity_bytes > 0.1
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "DLQ using > 10% disk capacity"

Burrow для DLQ consumer monitoring:

GET /v3/kafka/cluster/consumer/dlq-handler/status

{
  "status": "OK",
  "lag": {
    "orders-dlq": {
      "partition_0": {"current_lag": 0},
      "partition_1": {"current_lag": 500}  // DLQ consumer falling behind
    }
  }
}

Highload Best Practices

✅ Отдельный DLQ topic (не re-use original topic)
✅ DLQ enrichment с metadata (topic, partition, offset, exception)
✅ DLQ retention policy (1–7 дней, не infinite)
✅ Alert on DLQ messages (never silent DLQ)
✅ DLQ consumer для automated retry/replay
✅ DLQ dashboard (message count, error types, age)
✅ Schema validation на producer side (prevent poison pills)
✅ Per-error-type DLQ routing для automated classification
✅ Idempotent DLQ producer (prevent duplicate DLQ messages)
✅ Circuit breaker на DLQ send rate (prevent DLQ overflow)

❌ Без DLQ в production
❌ DLQ без monitoring/alerting
❌ DLQ без retention policy (disk fill)
❌ DLQ = оригинальный topic (infinite loop)
❌ Без DLQ consumer (messages accumulate forever)
❌ Без metadata в DLQ (impossible to debug)
❌ DLQ без enrichment (lost context)

Архитектурные решения

  1. DLQ — не просто топик, а процесс — producer → DLQ → consumer → replay → fix
  2. Enrichment critical — без metadata DLQ бесполезен для debugging
  3. Auto-classification — categorize DLQ errors для automated handling
  4. Prevention > cure — schema validation, producer-side checks
  5. DLQ retention — balance между debugging needs и disk usage

Резюме для Senior

  • DLQ — insurance policy против poison pill и data loss
  • DLQ без monitoring = no DLQ (false sense of security)
  • Enrichment headers — критичны для debugging и replay
  • DLQ consumer нужен для automated retry/replay capability
  • Per-error routing → automated classification → reduced manual review
  • Schema validation on producer — best way to reduce DLQ volume
  • DLQ retention policy обязательна для disk management
  • Transactional DLQ для exactly-once DLQ semantics
  • At highload: DLQ rate может стать проблемой — circuit breaker нужен
  • DLQ dashboard + on-call process — operational necessity

🎯 Шпаргалка для интервью

Обязательно знать:

  • DLQ — специальный топик для сообщений, не обработанных после max retries
  • DLQ enrichment: original topic, partition, offset, exception message, stacktrace, timestamp
  • Паттерны: Single DLQ, Per-topic DLQ (topic-dlq), Per-error DLQ, Multi-level DLQ
  • DLQ consumer: читает DLQ, классифицирует ошибки, retry или manual review
  • DLQ retention policy (1-7 дней) — без неё disk fill guaranteed
  • DLQ без monitoring = no DLQ (false sense of security)
  • Transactional DLQ producer предотвращает duplicate DLQ messages

Частые уточняющие вопросы:

  • Зачем enrich DLQ messages? — Без metadata невозможно debugging и replay.
  • Что если DLQ producer тоже упадёт? — Retry limit → manual-review queue.
  • Как replay из DLQ? — DLQ consumer читает, проверяет fix, resends в original topic.
  • DLQ = original topic? — NO! Infinite loop: message bounce между topics.

Красные флаги (НЕ говорить):

  • «DLQ без monitoring — нормально» — messages accumulate forever unnoticed
  • «DLQ без retention — хранить вечно» — disk fill guaranteed
  • «DLQ = оригинальный топик» — infinite retry loop
  • «DLQ не нужен для идемпотентных операций» — poison pill всё равно нужен

Связанные темы:

  • [[24. Как обрабатывать ошибки при чтении сообщений]]
  • [[26. Как мониторить lag консьюмера]]
  • [[11. Как настроить exactly-once семантику]]
  • [[10. В чём разница между at-most-once, at-least-once и exactly-once]]