Питання 11 · Розділ 15

Як налаштувати exactly-once семантику

Для exactly-once потрібно налаштувати продюсера і консьюмера:

Мовні версії: English Russian Ukrainian

Рівень Junior

Базове налаштування

Для exactly-once потрібно налаштувати продюсера і консьюмера:

// Producer
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);

// Consumer
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "false");

Що робить кожне налаштування?

enable.idempotence=true  → продюсер не відправить дублікати
acks=all                 → чекає підтвердження від всіх реплік
retries=INT_MAX          → нескінченні спроби при помилках
isolation.level=read_committed → читає тільки закоммічені
enable.auto.commit=false → ручний контроль коммітів

Простий приклад

// Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", "order-1", "created"));

// Consumer
consumer.subscribe(List.of("orders"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
    process(record);
}
consumer.commitSync();  // тільки після обробки

Рівень Middle

Transaction API для Exactly-Once

// Ініціалізація (один раз при старті)
producer.initTransactions();

try {
    producer.beginTransaction();

    // 1. Читання з Kafka
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // 2. Обробка
    for (var record : records) {
        Result result = process(record);
    }

    // 3. Запис результату
    producer.send(new ProducerRecord<>("output", result));

    // 4. Комміт обох дій атомарно
    producer.commitTransaction();

} catch (Exception e) {
    // Відкат при будь-якій помилці
    producer.abortTransaction();
}

Read-Process-Write Pattern

1. Read    → consumer.poll() з input topic
2. Process → бізнес-логіка
3. Write   → producer.send() в output topic
4. Commit  → producer.commitTransaction() атомарно

Все в одній транзакції!

Broker Configuration

# Брокер повинен підтримувати транзакції
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000

Типові помилки

  1. Не викликаний initTransactions():
    producer.beginTransaction() без initTransactions()
    → IllegalStateException
    
  2. Не abort transaction при помилці:
    Частково закоммічені дані → inconsistency
    
  3. Без transactional.id:
    props.put("transactional.id", "my-tx-id");
    // Без цього транзакції не працюють
    

Рівень Senior

Internal Implementation

Transaction Coordinator:

Окремий компонент на брокері
Управляє lifecycle транзакцій
Зберігає state в __transaction_state topic

Transaction State Machine:

Empty → Ongoing → PrepareCommit → CompleteCommit
                    ↓
                PrepareAbort → CompleteAbort

Transaction Log:

__transaction_state topic (внутрішній)
Зберігає:
- Transactional ID
- Producer ID і Epoch

PID (Producer ID) — унікальний ідентифікатор продюсера, призначається брокером.
Epoch — покоління продюсера, збільшується при перезапуску. Разом вони дозволяють
брокеру відрізняти нові повідомлення від дублікатів.

- State транзакції
- Список залучених партицій

Exactly-Once Semantics (EOS) v2

EOS v1 (Kafka 1.0): потребував окремий transaction coordinator для кожного consumer.
EOS v2 (Kafka 2.5): один producer координує для всіх consumers в транзакції —
менше overhead, простіше API.

Налаштування:
props.put("processing.guarantee", "exactly_once_v2");

Transaction Timeout

// Налаштування таймауту
props.put("transaction.timeout.ms", "60000");  // 60 секунд

// Якщо транзакція не завершена → автоматично abort
// Consumer з isolation.level=read_committed не бачить aborted messages

Наслідки таймауту:

Транзакція > timeout → abort
Повідомлення → не видні споживачам
Producer → повинен почати заново

Advanced Transaction Patterns

1. Multiple Output Topics:

producer.beginTransaction();

// Читання
ConsumerRecords<String, String> records = consumer.poll();

// Обробка і запис в кілька топиків
for (var record : records) {
    Result result = process(record);
    producer.send(new ProducerRecord<>("output-1", result));
    producer.send(new ProducerRecord<>("output-2", result));
}

// Атомарний комміт усіх записів
producer.commitTransaction();

2. Send Offsets to Transaction:

// Read from topic A → process → write to topic B → commit offsets
producer.beginTransaction();

// Send to output
producer.send(outputRecord);

// Commit consumer offsets в ту ж транзакцію
producer.sendOffsetsToTransaction(
    offsets,
    consumer.groupMetadata()
);

producer.commitTransaction();

Performance Optimization

Transaction overhead:
- Coordinator communication
- State persistence
- Commit/abort protocol

Optimizations:
1. Batch multiple sends in one transaction
2. Increase transaction.timeout.ms для довгих операцій
3. Use transactional.id для producer state recovery
4. Monitor transaction latency

Monitoring Transactions

Ключові метрики:

kafka.producer:transaction-commit-rate
kafka.producer:transaction-abort-rate
kafka.producer:transaction-commit-failed-rate
kafka.consumer:committed-offset

Alerts:

- Transaction abort rate > 1% → warning
- Transaction commit latency > 1s → warning
- Aborted transactions в логах → investigate

Failure Recovery

Producer restart:

// При перезапуску producer з тим самим transactional.id:
// 1. initTransactions() aborts усі незавершені транзакції
// 2. Producer може продовжити з того ж місця
props.put("transactional.id", "my-producer-1");
producer.initTransactions();  // aborts incomplete transactions

Consumer restart:

Consumer з isolation.level=read_committed:
- Бачить тільки закоммічені повідомлення
- Aborted messages автоматично приховуються
- Offsets не коммітяться для aborted транзакцій

Best Practices

✅ initTransactions() при старті
✅ abortTransaction() при будь-якій помилці
✅ Read-process-write в одній транзакції
✅ Налаштування transaction.timeout.ms
✅ Transactional.id для recovery
✅ Моніторинг commit/abort rate

❌ Без enable.idempotence
❌ Без abort при помилці
❌ Довгі транзакції (> timeout)
❌ Без transactional.id
❌ Ігнорування aborted transactions

Архітектурні рішення

  1. Transaction-based exactly-once працює тільки для Kafka-to-Kafka. Idempotent producer забезпечує exactly-once delivery в топик (deduplication при retry).
  2. Transaction timeout критичний — занадто короткий → abort, занадто довгий → resource waste
  3. Transactional.id для recovery — дозволяє продовжити після перезапуску
  4. Monitoring обов’язковий — aborted transactions можуть залишитися непоміченими

Резюме для Senior

  • Transaction API забезпечує atomic read-process-write
  • Exactly-once потребує спеціальної конфігурації брокерів
  • Transaction timeout — баланс між reliability і resource usage
  • Transactional.id критичний для recovery після restart
  • Monitoring commit/abort rate обов’язковий для production

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Producer: enable.idempotence=true, acks=all, transactional.id
  • Consumer: isolation.level=read_committed, enable.auto.commit=false
  • Transaction API: initTransactions()beginTransaction()commitTransaction() / abortTransaction()
  • Read-Process-Write pattern: poll → process → send → commitTransaction (атомарно)
  • Exactly-once працює тільки для Kafka-to-Kafka, не для зовнішніх систем
  • EOS v2 (Kafka 2.5): один producer координує для всіх consumers, менше overhead
  • Transaction timeout критичний: занадто короткий → abort, занадто довгий → resource waste
  • При restart: initTransactions() aborts усі незавершені транзакції

Часті уточнюючі запитання:

  • Що буде без transactional.id? — Транзакції не працюють, потрібен для recovery.
  • Що якщо транзакція перевищила timeout? — Автоматично abort, повідомлення не видні консьюмерам.
  • Як коммітити offsets в транзакції?sendOffsetsToTransaction(offsets, groupMetadata).
  • Чим EOS v2 відрізняється від v1? — Менше overhead, простіше API, один producer для всіх consumers.

Червоні прапорці (НЕ говорити):

  • «Exactly-once працює для Kafka → PostgreSQL» — тільки Kafka-to-Kafka
  • «Можна викликати beginTransaction без initTransactions» — IllegalStateException
  • «Транзакції безкоштовні по продуктивності» — 2-3x latency overhead
  • «Abort при помилці не обов’язковий» — частково закоммічені дані = inconsistency

Пов’язані теми:

  • [[10. У чому різниця між at-most-once, at-least-once і exactly-once]]
  • [[23. Що таке idempotent producer]]
  • [[9. Які гарантії доставки повідомлень надає Kafka]]
  • [[20. Що таке producer acknowledgment і які режими існують (acks=0,1,all)]]