Вопрос 13 · Раздел 15

Как работает commit offset

4. Idempotent processing — защита от дубликатов при retry

Версии по языкам: English Russian Ukrainian

Уровень Junior

Определение

Commit offset — это сохранение номера последнего обработанного сообщения в Kafka.

Куда сохраняется: в специальный внутренний топик Kafka (__consumer_offsets). При рестарте консьюмер читает оттуда свою последнюю позицию и продолжает с неё. Без коммита — начинает сначала.

// Автоматический коммит
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");  // каждые 5 секунд

// Ручной коммит
props.put("enable.auto.commit", "false");
consumer.commitSync();  // после обработки

Зачем коммитить?

Без коммита:
  Консьюмер упал → при рестарте читает всё сначала
  → Дубликаты, wasted processing

С коммитом:
  Консьюмер упал → при рестарте продолжает с последнего коммита
  → Минимум дубликатов

Пример

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (var record : records) {
        process(record);  // обработка
    }
    consumer.commitSync();  // коммит после обработки
}

Уровень Middle

Sync vs Async Commit

Sync Commit (синхронный):

// Ждёт подтверждения от брокера
consumer.commitSync();
// Sync commit: latency 5-50ms на коммит. При 100 коммитах/сек — 500-5000ms overhead.
// Async commit: <5ms (не блокирует).
// При ошибке — выбрасывает исключение

Async Commit (асинхронный):

// Не ждёт подтверждения
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed", exception);
    }
});
// Быстрее, но возможна потеря при сбое

Когда что использовать?

Тип коммита Когда использовать
commitSync() Критичные данные, низкий throughput
commitAsync() High-throughput, допустимы редкие дубликаты
auto.commit Прототипы, не-production

Batch Commit

// Коммит каждые N сообщений
int count = 0;
for (var record : records) {
    process(record);
    if (++count % 100 == 0) {
        consumer.commitAsync();
    }
}

Committable Offsets

// Коммит конкретных offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("orders", 0), new OffsetAndMetadata(1000));
offsets.put(new TopicPartition("orders", 1), new OffsetAndMetadata(2000));
consumer.commitSync(offsets);

Типичные ошибки

  1. Коммит до обработки:
    consumer.commitSync();  // ❌ сначала коммит
    process(records);       // потом обработка
    // Если упал → данные потеряны
    
  2. Коммит в цикле обработки:
    for (var record : records) {
     process(record);
     consumer.commitSync();  // ❌ слишком часто
    }
    

Уровень Senior

Internal Implementation

Commit Process:

1. Consumer → CommitOffset request → Broker
2. Broker → записывает в __consumer_offsets topic
3. Broker → подтверждает коммит
4. Consumer → получает подтверждение

__consumer_offsets topic:

- Внутренний топик Kafka
- Компактируется (log compaction)
- Хранит offsets для всех consumer groups
- Key: group.id + topic + partition
- Value: offset + metadata + timestamp

Offset Commit Latency

Sync commit latency:
  Обычно: 5-20ms
  При проблемах: >100ms
  Impact: блокирует обработку

Async commit latency:
  Обычно: <5ms (не блокирует)
  Impact: минимальный

Failure Scenarios

1. Commit failure:

Consumer → commit → network error → broker не получил
При рестарте:
  - Committed offset старый
  - Сообщения обработаются снова
  - Idempotent processing решает проблему

2. Broker failure во время commit:

Consumer → commit → broker записал → broker упал до подтверждения
Consumer → retry commit → новый leader
Commit может быть потерян или повторен

Offset Management Patterns

1. Per-Record Commit:

// Коммит каждого сообщения (максимальная надёжность)
for (var record : records) {
    process(record);
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    consumer.commitSync(offsets);
}

2. Batch Commit with Error Handling:

try {
    for (var record : records) {
        process(record);
    }
    consumer.commitSync();
} catch (Exception e) {
    // Не коммитим → сообщения будут прочитаны снова
    log.error("Processing failed, will retry", e);
}

3. Async Commit with Retry:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Sync commit — единственная retry попытка. Если и он упал — логируем error.
        // Не делаем бесконечных retry чтобы не блокировать обработку.
        try {
            consumer.commitSync(offsets);
        } catch (Exception e) {
            log.error("Commit retry failed", e);
        }
    }
});

Offset Metadata

// Сохранение метаданных с offset
OffsetAndMetadata metadata = new OffsetAndMetadata(
    offset,
    "processed-at=" + System.currentTimeMillis()
);
consumer.commitSync(Map.of(partition, metadata));

Performance Optimization

Commit frequency trade-offs:
  Чаще коммит → меньше дубликатов, больше overhead
  Реже коммит → больше throughput, больше дубликатов при сбое

Рекомендации:
  - Critical data: commit после каждого батча
  - High throughput: commit каждые N сообщений
  - Balanced: commitAsync с retry на error

Monitoring Offset Commit

Ключевые метрики:

kafka.consumer:commit-latency-avg
kafka.consumer:commit-rate
kafka.consumer:failed-commit-rate
kafka.consumer:last-commit-latency

Alerts:

- Commit latency > 100ms → warning
- Failed commit rate > 1% → critical
- No commits for > 5 minutes → critical

Best Practices

✅ commitSync() после обработки батча
✅ commitAsync() для high-throughput
✅ Обработка ошибок коммита
✅ Idempotent processing для защиты от дубликатов
✅ Мониторинг commit latency

❌ Автокоммит для критичных данных
❌ Коммит до обработки
❌ Коммит в цикле без необходимости
❌ Игнорирование ошибок коммита
❌ Без monitoring commit health

Архитектурные решения

  1. Sync commit для reliability — критичные данные
  2. Async commit для throughput — high-volume системы
  3. Error handling обязательна — retry или alert
  4. Idempotent processing — защита от дубликатов при retry

Резюме для Senior

  • Commit process: consumer → __consumer_offsets → broker acknowledgment
  • Sync vs async — trade-off между reliability и performance
  • Failure scenarios требуют обработки retry
  • Metadata может хранить дополнительную информацию
  • Monitoring commit latency критичен для production health

🎯 Шпаргалка для интервью

Обязательно знать:

  • Commit offset — сохранение номера последнего обработанного сообщения в __consumer_offsets
  • Sync commit: надёжный, 5-50ms, блокирует; Async commit: <5ms, не блокирует, возможна потеря
  • Коммит ПОСЛЕ обработки = at-least-once; коммит ДО = data loss при crash
  • Batch commit: каждые N сообщений — баланс между overhead и дубликатами
  • Auto commit коммитит полученный offset, не обработанный — не для production
  • При failure: committed offset старый → сообщения обработаются снова (idempotent processing)
  • Offset metadata позволяет хранить доп. информацию (timestamp, processing state)

Частые уточняющие вопросы:

  • Когда использовать sync vs async? — Sync для critical data, async для high-throughput.
  • Что будет при async commit failure? — Fallback на sync commit (единственная retry попытка).
  • Как коммитить конкретные offsets?commitSync(Map<TopicPartition, OffsetAndMetadata>).
  • Почему auto commit плох для production? — Коммитит до обработки, нет контроля над ошибками.

Красные флаги (НЕ говорить):

  • «Автокоммит — стандарт для production» — коммитит непрообработанные сообщения
  • «Коммит в цикле каждого сообщения — норма» — огромный overhead
  • «Async commit без error handling — OK» — потеря offsets при failure
  • «Offset коммитится автоматически брокером» — это client-side logic

Связанные темы:

  • [[14. В чём разница между auto commit и manual commit]]
  • [[12. Что такое offset в Kafka]]
  • [[24. Как обрабатывать ошибки при чтении сообщений]]
  • [[15. Что такое rebalancing и когда он происходит]]