Что такое offset в Kafka
Offset — это НЕ уникальный ID сообщения. Это позиция в логе. Одно и то же сообщение в разных партициях имеет разные offsets. Offset монотонно растёт (0, 1, 2...) и никогда не ум...
Уровень Junior
Определение
Offset — это порядковый номер сообщения в партиции.
Offset — это НЕ уникальный ID сообщения. Это позиция в логе. Одно и то же сообщение в разных партициях имеет разные offsets. Offset монотонно растёт (0, 1, 2…) и никогда не уменьшается, даже если сообщения удалены.
Каждое сообщение получает уникальный offset.
Партиция 0:
offset 0: {"user": "user-1", "action": "login"}
offset 1: {"user": "user-2", "action": "purchase"}
offset 2: {"user": "user-1", "action": "logout"}
offset 3: {"user": "user-3", "action": "login"}
Основные свойства
- Offset — это число, начинающееся с 0
- Offset уникален только внутри одной партиции
- Offset нельзя изменить после записи
- Сообщения упорядочены по offset
Как консьюмер использует offset?
Консьюмер запоминает свой offset:
Прочитал offset 5 → запомнил 5
При рестарте → продолжает с offset 6
Пример кода
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Partition: " + record.partition());
System.out.println("Offset: " + record.offset());
System.out.println("Value: " + record.value());
}
Уровень Middle
Committed Offset
Committed offset — последний обработанный offset, сохранённый в Kafka
Консьюмер читает offset 5 → обрабатывает → коммитит offset 5
При рестарте → продолжает с offset 6
Consumer Position
// Текущая позиция (следующий offset для чтения)
long position = consumer.position(new TopicPartition("orders", 0));
// Последний закоммиченный offset
OffsetAndMetadata committed = consumer.committed(new TopicPartition("orders", 0));
Offset Management
// Ручной коммит
props.put("enable.auto.commit", "false");
// Синхронный коммит (надёжный)
consumer.commitSync();
// Асинхронный коммит (быстрый)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
Seek — чтение с определённого offset
TopicPartition partition = new TopicPartition("orders", 0);
// Чтение с конкретного offset
consumer.seek(partition, 1000);
// Чтение с начала
consumer.seekToBeginning(List.of(partition));
// Чтение с конца
consumer.seekToEnd(List.of(partition));
Типичные ошибки
- Коммит до обработки:
consumer.commitSync(); // ❌ сначала коммит process(records); // потом обработка // Если упал → данные потеряны - Ручное изменение offsets без причины:
consumer.seek(partition, 0); // чтение с начала // Может вызвать дубликаты
Уровень Senior
Internal Implementation
__consumer_offsets topic:
Внутренний топик Kafka для хранения offsets
- Компактируется (log compaction)
- Хранит offsets для всех consumer groups
- Ключ: group.id + topic + partition
- Значение: offset + metadata + timestamp
Offset Storage Format:
Key: [group.id, topic, partition]
Value: {
offset: long,
metadata: String,
leaderEpoch: int,
timestamp: long
}
Offset Commit Strategies
1. Synchronous Commit:
// Блокирует до подтверждения брокером
consumer.commitSync();
// Надёжно, но медленнее
2. Asynchronous Commit:
// Не ждёт подтверждения
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// Retry или логирование
log.error("Commit failed for offsets: {}", offsets, exception);
}
});
// Быстрее, но возможна потеря при сбое
3. Batch Commit:
// Коммит каждые N сообщений
int count = 0;
for (var record : records) {
process(record);
if (++count % 100 == 0) {
consumer.commitAsync();
}
}
Offset Reset Policy
// auto.offset.reset — поведение при отсутствии committed offset
props.put("auto.offset.reset", "earliest"); // читать с начала
props.put("auto.offset.reset", "latest"); // читать с конца (по умолчанию)
props.put("auto.offset.reset", "none"); // исключение
Leader Epoch и Offset Validation
Начиная с Kafka 0.11:
- Leader Epoch хранится с каждым offset
- При failover консьюмер может определить
какие сообщения могли быть потеряны
- Предотвращает silent data loss
Leader Epoch — номер поколения лидера партиции. Каждый раз при смене лидера epoch
увеличивается. Позволяет консьюмеру определить, какие сообщения могли быть потеряны
при failover.
Offset Monitoring
Ключевые метрики:
Current Offset → последний прочитанный
Committed Offset → последний закоммиченный
Log End Offset → последний в партиции
Consumer Lag = Log End Offset - Committed Offset
Monitoring lag:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
my-group orders 0 1000 1200 200
LAG=200 означает: 200 сообщений в партиции ещё не прочитаны. Критичность зависит от throughput: при 100 msg/s — 2 секунды отставания (нормально), при 1 msg/s — 200 секунд (критично).
Offset Recovery After Failure
Сценарий:
1. Консьюмер прочитал offset 100
2. Начал обработку → упал
3. Не закоммитил offset
При рестарте:
- Committed offset = 99
- Продолжит с offset 100
- Сообщение 100 обработается снова (at-least-once)
Best Practices
✅ Коммит после обработки
✅ Синхронный коммит для надёжности
✅ Асинхронный коммит для high-throughput
✅ Мониторинг lag
✅ Настройка auto.offset.reset
❌ Автокоммит для критичных данных
❌ Коммит до обработки
❌ Ручное изменение offsets без причины
❌ Игнорирование consumer lag
❌ Без monitoring offsets
Архитектурные решения
- Committed offset — основа at-least-once — гарантирует доставку
- __consumer_offsets — компактный внутренний топик — эффективное хранение
- Leader epoch validation — предотвращает silent data loss
- Monitoring lag — ранний индикатор проблем
Резюме для Senior
- Offset — фундаментальный механизм отслеживания прогресса
- __consumer_offsets хранит offsets для всех групп
- Коммит стратегия влияет на reliability и performance
- Leader epoch validation критичен для data safety
- Monitoring lag обязателен для production
🎯 Шпаргалка для интервью
Обязательно знать:
- Offset — порядковый номер сообщения в партиции, начинается с 0
- Offset уникален только внутри одной партиции, не глобально
- Offset нельзя изменить после записи; сообщения упорядочены по offset
- Committed offset — последний обработанный offset, сохранённый в
__consumer_offsets - Consumer Lag = LogEndOffset - CommittedOffset — ключевая метрика здоровья
auto.offset.reset: earliest (с начала), latest (с конца, по умолчанию), none (exception)- Leader Epoch (с Kafka 0.11) — предотвращает silent data loss при failover
- Seek позволяет читать с любого offset: начала, конца, конкретного номера, timestamp
Частые уточняющие вопросы:
- Что такое Leader Epoch? — Номер поколения лидера; помогает определить потерянные сообщения при failover.
- Где хранятся committed offsets? — Во внутреннем топике
__consumer_offsets(compact). - Что значит LAG=200? — 200 сообщений ещё не прочитаны; критичность зависит от throughput.
- Что будет при коммите до обработки? — При crash сообщения потеряны (offset закоммичен, не обработан).
Красные флаги (НЕ говорить):
- «Offset — уникальный ID сообщения» — это позиция в логе, не ID
- «Offset можно изменить» — нельзя, immutable
- «Offset уникален во всём топике» — только внутри партиции
- «Коммит перед обработкой — норма» — это data loss при crash
Связанные темы:
- [[13. Как работает commit offset]]
- [[14. В чём разница между auto commit и manual commit]]
- [[26. Как мониторить lag консьюмера]]
- [[15. Что такое rebalancing и когда он происходит]]