Як налаштувати exactly-once семантику
Для exactly-once потрібно налаштувати продюсера і консьюмера:
Рівень Junior
Базове налаштування
Для exactly-once потрібно налаштувати продюсера і консьюмера:
// Producer
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");
Що робить кожне налаштування?
enable.idempotence=true → продюсер не відправить дублікати
acks=all → чекає підтвердження від всіх реплік
retries=INT_MAX → нескінченні спроби при помилках
isolation.level=read_committed → читає тільки закоммічені
enable.auto.commit=false → ручний контроль коммітів
Простий приклад
// Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", "order-1", "created"));
// Consumer
consumer.subscribe(List.of("orders"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record);
}
consumer.commitSync(); // тільки після обробки
Рівень Middle
Transaction API для Exactly-Once
// Ініціалізація (один раз при старті)
producer.initTransactions();
try {
producer.beginTransaction();
// 1. Читання з Kafka
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 2. Обробка
for (var record : records) {
Result result = process(record);
}
// 3. Запис результату
producer.send(new ProducerRecord<>("output", result));
// 4. Комміт обох дій атомарно
producer.commitTransaction();
} catch (Exception e) {
// Відкат при будь-якій помилці
producer.abortTransaction();
}
Read-Process-Write Pattern
1. Read → consumer.poll() з input topic
2. Process → бізнес-логіка
3. Write → producer.send() в output topic
4. Commit → producer.commitTransaction() атомарно
Все в одній транзакції!
Broker Configuration
# Брокер повинен підтримувати транзакції
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000
Типові помилки
- Не викликаний initTransactions():
producer.beginTransaction() без initTransactions() → IllegalStateException - Не abort transaction при помилці:
Частково закоммічені дані → inconsistency - Без transactional.id:
props.put("transactional.id", "my-tx-id"); // Без цього транзакції не працюють
Рівень Senior
Internal Implementation
Transaction Coordinator:
Окремий компонент на брокері
Управляє lifecycle транзакцій
Зберігає state в __transaction_state topic
Transaction State Machine:
Empty → Ongoing → PrepareCommit → CompleteCommit
↓
PrepareAbort → CompleteAbort
Transaction Log:
__transaction_state topic (внутрішній)
Зберігає:
- Transactional ID
- Producer ID і Epoch
PID (Producer ID) — унікальний ідентифікатор продюсера, призначається брокером.
Epoch — покоління продюсера, збільшується при перезапуску. Разом вони дозволяють
брокеру відрізняти нові повідомлення від дублікатів.
- State транзакції
- Список залучених партицій
Exactly-Once Semantics (EOS) v2
EOS v1 (Kafka 1.0): потребував окремий transaction coordinator для кожного consumer.
EOS v2 (Kafka 2.5): один producer координує для всіх consumers в транзакції —
менше overhead, простіше API.
Налаштування:
props.put("processing.guarantee", "exactly_once_v2");
Transaction Timeout
// Налаштування таймауту
props.put("transaction.timeout.ms", "60000"); // 60 секунд
// Якщо транзакція не завершена → автоматично abort
// Consumer з isolation.level=read_committed не бачить aborted messages
Наслідки таймауту:
Транзакція > timeout → abort
Повідомлення → не видні споживачам
Producer → повинен почати заново
Advanced Transaction Patterns
1. Multiple Output Topics:
producer.beginTransaction();
// Читання
ConsumerRecords<String, String> records = consumer.poll();
// Обробка і запис в кілька топиків
for (var record : records) {
Result result = process(record);
producer.send(new ProducerRecord<>("output-1", result));
producer.send(new ProducerRecord<>("output-2", result));
}
// Атомарний комміт усіх записів
producer.commitTransaction();
2. Send Offsets to Transaction:
// Read from topic A → process → write to topic B → commit offsets
producer.beginTransaction();
// Send to output
producer.send(outputRecord);
// Commit consumer offsets в ту ж транзакцію
producer.sendOffsetsToTransaction(
offsets,
consumer.groupMetadata()
);
producer.commitTransaction();
Performance Optimization
Transaction overhead:
- Coordinator communication
- State persistence
- Commit/abort protocol
Optimizations:
1. Batch multiple sends in one transaction
2. Increase transaction.timeout.ms для довгих операцій
3. Use transactional.id для producer state recovery
4. Monitor transaction latency
Monitoring Transactions
Ключові метрики:
kafka.producer:transaction-commit-rate
kafka.producer:transaction-abort-rate
kafka.producer:transaction-commit-failed-rate
kafka.consumer:committed-offset
Alerts:
- Transaction abort rate > 1% → warning
- Transaction commit latency > 1s → warning
- Aborted transactions в логах → investigate
Failure Recovery
Producer restart:
// При перезапуску producer з тим самим transactional.id:
// 1. initTransactions() aborts усі незавершені транзакції
// 2. Producer може продовжити з того ж місця
props.put("transactional.id", "my-producer-1");
producer.initTransactions(); // aborts incomplete transactions
Consumer restart:
Consumer з isolation.level=read_committed:
- Бачить тільки закоммічені повідомлення
- Aborted messages автоматично приховуються
- Offsets не коммітяться для aborted транзакцій
Best Practices
✅ initTransactions() при старті
✅ abortTransaction() при будь-якій помилці
✅ Read-process-write в одній транзакції
✅ Налаштування transaction.timeout.ms
✅ Transactional.id для recovery
✅ Моніторинг commit/abort rate
❌ Без enable.idempotence
❌ Без abort при помилці
❌ Довгі транзакції (> timeout)
❌ Без transactional.id
❌ Ігнорування aborted transactions
Архітектурні рішення
- Transaction-based exactly-once працює тільки для Kafka-to-Kafka. Idempotent producer забезпечує exactly-once delivery в топик (deduplication при retry).
- Transaction timeout критичний — занадто короткий → abort, занадто довгий → resource waste
- Transactional.id для recovery — дозволяє продовжити після перезапуску
- Monitoring обов’язковий — aborted transactions можуть залишитися непоміченими
Резюме для Senior
- Transaction API забезпечує atomic read-process-write
- Exactly-once потребує спеціальної конфігурації брокерів
- Transaction timeout — баланс між reliability і resource usage
- Transactional.id критичний для recovery після restart
- Monitoring commit/abort rate обов’язковий для production
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
- Producer:
enable.idempotence=true,acks=all,transactional.id - Consumer:
isolation.level=read_committed,enable.auto.commit=false - Transaction API:
initTransactions()→beginTransaction()→commitTransaction()/abortTransaction() - Read-Process-Write pattern: poll → process → send → commitTransaction (атомарно)
- Exactly-once працює тільки для Kafka-to-Kafka, не для зовнішніх систем
- EOS v2 (Kafka 2.5): один producer координує для всіх consumers, менше overhead
- Transaction timeout критичний: занадто короткий → abort, занадто довгий → resource waste
- При restart:
initTransactions()aborts усі незавершені транзакції
Часті уточнюючі запитання:
- Що буде без transactional.id? — Транзакції не працюють, потрібен для recovery.
- Що якщо транзакція перевищила timeout? — Автоматично abort, повідомлення не видні консьюмерам.
- Як коммітити offsets в транзакції? —
sendOffsetsToTransaction(offsets, groupMetadata). - Чим EOS v2 відрізняється від v1? — Менше overhead, простіше API, один producer для всіх consumers.
Червоні прапорці (НЕ говорити):
- «Exactly-once працює для Kafka → PostgreSQL» — тільки Kafka-to-Kafka
- «Можна викликати beginTransaction без initTransactions» — IllegalStateException
- «Транзакції безкоштовні по продуктивності» — 2-3x latency overhead
- «Abort при помилці не обов’язковий» — частково закоммічені дані = inconsistency
Пов’язані теми:
- [[10. У чому різниця між at-most-once, at-least-once і exactly-once]]
- [[23. Що таке idempotent producer]]
- [[9. Які гарантії доставки повідомлень надає Kafka]]
- [[20. Що таке producer acknowledgment і які режими існують (acks=0,1,all)]]