Як працює commit offset
4. Idempotent processing — захист від дублікатів при retry
Рівень Junior
Визначення
Commit offset — це збереження номера останнього обробленого повідомлення в Kafka.
Куди зберігається: у спеціальний внутрішній топик Kafka (__consumer_offsets). При перезапуску консьюмер читає звідти свою останню позицію і продовжує з неї. Без комміту — починає спочатку.
// Автоматичний комміт
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // кожні 5 секунд
// Ручний комміт
props.put("enable.auto.commit", "false");
consumer.commitSync(); // після обробки
Навіщо коммітити?
Без комміту:
Консьюмер впав → при перезапуску читає все спочатку
→ Дублікати, wasted processing
З коммітом:
Консьюмер впав → при перезапуску продовжує з останнього комміту
→ Мінімум дублікатів
Приклад
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record); // обробка
}
consumer.commitSync(); // комміт після обробки
}
Рівень Middle
Sync vs Async Commit
Sync Commit (синхронний):
// Чекає підтвердження від брокера
consumer.commitSync();
// Sync commit: latency 5-50ms на комміт. При 100 коммітах/сек — 500-5000ms overhead.
// Async commit: <5ms (не блокує).
// При помилці — викидає виключення
Async Commit (асинхронний):
// Не чекає підтвердження
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
// Швидше, але можлива втрата при збої
Коли що використовувати?
| Тип комміту | Коли використовувати |
|---|---|
| commitSync() | Критичні дані, низький throughput |
| commitAsync() | High-throughput, допустимі рідкі дублікати |
| auto.commit | Прототипи, не-production |
Batch Commit
// Комміт кожні N повідомлень
int count = 0;
for (var record : records) {
process(record);
if (++count % 100 == 0) {
consumer.commitAsync();
}
}
Committable Offsets
// Комміт конкретних offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("orders", 0), new OffsetAndMetadata(1000));
offsets.put(new TopicPartition("orders", 1), new OffsetAndMetadata(2000));
consumer.commitSync(offsets);
Типові помилки
- Комміт до обробки:
consumer.commitSync(); // ❌ спочатку комміт process(records); // потім обробка // Якщо впав → дані втрачені - Комміт в циклі обробки:
for (var record : records) { process(record); consumer.commitSync(); // ❌ занадто часто }
Рівень Senior
Internal Implementation
Commit Process:
1. Consumer → CommitOffset request → Broker
2. Broker → записує в __consumer_offsets topic
3. Broker → підтверджує комміт
4. Consumer → отримує підтвердження
__consumer_offsets topic:
- Внутрішній топик Kafka
- Компактується (log compaction)
- Зберігає offsets для всіх consumer groups
- Key: group.id + topic + partition
- Value: offset + metadata + timestamp
Offset Commit Latency
Sync commit latency:
Зазвичай: 5-20ms
При проблемах: >100ms
Impact: блокує обробку
Async commit latency:
Зазвичай: <5ms (не блокує)
Impact: мінімальний
Failure Scenarios
1. Commit failure:
Consumer → commit → network error → broker не отримав
При перезапуску:
- Committed offset старий
- Повідомлення обробляться знову
- Idempotent processing вирішує проблему
2. Broker failure під час commit:
Consumer → commit → broker записав → broker впав до підтвердження
Consumer → retry commit → новий leader
Commit може бути втрачений або повторений
Offset Management Patterns
1. Per-Record Commit:
// Комміт кожного повідомлення (максимальна надійність)
for (var record : records) {
process(record);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(offsets);
}
2. Batch Commit with Error Handling:
try {
for (var record : records) {
process(record);
}
consumer.commitSync();
} catch (Exception e) {
// Не коммітимо → повідомлення будуть прочитані знову
log.error("Processing failed, will retry", e);
}
3. Async Commit with Retry:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// Sync commit — єдина retry спроба. Якщо і він впав — логуємо error.
// Не робимо нескінченних retry щоб не блокувати обробку.
try {
consumer.commitSync(offsets);
} catch (Exception e) {
log.error("Commit retry failed", e);
}
}
});
Offset Metadata
// Збереження метаданих з offset
OffsetAndMetadata metadata = new OffsetAndMetadata(
offset,
"processed-at=" + System.currentTimeMillis()
);
consumer.commitSync(Map.of(partition, metadata));
Performance Optimization
Commit frequency trade-offs:
Частіше комміт → менше дублікатів, більше overhead
Рідше комміт → більше throughput, більше дублікатів при збої
Рекомендації:
- Critical data: commit після кожного батча
- High throughput: commit кожні N повідомлень
- Balanced: commitAsync з retry на error
Monitoring Offset Commit
Ключові метрики:
kafka.consumer:commit-latency-avg
kafka.consumer:commit-rate
kafka.consumer:failed-commit-rate
kafka.consumer:last-commit-latency
Alerts:
- Commit latency > 100ms → warning
- Failed commit rate > 1% → critical
- No commits for > 5 minutes → critical
Best Practices
✅ commitSync() після обробки батча
✅ commitAsync() для high-throughput
✅ Обробка помилок комміту
✅ Idempotent processing для захисту від дублікатів
✅ Моніторинг commit latency
❌ Автокомміт для критичних даних
❌ Комміт до обробки
❌ Комміт в циклі без необхідності
❌ Ігнорування помилок комміту
❌ Без monitoring commit health
Архітектурні рішення
- Sync commit для reliability — критичні дані
- Async commit для throughput — high-volume системи
- Error handling обов’язкова — retry або alert
- Idempotent processing — захист від дублікатів при retry
Резюме для Senior
- Commit process: consumer → __consumer_offsets → broker acknowledgment
- Sync vs async — trade-off між reliability і performance
- Failure scenarios потребують обробки retry
- Metadata може зберігати додаткову інформацію
- Monitoring commit latency критичний для production health
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
- Commit offset — збереження номера останнього обробленого повідомлення в
__consumer_offsets - Sync commit: надійний, 5-50ms, блокує; Async commit: <5ms, не блокує, можлива втрата
- Комміт ПІСЛЯ обробки = at-least-once; комміт ДО = data loss при crash
- Batch commit: кожні N повідомлень — баланс між overhead і дублікатами
- Auto commit коммітить отриманий offset, не оброблений — не для production
- При failure: committed offset старий → повідомлення обробляться знову (idempotent processing)
- Offset metadata дозволяє зберігати дод. інформацію (timestamp, processing state)
Часті уточнюючі запитання:
- Коли використовувати sync vs async? — Sync для critical data, async для high-throughput.
- Що буде при async commit failure? — Fallback на sync commit (єдина retry спроба).
- Як коммітити конкретні offsets? —
commitSync(Map<TopicPartition, OffsetAndMetadata>). - Чому auto commit поганий для production? — Коммітить до обробки, немає контролю над помилками.
Червоні прапорці (НЕ говорити):
- «Автокомміт — стандарт для production» — коммітить необроблені повідомлення
- «Комміт в циклі кожного повідомлення — норма» — величезний overhead
- «Async commit без error handling — OK» — втрата offsets при failure
- «Offset коммітиться автоматично брокером» — це client-side logic
Пов’язані теми:
- [[14. У чому різниця між auto commit і manual commit]]
- [[12. Що таке offset в Kafka]]
- [[24. Як обробляти помилки при читанні повідомлень]]
- [[15. Що таке ребаланс і коли він відбувається]]