Question 11 · Section 15

How to configure exactly-once semantics


Junior Level

Basic configuration

For exactly-once you need to configure both the producer and the consumer:

// Producer
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);

// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");

What does each setting do?

enable.idempotence=true  → producer won't send duplicates
acks=all                 → waits for confirmation from all replicas
retries=INT_MAX          → infinite retry attempts on errors
isolation.level=read_committed → reads only committed messages
enable.auto.commit=false → manual commit control
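
The settings above can be gathered into one complete producer/consumer setup. A minimal sketch; the bootstrap address, serializer classes, and group id are assumptions, not part of the original configuration:

```java
import java.util.Properties;

// Producer properties (sketch; localhost:9092 is an assumed broker address)
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("enable.idempotence", "true");
producerProps.put("acks", "all");
producerProps.put("retries", Integer.MAX_VALUE);

// Consumer properties
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "order-processor");  // assumed group id
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("isolation.level", "read_committed");
consumerProps.put("enable.auto.commit", "false");
```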

Simple example

// Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", "order-1", "created"));

// Consumer
consumer.subscribe(List.of("orders"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
    process(record);
}
consumer.commitSync();  // only after processing

Middle Level

Transaction API for Exactly-Once

// Initialization (once at startup)
producer.initTransactions();

try {
    producer.beginTransaction();

    // 1. Read from Kafka
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // 2. Process and write results (result must stay in scope of the send)
    for (var record : records) {
        Result result = process(record);
        producer.send(new ProducerRecord<>("output", record.key(), result.toString()));
    }

    // 3. Commit both actions atomically
    producer.commitTransaction();

} catch (Exception e) {
    // Rollback on any error
    producer.abortTransaction();
}

Read-Process-Write Pattern

1. Read    → consumer.poll() from input topic
2. Process → business logic
3. Write   → producer.send() to output topic
4. Commit  → producer.commitTransaction() atomically

All in one transaction!
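
The four steps above can be sketched as one continuous loop. This is a sketch, not the exact code from this answer: the topic names, the process() helper, and the offset bookkeeping are assumptions:

```java
// Read-process-write loop (sketch; "output" topic and process() are assumed)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  // 1. Read
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        offsets.clear();
        for (ConsumerRecord<String, String> record : records) {
            String result = process(record);                                  // 2. Process
            producer.send(new ProducerRecord<>("output", record.key(), result)); // 3. Write
            // Offset to commit is the *next* offset to read: last consumed + 1
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // 4. Commit writes and consumer offsets atomically
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
    }
}
```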

Broker Configuration

# Broker must support transactions
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000

Common mistakes

  1. initTransactions() not called:
    producer.beginTransaction() without initTransactions()
    → IllegalStateException
    
  2. Not aborting transaction on error:
    Partially committed data → inconsistency
    
  3. Without transactional.id:
    props.put("transactional.id", "my-tx-id");
    // Without this, transactions don't work
    

Senior Level

Internal Implementation

Transaction Coordinator:

Separate component on the broker
Manages transaction lifecycle
Stores state in __transaction_state topic

Transaction State Machine:

Empty → Ongoing → PrepareCommit → CompleteCommit
           ↓
      PrepareAbort → CompleteAbort

Transaction Log:

__transaction_state topic (internal)
Stores:
- Transactional ID
- Producer ID and Epoch
- Transaction state
- List of affected partitions

PID (Producer ID) — unique producer identifier, assigned by the broker.
Epoch — producer generation, increases on restart. Together they allow
the broker to distinguish new messages from duplicates.

Exactly-Once Semantics (EOS) v2

EOS v1 (Kafka 0.11): required a separate transactional producer per input
partition, so the number of producers grew with the partition count.
EOS v2 (Kafka 2.5, KIP-447): one producer per processing thread covers all its
partitions by passing consumer group metadata to the transaction, giving
less overhead and simpler scaling.

Configuration (a Kafka Streams setting; introduced as exactly_once_beta in 2.5,
renamed exactly_once_v2 in 3.0):
props.put("processing.guarantee", "exactly_once_v2");
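
In Kafka Streams the same guarantee is a one-line setting. A minimal sketch; the application id, broker address, topic names, and the mapValues logic are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-demo");          // assumed app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input")
       .mapValues(v -> v.toUpperCase())  // placeholder business logic
       .to("output");

// Transactions, offset commits and aborts are handled by Streams internally
new KafkaStreams(builder.build(), props).start();
```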

Transaction Timeout

// Configure timeout
props.put("transaction.timeout.ms", "60000");  // 60 seconds

// If transaction not completed → automatically abort
// Consumer with isolation.level=read_committed doesn't see aborted messages

Timeout consequences:

Transaction > timeout → abort
Messages → not visible to consumers
Producer → must start over
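
Not every exception can be answered with abortTransaction(). A sketch of the usual split between fatal and abortable errors, following the categories the Java client documents:

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

try {
    producer.beginTransaction();
    // ... send / sendOffsetsToTransaction ...
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: e.g. another producer with the same transactional.id took over.
    // The producer cannot continue; close it and restart the application.
    producer.close();
} catch (KafkaException e) {
    // Abortable: roll back and retry with a fresh transaction
    producer.abortTransaction();
}
```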

Advanced Transaction Patterns

1. Multiple Output Topics:

producer.beginTransaction();

// Read
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

// Process and write to multiple topics
for (var record : records) {
    Result result = process(record);
    producer.send(new ProducerRecord<>("output-1", result));
    producer.send(new ProducerRecord<>("output-2", result));
}

// Atomic commit of all writes
producer.commitTransaction();

2. Send Offsets to Transaction:

// Read from topic A → process → write to topic B → commit offsets
producer.beginTransaction();

// Send to output
producer.send(outputRecord);

// Commit consumer offsets in the same transaction
// (offsets: Map<TopicPartition, OffsetAndMetadata>, value = last consumed offset + 1)
producer.sendOffsetsToTransaction(
    offsets,
    consumer.groupMetadata()
);

producer.commitTransaction();
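
One detail worth spelling out: the committed offset must be the last consumed offset plus one (the next record to read), otherwise the last record is reprocessed after a restart. A tiny, Kafka-free sketch of that rule; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetMath {
    // Hypothetical helper: "topic-partition" -> last consumed offset
    // becomes "topic-partition" -> offset to commit (next record to read)
    public static Map<String, Long> offsetsToCommit(Map<String, Long> lastConsumed) {
        Map<String, Long> toCommit = new HashMap<>();
        lastConsumed.forEach((tp, last) -> toCommit.put(tp, last + 1));
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> r = offsetsToCommit(Map.of("orders-0", 41L));
        System.out.println(r.get("orders-0")); // prints 42
    }
}
```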

Performance Optimization

Transaction overhead:
- Coordinator communication
- State persistence
- Commit/abort protocol

Optimizations:
1. Batch multiple sends in one transaction
2. Increase transaction.timeout.ms for long operations
3. Use transactional.id for producer state recovery
4. Monitor transaction latency

Monitoring Transactions

Key metrics:

kafka.producer:transaction-commit-rate
kafka.producer:transaction-abort-rate
kafka.producer:transaction-commit-failed-rate
kafka.consumer:committed-offset

Alerts:

- Transaction abort rate > 1% → warning
- Transaction commit latency > 1s → warning
- Aborted transactions in logs → investigate
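
Exact transaction metric names vary between client versions, so rather than hard-coding them, a sketch that dumps whatever transaction-related metrics the producer exposes (the `txn`/`transaction` name filter is an assumption):

```java
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Dump transaction-related producer metrics (sketch; name filter is an assumption)
for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
    String name = e.getKey().name();
    if (name.contains("txn") || name.contains("transaction")) {
        System.out.printf("%s.%s = %s%n",
                e.getKey().group(), name, e.getValue().metricValue());
    }
}
```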

Failure Recovery

Producer restart:

// On restart, producer with same transactional.id:
// 1. initTransactions() aborts all incomplete transactions
// 2. Producer can continue from where it left off
props.put("transactional.id", "my-producer-1");
producer.initTransactions();  // aborts incomplete transactions

Consumer restart:

Consumer with isolation.level=read_committed:
- Sees only committed messages
- Aborted messages automatically hidden
- Offsets not committed for aborted transactions

Best Practices

✅ initTransactions() at startup
✅ abortTransaction() on any error
✅ Read-process-write in one transaction
✅ Configure transaction.timeout.ms
✅ transactional.id for recovery
✅ Monitor commit/abort rate

❌ Without enable.idempotence
❌ Without abort on error
❌ Long transactions (> timeout)
❌ Without transactional.id
❌ Ignoring aborted transactions

Architectural decisions

  1. Transaction-based exactly-once works only for Kafka-to-Kafka. The idempotent producer alone gives exactly-once writes into a partition within a single producer session (deduplication on retry).
  2. Transaction timeout is critical — too short → abort, too long → resource waste
  3. transactional.id for recovery — allows continuation after restart
  4. Monitoring is mandatory — aborted transactions can go unnoticed

Summary for Senior

  • Transaction API provides atomic read-process-write
  • Exactly-once requires special broker configuration
  • Transaction timeout — balance between reliability and resource usage
  • transactional.id is critical for recovery after restart
  • Monitoring commit/abort rate is mandatory for production

🎯 Interview Cheat Sheet

Must know:

  • Producer: enable.idempotence=true, acks=all, transactional.id
  • Consumer: isolation.level=read_committed, enable.auto.commit=false
  • Transaction API: initTransactions() → beginTransaction() → commitTransaction() / abortTransaction()
  • Read-Process-Write pattern: poll → process → send → commitTransaction (atomically)
  • Exactly-once works only for Kafka-to-Kafka, not for external systems
  • EOS v2 (Kafka 2.5): one producer coordinates for all consumers, less overhead
  • Transaction timeout is critical: too short → abort, too long → resource waste
  • On restart: initTransactions() aborts all incomplete transactions

Common follow-up questions:

  • What happens without transactional.id? — Transactions don’t work, needed for recovery.
  • What if transaction exceeds timeout? — Automatic abort, messages not visible to consumers.
  • How to commit offsets in a transaction? — sendOffsetsToTransaction(offsets, groupMetadata).
  • How does EOS v2 differ from v1? — Less overhead, simpler API, one producer for all consumers.

Red flags (DO NOT say):

  • “Exactly-once works for Kafka → PostgreSQL” — only Kafka-to-Kafka
  • “You can call beginTransaction without initTransactions” — IllegalStateException
  • “Transactions are free performance-wise” — 2-3x latency overhead
  • “Abort on error is optional” — partially committed data = inconsistency

Related topics:

  • [[10. What is the difference between at-most-once, at-least-once and exactly-once]]
  • [[23. What is idempotent producer]]
  • [[9. What message delivery guarantees does Kafka provide]]
  • [[20. What is producer acknowledgment and what modes exist (acks=0,1,all)]]