Как настроить exactly-once семантику
Для exactly-once нужно настроить продюсера и консьюмера:
Уровень Junior
Базовая настройка
Для exactly-once нужно настроить продюсера и консьюмера:
// Producer
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");
Что делает каждая настройка?
enable.idempotence=true → продюсер не отправит дубликаты
acks=all → ждёт подтверждения от всех реплик
retries=INT_MAX → бесконечные попытки при ошибках
isolation.level=read_committed → читает только закоммиченные
enable.auto.commit=false → ручной контроль коммитов
Простой пример
// Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", "order-1", "created"));
// Consumer
consumer.subscribe(List.of("orders"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record);
}
consumer.commitSync(); // только после обработки
Уровень Middle
Transaction API для Exactly-Once
// Инициализация (один раз при старте)
producer.initTransactions();
try {
producer.beginTransaction();
// 1. Чтение из Kafka
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 2. Обработка
for (var record : records) {
Result result = process(record);
}
// 3. Запись результата
producer.send(new ProducerRecord<>("output", result));
// 4. Коммит обоих действий атомарно
producer.commitTransaction();
} catch (Exception e) {
// Откат при любой ошибке
producer.abortTransaction();
}
Read-Process-Write Pattern
1. Read → consumer.poll() из input topic
2. Process → бизнес-логика
3. Write → producer.send() в output topic
4. Commit → producer.commitTransaction() атомарно
Всё в одной транзакции!
Broker Configuration
# Брокер должен поддерживать транзакции
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000
Типичные ошибки
- Не вызван initTransactions():
producer.beginTransaction() без initTransactions() → IllegalStateException - Не abort transaction при ошибке:
Частично закоммиченные данные → inconsistency - Без transactional.id:
props.put("transactional.id", "my-tx-id"); // Без этого транзакции не работают
Уровень Senior
Internal Implementation
Transaction Coordinator:
Отдельный компонент на брокере
Управляет lifecycle транзакций
Хранит state в __transaction_state topic
Transaction State Machine:
Empty → Ongoing → PrepareCommit → CompleteCommit
↓
PrepareAbort → CompleteAbort
Transaction Log:
__transaction_state topic (внутренний)
Хранит:
- Transactional ID
- Producer ID и Epoch
PID (Producer ID) — уникальный идентификатор продюсера, назначается брокером.
Epoch — поколение продюсера, увеличивается при рестарте. Вместе они позволяют
брокеру отличать новые сообщения от дубликатов.
- State транзакции
- Список затронутых партиций
Exactly-Once Semantics (EOS) v2
EOS v1 (Kafka 1.0): требовал отдельный transaction coordinator для каждого consumer.
EOS v2 (Kafka 2.5): один producer координирует для всех consumers в транзакции —
меньше overhead, проще API.
Настройка:
props.put("processing.guarantee", "exactly_once_v2");
Transaction Timeout
// Настройка таймаута
props.put("transaction.timeout.ms", "60000"); // 60 секунд
// Если транзакция не завершена → автоматически abort
// Consumer с isolation.level=read_committed не видит aborted messages
Последствия таймаута:
Транзакция > timeout → abort
Сообщения → не видны потребителям
Producer → должен начать заново
Advanced Transaction Patterns
1. Multiple Output Topics:
producer.beginTransaction();
// Чтение
ConsumerRecords<String, String> records = consumer.poll();
// Обработка и запись в несколько топиков
for (var record : records) {
Result result = process(record);
producer.send(new ProducerRecord<>("output-1", result));
producer.send(new ProducerRecord<>("output-2", result));
}
// Атомарный коммит всех записей
producer.commitTransaction();
2. Send Offsets to Transaction:
// Read from topic A → process → write to topic B → commit offsets
producer.beginTransaction();
// Send to output
producer.send(outputRecord);
// Commit consumer offsets в ту же транзакцию
producer.sendOffsetsToTransaction(
offsets,
consumer.groupMetadata()
);
producer.commitTransaction();
Performance Optimization
Transaction overhead:
- Coordinator communication
- State persistence
- Commit/abort protocol
Optimizations:
1. Batch multiple sends in one transaction
2. Increase transaction.timeout.ms для долгих операций
3. Use transactional.id для producer state recovery
4. Monitor transaction latency
Monitoring Transactions
Ключевые метрики:
kafka.producer:transaction-commit-rate
kafka.producer:transaction-abort-rate
kafka.producer:transaction-commit-failed-rate
kafka.consumer:committed-offset
Alerts:
- Transaction abort rate > 1% → warning
- Transaction commit latency > 1s → warning
- Aborted transactions в логах → investigate
Failure Recovery
Producer restart:
// При рестарте producer с тем же transactional.id:
// 1. initTransactions() aborts все незавершённые транзакции
// 2. Producer可以继续 с того же места
props.put("transactional.id", "my-producer-1");
producer.initTransactions(); // aborts incomplete transactions
Consumer restart:
Consumer с isolation.level=read_committed:
- Видит только закоммиченные сообщения
- Aborted messages автоматически скрываются
- Offsets не коммитятся для aborted транзакций
Best Practices
✅ initTransactions() при старте
✅ abortTransaction() при любой ошибке
✅ Read-process-write в одной транзакции
✅ Настройка transaction.timeout.ms
✅ Transactional.id для recovery
✅ Мониторинг commit/abort rate
❌ Без enable.idempotence
❌ Без abort при ошибке
❌ Долгие транзакции (> timeout)
❌ Без transactional.id
❌ Игнорирование aborted transactions
Архитектурные решения
- Transaction-based exactly-once работает только для Kafka-to-Kafka. Idempotent producer обеспечивает exactly-once delivery в топик (deduplication при retry).
- Transaction timeout критичен — слишком короткий → abort, слишком длинный → resource waste
- Transactional.id для recovery — позволяет продолжить после рестарта
- Monitoring обязателен — aborted transactions могут остаться незамеченными
Резюме для Senior
- Transaction API обеспечивает atomic read-process-write
- Exactly-once требует специальной конфигурации брокеров
- Transaction timeout — баланс между reliability и resource usage
- Transactional.id критичен для recovery после restart
- Monitoring commit/abort rate обязателен для production
🎯 Шпаргалка для интервью
Обязательно знать:
- Producer:
enable.idempotence=true,acks=all,transactional.id - Consumer:
isolation.level=read_committed,enable.auto.commit=false - Transaction API:
initTransactions()→beginTransaction()→commitTransaction()/abortTransaction() - Read-Process-Write pattern: poll → process → send → commitTransaction (атомарно)
- Exactly-once работает только для Kafka-to-Kafka, не для внешних систем
- EOS v2 (Kafka 2.5): один producer координирует для всех consumers, меньше overhead
- Transaction timeout критичен: слишком короткий → abort, слишком длинный → resource waste
- При restart:
initTransactions()aborts все незавершённые транзакции
Частые уточняющие вопросы:
- Что будет без transactional.id? — Транзакции не работают, нужен для recovery.
- Что если транзакция превысила timeout? — Автоматически abort, сообщения не видны консьюмерам.
- Как коммитить offsets в транзакции? —
sendOffsetsToTransaction(offsets, groupMetadata). - Чем EOS v2 отличается от v1? — Mеньше overhead, проще API, один producer для всех consumers.
Красные флаги (НЕ говорить):
- «Exactly-once работает для Kafka → PostgreSQL» — только Kafka-to-Kafka
- «Можно вызвать beginTransaction без initTransactions» — IllegalStateException
- «Транзакции бесплатны по производительности» — 2-3x latency overhead
- «Abort при ошибке не обязателен» — частично закоммиченные данные = inconsistency
Связанные темы:
- [[10. В чём разница между at-most-once, at-least-once и exactly-once]]
- [[23. Что такое idempotent producer]]
- [[9. Какие гарантии доставки сообщений предоставляет Kafka]]
- [[20. Что такое producer acknowledgment и какие режимы существуют (acks=0,1,all)]]