Вопрос 11 · Раздел 15

Как настроить exactly-once семантику

Для exactly-once нужно настроить продюсера и консьюмера:

Версии по языкам: English Russian Ukrainian

Уровень Junior

Базовая настройка

Для exactly-once нужно настроить продюсера и консьюмера:

// Producer
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);

// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");

Что делает каждая настройка?

enable.idempotence=true  → продюсер не отправит дубликаты
acks=all                 → ждёт подтверждения от всех реплик
retries=INT_MAX          → бесконечные попытки при ошибках
isolation.level=read_committed → читает только закоммиченные
enable.auto.commit=false → ручной контроль коммитов

Простой пример

// Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", "order-1", "created"));

// Consumer
consumer.subscribe(List.of("orders"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
    process(record);
}
consumer.commitSync();  // только после обработки

Уровень Middle

Transaction API для Exactly-Once

// Инициализация (один раз при старте)
producer.initTransactions();

try {
    producer.beginTransaction();

    // 1. Чтение из Kafka
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // 2. Обработка
    for (var record : records) {
        Result result = process(record);
    }

    // 3. Запись результата
    producer.send(new ProducerRecord<>("output", result));

    // 4. Коммит обоих действий атомарно
    producer.commitTransaction();

} catch (Exception e) {
    // Откат при любой ошибке
    producer.abortTransaction();
}

Read-Process-Write Pattern

1. Read    → consumer.poll() из input topic
2. Process → бизнес-логика
3. Write   → producer.send() в output topic
4. Commit  → producer.commitTransaction() атомарно

Всё в одной транзакции!

Broker Configuration

# Брокер должен поддерживать транзакции
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000

Типичные ошибки

  1. Не вызван initTransactions():
    producer.beginTransaction() без initTransactions()
    → IllegalStateException
    
  2. Не abort transaction при ошибке:
    Частично закоммиченные данные → inconsistency
    
  3. Без transactional.id:
    props.put("transactional.id", "my-tx-id");
    // Без этого транзакции не работают
    

Уровень Senior

Internal Implementation

Transaction Coordinator:

Отдельный компонент на брокере
Управляет lifecycle транзакций
Хранит state в __transaction_state topic

Transaction State Machine:

Empty → Ongoing → PrepareCommit → CompleteCommit
                    ↓
                PrepareAbort → CompleteAbort

Transaction Log:

__transaction_state topic (внутренний)
Хранит:
- Transactional ID
- Producer ID и Epoch

PID (Producer ID) — уникальный идентификатор продюсера, назначается брокером.
Epoch — поколение продюсера, увеличивается при рестарте. Вместе они позволяют
брокеру отличать новые сообщения от дубликатов.

- State транзакции
- Список затронутых партиций

Exactly-Once Semantics (EOS) v2

EOS v1 (Kafka 1.0): требовал отдельный transaction coordinator для каждого consumer.
EOS v2 (Kafka 2.5): один producer координирует для всех consumers в транзакции —
меньше overhead, проще API.

Настройка:
props.put("processing.guarantee", "exactly_once_v2");

Transaction Timeout

// Настройка таймаута
props.put("transaction.timeout.ms", "60000");  // 60 секунд

// Если транзакция не завершена → автоматически abort
// Consumer с isolation.level=read_committed не видит aborted messages

Последствия таймаута:

Транзакция > timeout → abort
Сообщения → не видны потребителям
Producer → должен начать заново

Advanced Transaction Patterns

1. Multiple Output Topics:

producer.beginTransaction();

// Чтение
ConsumerRecords<String, String> records = consumer.poll();

// Обработка и запись в несколько топиков
for (var record : records) {
    Result result = process(record);
    producer.send(new ProducerRecord<>("output-1", result));
    producer.send(new ProducerRecord<>("output-2", result));
}

// Атомарный коммит всех записей
producer.commitTransaction();

2. Send Offsets to Transaction:

// Read from topic A → process → write to topic B → commit offsets
producer.beginTransaction();

// Send to output
producer.send(outputRecord);

// Commit consumer offsets в ту же транзакцию
producer.sendOffsetsToTransaction(
    offsets,
    consumer.groupMetadata()
);

producer.commitTransaction();

Performance Optimization

Transaction overhead:
- Coordinator communication
- State persistence
- Commit/abort protocol

Optimizations:
1. Batch multiple sends in one transaction
2. Increase transaction.timeout.ms для долгих операций
3. Use transactional.id для producer state recovery
4. Monitor transaction latency

Monitoring Transactions

Ключевые метрики:

kafka.producer:transaction-commit-rate
kafka.producer:transaction-abort-rate
kafka.producer:transaction-commit-failed-rate
kafka.consumer:committed-offset

Alerts:

- Transaction abort rate > 1% → warning
- Transaction commit latency > 1s → warning
- Aborted transactions в логах → investigate

Failure Recovery

Producer restart:

// При рестарте producer с тем же transactional.id:
// 1. initTransactions() aborts все незавершённые транзакции
// 2. Producer可以继续 с того же места
props.put("transactional.id", "my-producer-1");
producer.initTransactions();  // aborts incomplete transactions

Consumer restart:

Consumer с isolation.level=read_committed:
- Видит только закоммиченные сообщения
- Aborted messages автоматически скрываются
- Offsets не коммитятся для aborted транзакций

Best Practices

✅ initTransactions() при старте
✅ abortTransaction() при любой ошибке
✅ Read-process-write в одной транзакции
✅ Настройка transaction.timeout.ms
✅ Transactional.id для recovery
✅ Мониторинг commit/abort rate

❌ Без enable.idempotence
❌ Без abort при ошибке
❌ Долгие транзакции (> timeout)
❌ Без transactional.id
❌ Игнорирование aborted transactions

Архитектурные решения

  1. Transaction-based exactly-once работает только для Kafka-to-Kafka. Idempotent producer обеспечивает exactly-once delivery в топик (deduplication при retry).
  2. Transaction timeout критичен — слишком короткий → abort, слишком длинный → resource waste
  3. Transactional.id для recovery — позволяет продолжить после рестарта
  4. Monitoring обязателен — aborted transactions могут остаться незамеченными

Резюме для Senior

  • Transaction API обеспечивает atomic read-process-write
  • Exactly-once требует специальной конфигурации брокеров
  • Transaction timeout — баланс между reliability и resource usage
  • Transactional.id критичен для recovery после restart
  • Monitoring commit/abort rate обязателен для production

🎯 Шпаргалка для интервью

Обязательно знать:

  • Producer: enable.idempotence=true, acks=all, transactional.id
  • Consumer: isolation.level=read_committed, enable.auto.commit=false
  • Transaction API: initTransactions()beginTransaction()commitTransaction() / abortTransaction()
  • Read-Process-Write pattern: poll → process → send → commitTransaction (атомарно)
  • Exactly-once работает только для Kafka-to-Kafka, не для внешних систем
  • EOS v2 (Kafka 2.5): один producer координирует для всех consumers, меньше overhead
  • Transaction timeout критичен: слишком короткий → abort, слишком длинный → resource waste
  • При restart: initTransactions() aborts все незавершённые транзакции

Частые уточняющие вопросы:

  • Что будет без transactional.id? — Транзакции не работают, нужен для recovery.
  • Что если транзакция превысила timeout? — Автоматически abort, сообщения не видны консьюмерам.
  • Как коммитить offsets в транзакции?sendOffsetsToTransaction(offsets, groupMetadata).
  • Чем EOS v2 отличается от v1? — Mеньше overhead, проще API, один producer для всех consumers.

Красные флаги (НЕ говорить):

  • «Exactly-once работает для Kafka → PostgreSQL» — только Kafka-to-Kafka
  • «Можно вызвать beginTransaction без initTransactions» — IllegalStateException
  • «Транзакции бесплатны по производительности» — 2-3x latency overhead
  • «Abort при ошибке не обязателен» — частично закоммиченные данные = inconsistency

Связанные темы:

  • [[10. В чём разница между at-most-once, at-least-once и exactly-once]]
  • [[23. Что такое idempotent producer]]
  • [[9. Какие гарантии доставки сообщений предоставляет Kafka]]
  • [[20. Что такое producer acknowledgment и какие режимы существуют (acks=0,1,all)]]