Питання 13 · Розділ 15

Як працює commit offset

4. Idempotent processing — захист від дублікатів при retry

Мовні версії: English Russian Ukrainian

Рівень Junior

Визначення

Commit offset — це збереження номера останнього обробленого повідомлення в Kafka.

Куди зберігається: у спеціальний внутрішній топик Kafka (__consumer_offsets). При перезапуску консьюмер читає звідти свою останню позицію і продовжує з неї. Без комміту — починає спочатку.

// Автоматичний комміт
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");  // кожні 5 секунд

// Ручний комміт
props.put("enable.auto.commit", "false");
consumer.commitSync();  // після обробки

Навіщо коммітити?

Без комміту:
  Консьюмер впав → при перезапуску читає все спочатку
  → Дублікати, wasted processing

З коммітом:
  Консьюмер впав → при перезапуску продовжує з останнього комміту
  → Мінімум дублікатів

Приклад

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (var record : records) {
        process(record);  // обробка
    }
    consumer.commitSync();  // комміт після обробки
}

Рівень Middle

Sync vs Async Commit

Sync Commit (синхронний):

// Чекає підтвердження від брокера
consumer.commitSync();
// Sync commit: latency 5-50ms на комміт. При 100 коммітах/сек — 500-5000ms overhead.
// Async commit: <5ms (не блокує).
// При помилці — викидає виключення

Async Commit (асинхронний):

// Не чекає підтвердження
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed", exception);
    }
});
// Швидше, але можлива втрата при збої

Коли що використовувати?

Тип комміту Коли використовувати
commitSync() Критичні дані, низький throughput
commitAsync() High-throughput, допустимі рідкі дублікати
auto.commit Прототипи, не-production

Batch Commit

// Комміт кожні N повідомлень
int count = 0;
for (var record : records) {
    process(record);
    if (++count % 100 == 0) {
        consumer.commitAsync();
    }
}

Committable Offsets

// Комміт конкретних offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("orders", 0), new OffsetAndMetadata(1000));
offsets.put(new TopicPartition("orders", 1), new OffsetAndMetadata(2000));
consumer.commitSync(offsets);

Типові помилки

  1. Комміт до обробки:
    consumer.commitSync();  // ❌ спочатку комміт
    process(records);       // потім обробка
    // Якщо впав → дані втрачені
    
  2. Комміт в циклі обробки:
    for (var record : records) {
     process(record);
     consumer.commitSync();  // ❌ занадто часто
    }
    

Рівень Senior

Internal Implementation

Commit Process:

1. Consumer → CommitOffset request → Broker
2. Broker → записує в __consumer_offsets topic
3. Broker → підтверджує комміт
4. Consumer → отримує підтвердження

__consumer_offsets topic:

- Внутрішній топик Kafka
- Компактується (log compaction)
- Зберігає offsets для всіх consumer groups
- Key: group.id + topic + partition
- Value: offset + metadata + timestamp

Offset Commit Latency

Sync commit latency:
  Зазвичай: 5-20ms
  При проблемах: >100ms
  Impact: блокує обробку

Async commit latency:
  Зазвичай: <5ms (не блокує)
  Impact: мінімальний

Failure Scenarios

1. Commit failure:

Consumer → commit → network error → broker не отримав
При перезапуску:
  - Committed offset старий
  - Повідомлення обробляться знову
  - Idempotent processing вирішує проблему

2. Broker failure під час commit:

Consumer → commit → broker записав → broker впав до підтвердження
Consumer → retry commit → новий leader
Commit може бути втрачений або повторений

Offset Management Patterns

1. Per-Record Commit:

// Комміт кожного повідомлення (максимальна надійність)
for (var record : records) {
    process(record);
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    consumer.commitSync(offsets);
}

2. Batch Commit with Error Handling:

try {
    for (var record : records) {
        process(record);
    }
    consumer.commitSync();
} catch (Exception e) {
    // Не коммітимо → повідомлення будуть прочитані знову
    log.error("Processing failed, will retry", e);
}

3. Async Commit with Retry:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Sync commit — єдина retry спроба. Якщо і він впав — логуємо error.
        // Не робимо нескінченних retry щоб не блокувати обробку.
        try {
            consumer.commitSync(offsets);
        } catch (Exception e) {
            log.error("Commit retry failed", e);
        }
    }
});

Offset Metadata

// Збереження метаданих з offset
OffsetAndMetadata metadata = new OffsetAndMetadata(
    offset,
    "processed-at=" + System.currentTimeMillis()
);
consumer.commitSync(Map.of(partition, metadata));

Performance Optimization

Commit frequency trade-offs:
  Частіше комміт → менше дублікатів, більше overhead
  Рідше комміт → більше throughput, більше дублікатів при збої

Рекомендації:
  - Critical data: commit після кожного батча
  - High throughput: commit кожні N повідомлень
  - Balanced: commitAsync з retry на error

Monitoring Offset Commit

Ключові метрики:

kafka.consumer:commit-latency-avg
kafka.consumer:commit-rate
kafka.consumer:failed-commit-rate
kafka.consumer:last-commit-latency

Alerts:

- Commit latency > 100ms → warning
- Failed commit rate > 1% → critical
- No commits for > 5 minutes → critical

Best Practices

✅ commitSync() після обробки батча
✅ commitAsync() для high-throughput
✅ Обробка помилок комміту
✅ Idempotent processing для захисту від дублікатів
✅ Моніторинг commit latency

❌ Автокомміт для критичних даних
❌ Комміт до обробки
❌ Комміт в циклі без необхідності
❌ Ігнорування помилок комміту
❌ Без monitoring commit health

Архітектурні рішення

  1. Sync commit для reliability — критичні дані
  2. Async commit для throughput — high-volume системи
  3. Error handling обов’язкова — retry або alert
  4. Idempotent processing — захист від дублікатів при retry

Резюме для Senior

  • Commit process: consumer → __consumer_offsets → broker acknowledgment
  • Sync vs async — trade-off між reliability і performance
  • Failure scenarios потребують обробки retry
  • Metadata може зберігати додаткову інформацію
  • Monitoring commit latency критичний для production health

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Commit offset — збереження номера останнього обробленого повідомлення в __consumer_offsets
  • Sync commit: надійний, 5-50ms, блокує; Async commit: <5ms, не блокує, можлива втрата
  • Комміт ПІСЛЯ обробки = at-least-once; комміт ДО = data loss при crash
  • Batch commit: кожні N повідомлень — баланс між overhead і дублікатами
  • Auto commit коммітить отриманий offset, не оброблений — не для production
  • При failure: committed offset старий → повідомлення обробляться знову (idempotent processing)
  • Offset metadata дозволяє зберігати дод. інформацію (timestamp, processing state)

Часті уточнюючі запитання:

  • Коли використовувати sync vs async? — Sync для critical data, async для high-throughput.
  • Що буде при async commit failure? — Fallback на sync commit (єдина retry спроба).
  • Як коммітити конкретні offsets?commitSync(Map<TopicPartition, OffsetAndMetadata>).
  • Чому auto commit поганий для production? — Коммітить до обробки, немає контролю над помилками.

Червоні прапорці (НЕ говорити):

  • «Автокомміт — стандарт для production» — коммітить необроблені повідомлення
  • «Комміт в циклі кожного повідомлення — норма» — величезний overhead
  • «Async commit без error handling — OK» — втрата offsets при failure
  • «Offset коммітиться автоматично брокером» — це client-side logic

Пов’язані теми:

  • [[14. У чому різниця між auto commit і manual commit]]
  • [[12. Що таке offset в Kafka]]
  • [[24. Як обробляти помилки при читанні повідомлень]]
  • [[15. Що таке ребаланс і коли він відбувається]]