Вопрос 12 · Раздел 15

Что такое offset в Kafka

Offset — это НЕ уникальный ID сообщения. Это позиция в логе. Одно и то же сообщение в разных партициях имеет разные offsets. Offset монотонно растёт (0, 1, 2...) и никогда не ум...

Версии по языкам: English Russian Ukrainian

Уровень Junior

Определение

Offset — это порядковый номер сообщения в партиции.

Offset — это НЕ уникальный ID сообщения. Это позиция в логе. Одно и то же сообщение в разных партициях имеет разные offsets. Offset монотонно растёт (0, 1, 2…) и никогда не уменьшается, даже если сообщения удалены.

Каждое сообщение получает уникальный offset.

Партиция 0:
  offset 0: {"user": "user-1", "action": "login"}
  offset 1: {"user": "user-2", "action": "purchase"}
  offset 2: {"user": "user-1", "action": "logout"}
  offset 3: {"user": "user-3", "action": "login"}

Основные свойства

  • Offset — это число, начинающееся с 0
  • Offset уникален только внутри одной партиции
  • Offset нельзя изменить после записи
  • Сообщения упорядочены по offset

Как консьюмер использует offset?

Консьюмер запоминает свой offset:
  Прочитал offset 5 → запомнил 5
  При рестарте → продолжает с offset 6

Пример кода

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println("Partition: " + record.partition());
    System.out.println("Offset: " + record.offset());
    System.out.println("Value: " + record.value());
}

Уровень Middle

Committed Offset

Committed offset — последний обработанный offset, сохранённый в Kafka

Консьюмер читает offset 5 → обрабатывает → коммитит offset 5
При рестарте → продолжает с offset 6

Consumer Position

// Текущая позиция (следующий offset для чтения)
long position = consumer.position(new TopicPartition("orders", 0));

// Последний закоммиченный offset
OffsetAndMetadata committed = consumer.committed(new TopicPartition("orders", 0));

Offset Management

// Ручной коммит
props.put("enable.auto.commit", "false");

// Синхронный коммит (надёжный)
consumer.commitSync();

// Асинхронный коммит (быстрый)
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed", exception);
    }
});

Seek — чтение с определённого offset

TopicPartition partition = new TopicPartition("orders", 0);

// Чтение с конкретного offset
consumer.seek(partition, 1000);

// Чтение с начала
consumer.seekToBeginning(List.of(partition));

// Чтение с конца
consumer.seekToEnd(List.of(partition));

Типичные ошибки

  1. Коммит до обработки:
    consumer.commitSync();  // ❌ сначала коммит
    process(records);       // потом обработка
    // Если упал → данные потеряны
    
  2. Ручное изменение offsets без причины:
    consumer.seek(partition, 0);  // чтение с начала
    // Может вызвать дубликаты
    

Уровень Senior

Internal Implementation

__consumer_offsets topic:

Внутренний топик Kafka для хранения offsets
- Компактируется (log compaction)
- Хранит offsets для всех consumer groups
- Ключ: group.id + topic + partition
- Значение: offset + metadata + timestamp

Offset Storage Format:

Key: [group.id, topic, partition]
Value: {
    offset: long,
    metadata: String,
    leaderEpoch: int,
    timestamp: long
}

Offset Commit Strategies

1. Synchronous Commit:

// Блокирует до подтверждения брокером
consumer.commitSync();
// Надёжно, но медленнее

2. Asynchronous Commit:

// Не ждёт подтверждения
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Retry или логирование
        log.error("Commit failed for offsets: {}", offsets, exception);
    }
});
// Быстрее, но возможна потеря при сбое

3. Batch Commit:

// Коммит каждые N сообщений
int count = 0;
for (var record : records) {
    process(record);
    if (++count % 100 == 0) {
        consumer.commitAsync();
    }
}

Offset Reset Policy

// auto.offset.reset — поведение при отсутствии committed offset
props.put("auto.offset.reset", "earliest");  // читать с начала
props.put("auto.offset.reset", "latest");    // читать с конца (по умолчанию)
props.put("auto.offset.reset", "none");      // исключение

Leader Epoch и Offset Validation

Начиная с Kafka 0.11:
- Leader Epoch хранится с каждым offset
- При failover консьюмер может определить
  какие сообщения могли быть потеряны
- Предотвращает silent data loss

Leader Epoch — номер поколения лидера партиции. Каждый раз при смене лидера epoch
увеличивается. Позволяет консьюмеру определить, какие сообщения могли быть потеряны
при failover.

Offset Monitoring

Ключевые метрики:

Current Offset → последний прочитанный
Committed Offset → последний закоммиченный
Log End Offset → последний в партиции
Consumer Lag = Log End Offset - Committed Offset

Monitoring lag:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group

GROUP           TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
my-group        orders 0          1000            1200            200

LAG=200 означает: 200 сообщений в партиции ещё не прочитаны. Критичность зависит от throughput: при 100 msg/s — 2 секунды отставания (нормально), при 1 msg/s — 200 секунд (критично).

Offset Recovery After Failure

Сценарий:
1. Консьюмер прочитал offset 100
2. Начал обработку → упал
3. Не закоммитил offset

При рестарте:
- Committed offset = 99
- Продолжит с offset 100
- Сообщение 100 обработается снова (at-least-once)

Best Practices

✅ Коммит после обработки
✅ Синхронный коммит для надёжности
✅ Асинхронный коммит для high-throughput
✅ Мониторинг lag
✅ Настройка auto.offset.reset

❌ Автокоммит для критичных данных
❌ Коммит до обработки
❌ Ручное изменение offsets без причины
❌ Игнорирование consumer lag
❌ Без monitoring offsets

Архитектурные решения

  1. Committed offset — основа at-least-once — гарантирует доставку
  2. __consumer_offsets — компактный внутренний топик — эффективное хранение
  3. Leader epoch validation — предотвращает silent data loss
  4. Monitoring lag — ранний индикатор проблем

Резюме для Senior

  • Offset — фундаментальный механизм отслеживания прогресса
  • __consumer_offsets хранит offsets для всех групп
  • Коммит стратегия влияет на reliability и performance
  • Leader epoch validation критичен для data safety
  • Monitoring lag обязателен для production

🎯 Шпаргалка для интервью

Обязательно знать:

  • Offset — порядковый номер сообщения в партиции, начинается с 0
  • Offset уникален только внутри одной партиции, не глобально
  • Offset нельзя изменить после записи; сообщения упорядочены по offset
  • Committed offset — последний обработанный offset, сохранённый в __consumer_offsets
  • Consumer Lag = LogEndOffset - CommittedOffset — ключевая метрика здоровья
  • auto.offset.reset: earliest (с начала), latest (с конца, по умолчанию), none (exception)
  • Leader Epoch (с Kafka 0.11) — предотвращает silent data loss при failover
  • Seek позволяет читать с любого offset: начала, конца, конкретного номера, timestamp

Частые уточняющие вопросы:

  • Что такое Leader Epoch? — Номер поколения лидера; помогает определить потерянные сообщения при failover.
  • Где хранятся committed offsets? — Во внутреннем топике __consumer_offsets (compact).
  • Что значит LAG=200? — 200 сообщений ещё не прочитаны; критичность зависит от throughput.
  • Что будет при коммите до обработки? — При crash сообщения потеряны (offset закоммичен, не обработан).

Красные флаги (НЕ говорить):

  • «Offset — уникальный ID сообщения» — это позиция в логе, не ID
  • «Offset можно изменить» — нельзя, immutable
  • «Offset уникален во всём топике» — только внутри партиции
  • «Коммит перед обработкой — норма» — это data loss при crash

Связанные темы:

  • [[13. Как работает commit offset]]
  • [[14. В чём разница между auto commit и manual commit]]
  • [[26. Как мониторить lag консьюмера]]
  • [[15. Что такое rebalancing и когда он происходит]]