How to configure exactly-once semantics
Junior Level
Basic configuration
For exactly-once you need to configure both the producer and the consumer:
// Producer
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");
What does each setting do?
enable.idempotence=true → broker discards duplicates caused by producer retries
acks=all → waits for confirmation from all in-sync replicas
retries=Integer.MAX_VALUE → keeps retrying until delivery.timeout.ms expires
isolation.level=read_committed → reads only committed messages
enable.auto.commit=false → manual commit control
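A minimal sketch of the settings above, kept in two separate Properties objects so producer and consumer keys do not mix. The class and method names (EosConfigSketch, producerProps, consumerProps) are hypothetical; a real client also needs bootstrap.servers, serializers/deserializers, and a group.id, which are left out here.

```java
import java.util.Properties;

// Illustrative only: builds the two property sets discussed above.
final class EosConfigSketch {

    /** Producer side: idempotence, full acknowledgement, retries bounded by delivery.timeout.ms. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    /** Consumer side: skip uncommitted/aborted data, commit offsets manually. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerProps());
        System.out.println("consumer: " + consumerProps());
    }
}
```

Each Properties object would then be passed to its own client constructor, for example new KafkaProducer<>(producerProps()).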
Simple example
// Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", "order-1", "created"));
// Consumer
consumer.subscribe(List.of("orders"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record);
}
consumer.commitSync(); // only after processing
Middle Level
Transaction API for Exactly-Once
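// Initialization (once at startup)
producer.initTransactions();
try {
    producer.beginTransaction();
    // 1. Read from Kafka
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (var record : records) {
        // 2. Process
        Result result = process(record);
        // 3. Write result (inside the loop, where result is in scope)
        producer.send(new ProducerRecord<>("output", result));
    }
    // 4. Commit the writes atomically (call sendOffsetsToTransaction() before
    //    this line to include the consumed offsets in the same transaction)
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the transaction cannot be aborted, close the producer
    producer.close();
} catch (Exception e) {
    // Rollback on any other error
    producer.abortTransaction();
}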
Read-Process-Write Pattern
1. Read → consumer.poll() from input topic
2. Process → business logic
3. Write → producer.send() to output topic
4. Commit → producer.commitTransaction() atomically
All in one transaction!
Broker Configuration
# Broker must support transactions
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000
Common mistakes
- initTransactions() not called:
  producer.beginTransaction() without initTransactions() → IllegalStateException
- Not aborting transaction on error:
  The transaction stays open until transaction.timeout.ms expires, and read_committed consumers cannot read past it
- Without transactional.id:
  props.put("transactional.id", "my-tx-id"); // Without this, transactions don't work
Senior Level
Internal Implementation
Transaction Coordinator:
Separate component on the broker
Manages transaction lifecycle
Stores state in __transaction_state topic
Transaction State Machine:
Empty → Ongoing → PrepareCommit → CompleteCommit
           ↓
        PrepareAbort → CompleteAbort
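The diagram can be read as a small transition table. The sketch below encodes only the arrows drawn above; the enum name TxnState and its methods are hypothetical, and the real coordinator permits further transitions (for example, starting the next transaction after a completed one) that this model leaves out.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified model of the diagram above, not the broker's actual implementation.
enum TxnState {
    EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT, PREPARE_ABORT, COMPLETE_ABORT;

    /** States reachable in one step, following only the arrows in the diagram. */
    Set<TxnState> next() {
        switch (this) {
            case EMPTY:          return EnumSet.of(ONGOING);
            case ONGOING:        return EnumSet.of(PREPARE_COMMIT, PREPARE_ABORT);
            case PREPARE_COMMIT: return EnumSet.of(COMPLETE_COMMIT);
            case PREPARE_ABORT:  return EnumSet.of(COMPLETE_ABORT);
            default:             return EnumSet.noneOf(TxnState.class); // terminal in this model
        }
    }

    boolean canMoveTo(TxnState target) {
        return next().contains(target);
    }

    public static void main(String[] args) {
        for (TxnState s : values()) {
            System.out.println(s + " -> " + s.next());
        }
    }
}
```

The point of the two Prepare states is that once the coordinator has logged PrepareCommit or PrepareAbort, the outcome is decided and can only move forward to the matching Complete state.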
Transaction Log:
__transaction_state topic (internal)
Stores:
- Transactional ID
- Producer ID and Epoch
- Transaction state
- List of affected partitions
PID (Producer ID): unique producer identifier, assigned by the broker.
Epoch: producer generation, bumped each time initTransactions() runs for the same transactional.id; the broker uses it to fence older ("zombie") instances.
PID, epoch, and per-partition sequence numbers together let the broker distinguish new messages from duplicates.
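To make the role of the PID and epoch concrete, here is a toy model of the check a partition leader could apply to each incoming batch. It is a sketch under simplifying assumptions: one sequence number per batch, sequences restart at 0 for a new epoch, and only the last sequence is remembered. The class PartitionDedupSketch and the Outcome values are hypothetical names, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: how (PID, epoch, sequence) separates new writes from duplicates and zombies.
final class PartitionDedupSketch {
    enum Outcome { APPENDED, DUPLICATE, OUT_OF_ORDER, FENCED }

    private static final class ProducerEntry {
        final int epoch;
        int lastSeq;
        ProducerEntry(int epoch, int lastSeq) { this.epoch = epoch; this.lastSeq = lastSeq; }
    }

    private final Map<Long, ProducerEntry> producers = new HashMap<>();

    Outcome append(long pid, int epoch, int seq) {
        ProducerEntry entry = producers.get(pid);
        if (entry == null || epoch > entry.epoch) {
            // Unknown producer or newer generation: its sequence must start at 0.
            if (seq != 0) return Outcome.OUT_OF_ORDER;
            producers.put(pid, new ProducerEntry(epoch, 0));
            return Outcome.APPENDED;
        }
        if (epoch < entry.epoch) return Outcome.FENCED;           // older generation (zombie)
        if (seq <= entry.lastSeq) return Outcome.DUPLICATE;       // retry of an already stored batch
        if (seq != entry.lastSeq + 1) return Outcome.OUT_OF_ORDER; // gap: something was lost
        entry.lastSeq = seq;
        return Outcome.APPENDED;
    }

    public static void main(String[] args) {
        PartitionDedupSketch partition = new PartitionDedupSketch();
        System.out.println(partition.append(7L, 0, 0)); // first batch
        System.out.println(partition.append(7L, 0, 0)); // same batch retried
        System.out.println(partition.append(7L, 1, 0)); // restarted producer, new epoch
        System.out.println(partition.append(7L, 0, 1)); // old instance still sending
    }
}
```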
Exactly-Once Semantics (EOS) v2
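EOS v1 (Kafka 0.11): one transactional producer per input partition (per task in Kafka Streams), so the number of producers grew with the number of partitions.
EOS v2 (KIP-447, brokers 2.5+): one producer per application instance covers all assigned partitions; fencing relies on consumer group metadata. Less overhead, simpler API.
Configuration (Kafka Streams setting; the value exactly_once_v2 is available since Kafka Streams 3.0):
props.put("processing.guarantee", "exactly_once_v2");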
Transaction Timeout
// Configure timeout
props.put("transaction.timeout.ms", "60000"); // 60 seconds
// If transaction not completed → automatically abort
// Consumer with isolation.level=read_committed doesn't see aborted messages
Timeout consequences:
Transaction > timeout → abort
Messages → not visible to consumers
Producer → must start over
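One related detail: a producer whose transaction.timeout.ms exceeds the broker's transaction.max.timeout.ms is rejected when it calls initTransactions(). A pre-flight check like the sketch below can catch that earlier. The class and method names are hypothetical, and the broker limit is passed in by the caller instead of being read from the cluster.

```java
// Hypothetical pre-flight check for the producer-side transaction timeout.
final class TxnTimeoutCheck {

    /** Returns requestedMs if it is positive and within the broker limit, otherwise throws. */
    static long validate(long requestedMs, long brokerMaxMs) {
        if (requestedMs <= 0) {
            throw new IllegalArgumentException("transaction.timeout.ms must be positive: " + requestedMs);
        }
        if (requestedMs > brokerMaxMs) {
            throw new IllegalArgumentException(
                "transaction.timeout.ms=" + requestedMs
                + " exceeds broker transaction.max.timeout.ms=" + brokerMaxMs);
        }
        return requestedMs;
    }

    public static void main(String[] args) {
        // 60 s producer timeout against the 900000 ms broker limit from the config above.
        System.out.println(validate(60_000L, 900_000L));
    }
}
```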
Advanced Transaction Patterns
1. Multiple Output Topics:
producer.beginTransaction();
// Read
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process and write to multiple topics
for (var record : records) {
Result result = process(record);
producer.send(new ProducerRecord<>("output-1", result));
producer.send(new ProducerRecord<>("output-2", result));
}
// Atomic commit of all writes
producer.commitTransaction();
2. Send Offsets to Transaction:
// Read from topic A → process → write to topic B → commit offsets
producer.beginTransaction();
// Send to output
producer.send(outputRecord);
// Commit consumer offsets in the same transaction
producer.sendOffsetsToTransaction(
offsets,
consumer.groupMetadata()
);
producer.commitTransaction();
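The offsets argument is not defined in the snippet above. The value committed for each partition is the offset of the next record to read, that is, the last processed offset plus one. The sketch below shows that calculation with plain Java types: Rec is a hypothetical stand-in for the topic, partition, and offset fields of a ConsumerRecord, and the String key stands in for TopicPartition. In real code the map would be a Map<TopicPartition, OffsetAndMetadata>.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: derive the offsets to commit from the records processed in one transaction.
final class OffsetsSketch {

    /** Hypothetical stand-in for the position fields of a ConsumerRecord. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** For each partition, the offset to commit is the highest processed offset + 1. */
    static Map<String, Long> offsetsToCommit(List<Rec> processed) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : processed) {
            next.merge(r.topic + "-" + r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
            new Rec("orders", 0, 41), new Rec("orders", 0, 42), new Rec("orders", 1, 7));
        System.out.println(offsetsToCommit(batch));
    }
}
```

Committing the last processed offset itself, without the +1, would cause that record to be read again after a restart.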
Performance Optimization
Transaction overhead:
- Coordinator communication
- State persistence
- Commit/abort protocol
Optimizations:
1. Batch multiple sends in one transaction
2. Increase transaction.timeout.ms for long operations (it cannot exceed the broker's transaction.max.timeout.ms)
3. Use transactional.id for producer state recovery
4. Monitor transaction latency
Monitoring Transactions
Key metrics:
kafka.producer:transaction-commit-rate
kafka.producer:transaction-abort-rate
kafka.producer:transaction-commit-failed-rate
kafka.consumer:committed-offset
Alerts:
- Transaction abort rate > 1% → warning
- Transaction commit latency > 1s → warning
- Aborted transactions in logs → investigate
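The first two alert rules reduce to simple threshold checks. A minimal sketch, assuming the commit and abort counts and the commit latency have already been collected by whatever monitoring system is in use; the class and method names are hypothetical.

```java
// Sketch of the alert thresholds listed above; inputs come from your own metrics pipeline.
final class TxnAlertSketch {
    static final double ABORT_RATIO_WARNING = 0.01; // 1%
    static final long COMMIT_LATENCY_WARNING_MS = 1_000L; // 1 s

    /** Share of finished transactions that were aborted; 0 when nothing has finished yet. */
    static double abortRatio(long commits, long aborts) {
        long total = commits + aborts;
        return total == 0 ? 0.0 : (double) aborts / total;
    }

    static boolean abortWarning(long commits, long aborts) {
        return abortRatio(commits, aborts) > ABORT_RATIO_WARNING;
    }

    static boolean latencyWarning(long commitLatencyMs) {
        return commitLatencyMs > COMMIT_LATENCY_WARNING_MS;
    }

    public static void main(String[] args) {
        System.out.println("abort warning: " + abortWarning(980, 20));
        System.out.println("latency warning: " + latencyWarning(1_500));
    }
}
```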
Failure Recovery
Producer restart:
// On restart, producer with same transactional.id:
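// 1. initTransactions() aborts any transaction left open under that transactional.id and fences the old instance
// 2. The new producer then begins fresh transactions; consumption resumes from the last committed offsets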
props.put("transactional.id", "my-producer-1");
producer.initTransactions(); // aborts incomplete transactions
Consumer restart:
Consumer with isolation.level=read_committed:
- Sees only committed messages
- Aborted messages automatically hidden
- Offsets not committed for aborted transactions
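The visibility rules can be illustrated with a toy log. In this sketch a partition is a list of data entries and commit/abort markers; a read_committed reader returns data from committed transactions, skips aborted ones, and stops at the first record of a transaction that is still open (the last stable offset). All names are hypothetical, and the model assumes each transaction id is used only once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of what a read_committed consumer sees in one partition.
final class ReadCommittedSketch {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final String txn;
        final String value; // null for markers
        private Entry(Kind kind, String txn, String value) {
            this.kind = kind;
            this.txn = txn;
            this.value = value;
        }
        static Entry data(String txn, String value) { return new Entry(Kind.DATA, txn, value); }
        static Entry marker(Kind kind, String txn) { return new Entry(kind, txn, null); }
    }

    static List<String> readCommitted(List<Entry> log) {
        // Pass 1: find the outcome (COMMIT or ABORT) of every finished transaction.
        Map<String, Kind> outcome = new HashMap<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) outcome.put(e.txn, e.kind);
        }
        // Pass 2: walk the log in order and stop at the first open transaction.
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) continue;
            Kind result = outcome.get(e.txn);
            if (result == null) break;                      // open transaction: nothing beyond is readable yet
            if (result == Kind.COMMIT) visible.add(e.value); // aborted data is skipped
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            Entry.data("t1", "a"), Entry.data("t2", "b"), Entry.marker(Kind.ABORT, "t2"),
            Entry.marker(Kind.COMMIT, "t1"), Entry.data("t3", "c"));
        System.out.println(readCommitted(log));
    }
}
```

Note the consequence of the break: a committed record that sits behind a still-open transaction stays invisible until that transaction commits, aborts, or times out.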
Best Practices
✅ initTransactions() at startup
✅ abortTransaction() on any error
✅ Read-process-write in one transaction
✅ Configure transaction.timeout.ms
✅ Transactional.id for recovery
✅ Monitor commit/abort rate
❌ Without enable.idempotence
❌ Without abort on error
❌ Long transactions (> timeout)
❌ Without transactional.id
❌ Ignoring aborted transactions
Architectural decisions
- Transaction-based exactly-once works only for Kafka-to-Kafka. The idempotent producer alone gives exactly-once delivery per partition within one producer session (deduplication on retry).
- Transaction timeout is critical — too short → abort, too long → resource waste
- Transactional.id for recovery — allows continuation after restart
- Monitoring is mandatory — aborted transactions can go unnoticed
Summary for Senior
- Transaction API provides atomic read-process-write
- Exactly-once requires special broker configuration
- Transaction timeout — balance between reliability and resource usage
- Transactional.id is critical for recovery after restart
- Monitoring commit/abort rate is mandatory for production
🎯 Interview Cheat Sheet
Must know:
- Producer: enable.idempotence=true, acks=all, transactional.id
- Consumer: isolation.level=read_committed, enable.auto.commit=false
- Transaction API: initTransactions() → beginTransaction() → commitTransaction()/abortTransaction()
- Read-Process-Write pattern: poll → process → send → commitTransaction (atomically)
- Exactly-once works only for Kafka-to-Kafka, not for external systems
- EOS v2 (KIP-447, brokers 2.5+): one producer per application instance instead of one per input partition, less overhead
- Transaction timeout is critical: too short → abort, too long → resource waste
- On restart: initTransactions() aborts all incomplete transactions
Common follow-up questions:
- What happens without transactional.id? — Transactions don’t work, needed for recovery.
- What if transaction exceeds timeout? — Automatic abort, messages not visible to consumers.
- How to commit offsets in a transaction? — sendOffsetsToTransaction(offsets, groupMetadata).
- How does EOS v2 differ from v1? — Less overhead, simpler API, one producer per application instance instead of one per input partition.
Red flags (DO NOT say):
- “Exactly-once works for Kafka → PostgreSQL” — only Kafka-to-Kafka
- “You can call beginTransaction without initTransactions” — IllegalStateException
- “Transactions are free performance-wise” — 2-3x latency overhead
- “Abort on error is optional” — an unaborted transaction stays open until it times out and blocks read_committed consumers
Related topics:
- [[10. What is the difference between at-most-once, at-least-once and exactly-once]]
- [[23. What is idempotent producer]]
- [[9. What message delivery guarantees does Kafka provide]]
- [[20. What is producer acknowledgment and what modes exist (acks=0,1,all)]]