Как работает commit offset
4. Idempotent processing — защита от дубликатов при retry
Уровень Junior
Определение
Commit offset — это сохранение номера последнего обработанного сообщения в Kafka.
Куда сохраняется: в специальный внутренний топик Kafka (__consumer_offsets). При рестарте консьюмер читает оттуда свою последнюю позицию и продолжает с неё. Без коммита — начинает сначала.
// Автоматический коммит
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // каждые 5 секунд
// Ручной коммит
props.put("enable.auto.commit", "false");
consumer.commitSync(); // после обработки
Зачем коммитить?
Без коммита:
Консьюмер упал → при рестарте читает всё сначала
→ Дубликаты, wasted processing
С коммитом:
Консьюмер упал → при рестарте продолжает с последнего коммита
→ Минимум дубликатов
Пример
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record); // обработка
}
consumer.commitSync(); // коммит после обработки
}
Уровень Middle
Sync vs Async Commit
Sync Commit (синхронный):
// Ждёт подтверждения от брокера
consumer.commitSync();
// Sync commit: latency 5-50ms на коммит. При 100 коммитах/сек — 500-5000ms overhead.
// Async commit: <5ms (не блокирует).
// При ошибке — выбрасывает исключение
Async Commit (асинхронный):
// Не ждёт подтверждения
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
// Быстрее, но возможна потеря при сбое
Когда что использовать?
| Тип коммита | Когда использовать |
|---|---|
| commitSync() | Критичные данные, низкий throughput |
| commitAsync() | High-throughput, допустимы редкие дубликаты |
| auto.commit | Прототипы, не-production |
Batch Commit
// Коммит каждые N сообщений
int count = 0;
for (var record : records) {
process(record);
if (++count % 100 == 0) {
consumer.commitAsync();
}
}
Committable Offsets
// Коммит конкретных offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("orders", 0), new OffsetAndMetadata(1000));
offsets.put(new TopicPartition("orders", 1), new OffsetAndMetadata(2000));
consumer.commitSync(offsets);
Типичные ошибки
- Коммит до обработки:
consumer.commitSync(); // ❌ сначала коммит process(records); // потом обработка // Если упал → данные потеряны - Коммит в цикле обработки:
for (var record : records) { process(record); consumer.commitSync(); // ❌ слишком часто }
Уровень Senior
Internal Implementation
Commit Process:
1. Consumer → CommitOffset request → Broker
2. Broker → записывает в __consumer_offsets topic
3. Broker → подтверждает коммит
4. Consumer → получает подтверждение
__consumer_offsets topic:
- Внутренний топик Kafka
- Компактируется (log compaction)
- Хранит offsets для всех consumer groups
- Key: group.id + topic + partition
- Value: offset + metadata + timestamp
Offset Commit Latency
Sync commit latency:
Обычно: 5-20ms
При проблемах: >100ms
Impact: блокирует обработку
Async commit latency:
Обычно: <5ms (не блокирует)
Impact: минимальный
Failure Scenarios
1. Commit failure:
Consumer → commit → network error → broker не получил
При рестарте:
- Committed offset старый
- Сообщения обработаются снова
- Idempotent processing решает проблему
2. Broker failure во время commit:
Consumer → commit → broker записал → broker упал до подтверждения
Consumer → retry commit → новый leader
Commit может быть потерян или повторен
Offset Management Patterns
1. Per-Record Commit:
// Коммит каждого сообщения (максимальная надёжность)
for (var record : records) {
process(record);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(offsets);
}
2. Batch Commit with Error Handling:
try {
for (var record : records) {
process(record);
}
consumer.commitSync();
} catch (Exception e) {
// Не коммитим → сообщения будут прочитаны снова
log.error("Processing failed, will retry", e);
}
3. Async Commit with Retry:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// Sync commit — единственная retry попытка. Если и он упал — логируем error.
// Не делаем бесконечных retry чтобы не блокировать обработку.
try {
consumer.commitSync(offsets);
} catch (Exception e) {
log.error("Commit retry failed", e);
}
}
});
Offset Metadata
// Сохранение метаданных с offset
OffsetAndMetadata metadata = new OffsetAndMetadata(
offset,
"processed-at=" + System.currentTimeMillis()
);
consumer.commitSync(Map.of(partition, metadata));
Performance Optimization
Commit frequency trade-offs:
Чаще коммит → меньше дубликатов, больше overhead
Реже коммит → больше throughput, больше дубликатов при сбое
Рекомендации:
- Critical data: commit после каждого батча
- High throughput: commit каждые N сообщений
- Balanced: commitAsync с retry на error
Monitoring Offset Commit
Ключевые метрики:
kafka.consumer:commit-latency-avg
kafka.consumer:commit-rate
kafka.consumer:failed-commit-rate
kafka.consumer:last-commit-latency
Alerts:
- Commit latency > 100ms → warning
- Failed commit rate > 1% → critical
- No commits for > 5 minutes → critical
Best Practices
✅ commitSync() после обработки батча
✅ commitAsync() для high-throughput
✅ Обработка ошибок коммита
✅ Idempotent processing для защиты от дубликатов
✅ Мониторинг commit latency
❌ Автокоммит для критичных данных
❌ Коммит до обработки
❌ Коммит в цикле без необходимости
❌ Игнорирование ошибок коммита
❌ Без monitoring commit health
Архитектурные решения
- Sync commit для reliability — критичные данные
- Async commit для throughput — high-volume системы
- Error handling обязательна — retry или alert
- Idempotent processing — защита от дубликатов при retry
Резюме для Senior
- Commit process: consumer → __consumer_offsets → broker acknowledgment
- Sync vs async — trade-off между reliability и performance
- Failure scenarios требуют обработки retry
- Metadata может хранить дополнительную информацию
- Monitoring commit latency критичен для production health
🎯 Шпаргалка для интервью
Обязательно знать:
- Commit offset — сохранение номера последнего обработанного сообщения в
__consumer_offsets - Sync commit: надёжный, 5-50ms, блокирует; Async commit: <5ms, не блокирует, возможна потеря
- Коммит ПОСЛЕ обработки = at-least-once; коммит ДО = data loss при crash
- Batch commit: каждые N сообщений — баланс между overhead и дубликатами
- Auto commit коммитит полученный offset, не обработанный — не для production
- При failure: committed offset старый → сообщения обработаются снова (idempotent processing)
- Offset metadata позволяет хранить доп. информацию (timestamp, processing state)
Частые уточняющие вопросы:
- Когда использовать sync vs async? — Sync для critical data, async для high-throughput.
- Что будет при async commit failure? — Fallback на sync commit (единственная retry попытка).
- Как коммитить конкретные offsets? —
commitSync(Map<TopicPartition, OffsetAndMetadata>). - Почему auto commit плох для production? — Коммитит до обработки, нет контроля над ошибками.
Красные флаги (НЕ говорить):
- «Автокоммит — стандарт для production» — коммитит непрообработанные сообщения
- «Коммит в цикле каждого сообщения — норма» — огромный overhead
- «Async commit без error handling — OK» — потеря offsets при failure
- «Offset коммитится автоматически брокером» — это client-side logic
Связанные темы:
- [[14. В чём разница между auto commit и manual commit]]
- [[12. Что такое offset в Kafka]]
- [[24. Как обрабатывать ошибки при чтении сообщений]]
- [[15. Что такое rebalancing и когда он происходит]]