Вопрос 28 · Раздел 15

Как удаляются старые сообщения из топика Kafka

Kafka удаляет старые сообщения по сегментам, когда они становятся старше заданного retention периода (для cleanup.policy=delete; для compact — удаление по наличию более новых зн...

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Простое определение

Kafka удаляет старые сообщения по сегментам, когда они становятся старше заданного retention периода (для cleanup.policy=delete; для compact — удаление по наличию более новых значений того же ключа). Не по одному сообщению, а целыми файлами (сегментами).

Аналогия

Представьте шкаф с папками. Каждый день — новая папка. Когда папка становится старше 7 дней, вы выбрасываете её целиком, а не отдельные документы из неё. Это быстрее и проще.

Визуализация сегментов

Топик "orders", log.segment.bytes = 1GB:

[Segment 0] [Segment 1] [Segment 2] [Segment 3] [Active]
1GB         1GB         1GB         1GB         300MB
offsets:    offsets:    offsets:    offsets:    offsets:
0-500000    500001-     1000001-    1500001-    2000001-
            1000000     1500000     2000000     (writing)

retention.ms = 7 дней
Segment 0 создан 8 дней назад → УДАЛЯЕТСЯ ЦЕЛИКОМ

Каждый сегмент — это файлы

00000000000000000000.log       ← данные сообщений
00000000000000000000.index     ← индекс offset→position
00000000000000000000.timeindex ← индекс timestamp→offset

Когда это важно

  • Понимание disk usage — как Kafka управляет местом
  • Troubleshooting — почему данные “пропали”
  • Capacity planning — сколько disk нужно
  • Retention tuning — как изменение retention влияет на данные

🟡 Middle Level

Процесс удаления по шагам

1. Log Cleaner thread просыпается (каждые log.cleaner.backoff.ms)
2. Сканирует сегменты топика
3. Находит "deletable" сегменты:
   - Сегмент закрыт (не active)
   - Максимальный timestamp сегмента < now - retention.ms
   - ИЛИ размер топика > retention.bytes (удаляем oldest first)
4. Проверяет log.segment.delete.delay.ms (60s по умолчанию)
5. Удаляет файлы сегмента:
   - .log file
   - .index file
   - .timeindex file
   - .txnindex file (если есть)
6. Обновляет log metadata

Сегмент rollover

Новый сегмент создаётся когда:
  - log.segment.bytes достигнут (1GB по умолчанию)
  - log.segment.ms достигнут (7 дней по умолчанию)
  - log.roll.ms (alias для log.segment.ms)

Active segment → Closed → Eligible for deletion

Rollover с jitter:

log.roll.ms = 7 дней
log.roll.jitter.ms = 1 час (random)

Сегмент создаётся через 7 дней ± 1 час
Jitter предотвращает одновременный rollover всех partitions

Delete vs Compact — как удаляется

Аспект Delete Compact
Что удаляет Старые сегменты целиком Старые значения тех же ключей
Когда retention.ms или retention.bytes Когда есть newer value для того же ключа
Granularity Сегмент (1GB) Отдельные records
Speed Быстро (удалить файл) Медленно (rewrite сегмент)
Result Данные удалены Последнее значение для каждого ключа

Таблица типичных ошибок

Ошибка Симптомы Последствия Решение
Segment не удаляется Disk fill Log cleaner не работает Проверить log.cleaner.enable
Удаление нужных данных Consumer offset out of range Data loss retention.ms > consumer processing time
Частый rollover Много маленьких сегментов I/O overhead Увеличить log.segment.bytes
Редкий rollover Один огромный сегмент Delete = большой impact Уменьшить log.segment.bytes
Delete delay слишком большой Медленное освобождение disk Disk fill Уменьшить log.segment.delete.delay.ms
Compact для topics без ключей Ничего не compactится Wasted CPU Проверить что messages имеют keys

Когда сообщения НЕ удаляются

  1. Active segment — всегда protected
  2. retention.ms = -1 — хранить вечно
  3. retention.bytes = -1 — без лимита
  4. Log cleaner disabled — log.cleaner.enable=false
  5. Segment не deletable — timestamp ещё не expired
  6. Unclean shutdown — segment может быть locked

🔴 Senior Level

Частый вопрос на собеседовании

Что если консьюмер медленнее retention? Ответ: консьюмер получит OffsetOutOfRange и начнёт читать сначала (auto.offset.reset=earliest) или пропустит данные (latest). retention.ms должен быть больше максимального времени простоя консьюмера.

Глубокие внутренности

Log Cleaner — детальная архитектура

LogCleaner (kafka.log.LogCleaner):
├── Cleaner threads (log.cleaner.threads)
│   └── Каждый thread:
│       ├── LogCleanerManager (координирует работу)
│       ├── CleanablePartitions (выбирает что чистить)
│       └── Cleaner (выполняет actual cleaning)
│
├── Dedupe Buffer (log.cleaner.dedupe.buffer.size)
│   └── OffsetMap: key → latest offset
│   └── Hash table для compact deduplication
│
└── I/O throttling
    └── log.cleaner.io.buffer.size (512KB)
    └── log.cleaner.io.buffer.load.factor (0.9)

Delete process (для cleanup.policy=delete):

// Упрощённый алгоритм
def deleteOldSegments():
    for each partition:
        segments = partition.logSegments()
        deletable = segments.filter(s =>
            !s.isActive &&                              // не active
            s.maxTimestamp < now - retentionMs &&       // expired
            s.age > logSegmentDeleteDelayMs             // delay passed
        )
        for segment in deletable:
            segment.delete()                             // async delete
            asyncDelete(segment)

Compact process (для cleanup.policy=compact):

// Упрощённый алгоритм
def compactPartition():
    // 1. Build clean map (key → latest offset)
    cleanMap = OffsetMap()
    for segment in segments:
        for record in segment:
            if record.key != null:
                cleanMap.put(record.key, record.offset)
    
    // 2. Determine live records (latest value per key)
    liveOffsets = cleanMap.values()
    
    // 3. Copy live records to new segment
    newSegment = createCleanSegment()
    for record in segments:
        if record.offset in liveOffsets:
            newSegment.append(record)
    
    // 4. Swap old segments with new segment
    partition.replaceSegments(segments, newSegment)
    
    // 5. Delete old segment files
    for segment in segments:
        segment.delete()

Segment deletion — async и file system

Async deletion process:
1. Thread marks segment for deletion
2. Waits log.segment.delete.delay.ms (60s default)
3. Deletes .log, .index, .timeindex, .txnindex files
4. Files removed from directory

Async deletion rationale:
  - Consumers могут ещё читать этот сегмент
  - Delay дает время finish in-flight reads
  - Prevents "file not found" errors

File system behavior (Linux):
  rm(segment.log) → unlink() system call
  File не удаляется пока есть open file descriptors
  Consumers с in-flight reads продолжают читать
  После close всех FDs → disk space freed

Index files и их удаление

Каждый сегмент имеет 2 индекса:

Offset Index (.index):
  Maps offset → byte position in .log file
  Sparse index: каждые ~4KB данных
  Размер: log.index.size.max.bytes = 10MB

Time Index (.timeindex):
  Maps timestamp → offset
  Sparse index: каждые ~4KB данных
  Размер: log.index.size.max.bytes = 10MB

При удалении сегмента:
  Все 3 файла (.log, .index, .timeindex) удаляются вместе
  Нельзя удалить .log без индексов (atomically together)

Trade-offs

Параметр Маленькие сегменты Большие сегменты
Deletion granularity Высокая (меньше данных за раз) Низкая (больше данных за раз)
I/O overhead Выше (больше файлов) Ниже
Log cleaner efficiency Ниже (больше overhead) Выше
Recovery time Быстрее (меньше данных per segment) Медленнее
Disk fragmentation Выше Ниже
Recommended 100MB–500MB 1GB–2GB

Edge Cases

1. Consumer читает сегмент который удаляется:

Timeline:
  T=0:  Consumer читает offset 500 из Segment-1
  T=1:  retention.ms expired для Segment-1
  T=2:  Log cleaner начинает удаление Segment-1
  T=2.1: Consumer всё ещё читает (in-flight read)
  
Что происходит:
  log.segment.delete.delay.ms = 60s (default)
  Consumer имеет 60 секунд finish read
  После 60s: файлы удаляются
  Consumer если не дочитал → OffsetOutOfRange error

Решение:
  retention.ms > max consumer read time
  consumers читают быстрее чем retention expires

2. Unclean broker shutdown во время deletion:

Сценарий:
  Log cleaner удаляет Segment-5
  .log файл удалён, .index ещё нет
  Broker crash
  
При restart:
  Broker видит orphan .index файл без .log
  Broker удаляет .index файл (cleanup orphan)
  Данные уже удалены, не recoverable

Это safe behavior — segment deletion is idempotent

3. Retention bytes и partition imbalance:

retention.bytes = 10GB на partition
Partition 0: 5GB (низкий throughput)
Partition 1: 10GB (высокий throughput)
Partition 2: 2GB (низкий throughput)

Partition 1 достигает 10GB → начинает удалять
Partition 0 и 2 всё ещё имеют старые данные

Результат:
  Partition 1: хранит 2 дня данных
  Partition 0: хранит 7 дней данных
  Retention по времени НЕ одинаковый across partitions!

Решение:
  retention.ms для uniform time-based retention
  retention.bytes для disk protection, не для uniform retention

4. Compacted topic — удаление всех значений:

Сценарий:
  cleanup.policy = compact
  retention.ms = 86400000 (1 день)
  Key "user-1": последнее обновление 2 дня назад
  
  Log cleaner:
    1. Compact: у "user-1" только одно значение → нечего compact
    2. Delete: значение старше 1 дня → DELETE
  
  Результат: ключ "user-1" полностью удалён!
  Consumer при restart: key "user-1" = null

Это intended behavior для compact+delete!
  Для permanent state: retention.ms=-1
  Для changelog с TTL: compact+delete — OK

5. Segment deletion и replication:

Leader partition:
  Segment-5 (offsets 1M–1.5M) удалён по retention

Follower partition:
  Segment-5 ещё не удалён (clock skew или slower cleaner)
  
  Leader не может replicate deleted data
  Follower пытается читать с leader
  Leader: "Segment doesn't exist"
  Follower: OffsetOutOfRange → falls out of ISR

Решение:
  retention.ms одинаковый на всех brokers
  Clock synchronization (NTP)
  retention.ms > max replication lag time

6. Transaction index cleanup:

Топик с transactional messages:
  .txnindex файл содержит transaction markers
  При deletion сегмента:
    .txnindex удаляется вместе с .log
  
  Transaction coordinator:
    Отслеживает committed/aborted transactions
    Если transaction markers удалены:
      Committed transaction records всё ещё в consumer-visible segments
      Aborted transaction records тоже удалены
  
  Consumer isolation.level = read_committed:
    Не видит aborted transactions
    Retention deletion не влияет на transaction semantics

Производительность (production numbers)

Operation Time I/O CPU Impact
Segment deletion (1GB) 10–100ms Low (unlink) Minimal Negligible
Log cleaner compact pass 5–30 min High (read+write) Medium I/O impact
Segment rollover (1GB) 50–200ms Medium (file create) Low Brief I/O spike
Index rebuild (after crash) 1–5 min High (read all) Medium Startup delay

Production War Story

Ситуация: IoT platform, sensor data, 2M messages/min. retention.ms = 24 часа, log.segment.bytes = 1GB.

Проблема: Каждые 2 часа все 12 partitions одновременно создавали новые сегменты (rollover). I/O spike → consumer fetch latency 5s → lag spike → alerts → engineer wakeup call.

Расследование:

  • 2M msg/min × 500 bytes/msg = 1GB/min
  • log.segment.bytes = 1GB → rollover каждые 1 минуту
  • 12 partitions × rollover/min = 12 file creations/min
  • Без jitter → все partitions rollover синхронно
  • I/O pattern: burst every minute → latency spike

Root cause: log.roll.jitter.ms не был настроен → все partitions rollover одновременно.

Решение:

# server.properties
log.segment.bytes=536870912        # 512MB (больше сегментов, меньше каждый)
log.roll.ms=3600000                # 1 час
log.roll.jitter.ms=600000          # 10 минут jitter

# Результат:
# Rollover: каждые 1 час ± 10 мин
# 12 partitions с distributed rollover
# I/O spike: размазан по 20 минут
# Consumer latency: stable

Post-mortem lesson: jitter — не optional для high-throughput topics с множеством partitions.

Monitoring (JMX, Prometheus, Burrow)

JMX метрики:

kafka.log:type=LogCleaner,name=max-clean-time-secs
  - Max time for a single clean operation

kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
  - Dedupe buffer utilization

kafka.log:type=LogCleaner,name=total-cleans
  - Total number of completed cleans

kafka.log:type=Log,name=NumLogSegments,topic=orders,partition=0
  - Number of segments per partition

kafka.log:type=Log,name=Size,topic=orders,partition=0
  - Size of log in bytes

Prometheus + Grafana:

- record: kafka_partition_segment_count
  expr: kafka_log_num_log_segments

- record: kafka_partition_size_bytes
  expr: kafka_log_size

- alert: KafkaSegmentCountHigh
  expr: kafka_log_num_log_segments > 50
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Partition has > 50 segments  retention may need tuning"

- alert: KafkaLogCleanerSlow
  expr: kafka_log_cleaner_max_clean_time_secs > 1800
  for: 30m
  labels:
    severity: warning
  annotations:
    summary: "Log cleaner taking > 30 min per clean"

- alert: KafkaPartitionSizeHigh
  expr: kafka_log_size > 10737418240  # 10GB
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Partition size exceeded 10GB"

Highload Best Practices

✅ log.segment.bytes = 500MB–1GB (balance granularity vs I/O)
✅ log.roll.jitter.ms = 10–20% of log.roll.ms
✅ log.cleaner.threads = number of disks (1 per disk)
✅ log.cleaner.dedupe.buffer.size = 128MB–256MB
✅ log.segment.delete.delay.ms = 60000 (default OK)
✅ Monitor segment count per partition
✅ Monitor log cleaner throughput
✅ retention.ms >> consumer processing time
✅ Disk space alerting (70%, 85%, 95%)

❌ log.segment.bytes < 100MB (too many files)
❌ log.segment.bytes > 2GB (too large for efficient deletion)
❌ Без jitter (thundering herd rollover)
❌ log.cleaner.enable=false (segments не удаляются)
❌ retention.ms < consumer downtime + catch-up
❌ Без monitoring segment count

Архитектурные решения

  1. Segment-based deletion — efficient (file delete), not per-message
  2. Async deletion — delay для in-flight consumers
  3. Jitter — critical для high-throughput, multi-partition topics
  4. Log cleaner — background process, needs monitoring
  5. Index co-deletion — .log + .index + .timeindex всегда вместе

Резюме для Senior

  • Удаление по сегментам (не per-message) — efficient file operations
  • Каждый сегмент = .log + .index + .timeindex (+ .txnindex) файлы
  • Log cleaner handles compact, delete просто удаляет expired segments
  • log.segment.delete.delay.ms (60s default) — grace period для in-flight reads
  • Jitter критичен для high-throughput multi-partition topics
  • retention.bytes = per-partition limit, не total topic
  • Compact+delete может удалить все значения ключа (intended!)
  • Replication: leader deleted segment unavailable для followers
  • Monitor: segment count, cleaner time, partition size
  • At highload: jitter + proper segment size = stable I/O

🎯 Шпаргалка для интервью

Обязательно знать:

  • Удаление по сегментам (не per-message) — efficient file operations
  • Каждый сегмент = .log + .index + .timeindex (+ .txnindex) файлы
  • Сегмент deletable когда: не active, timestamp expired, или размер превышен
  • log.segment.delete.delay.ms=60s — grace period для in-flight reads
  • Log Cleaner handles compact (rewrite), delete просто удаляет expired segments
  • Jitter (log.roll.jitter.ms) критичен для high-throughput — предотвращает thundering herd
  • Rollover: log.segment.bytes (1GB) или log.segment.ms (7 дней)

Частые уточняющие вопросы:

  • Что если consumer читает удаляемый сегмент? — 60s delay даёт время finish read; после — OffsetOutOfRange.
  • Compact+delete может удалить все данные ключа? — Да, если одно значение и retention expired.
  • Зачем нужен jitter? — Без jitter все partitions rollover одновременно → I/O spike.
  • Leader удалил сегмент, follower нет — что будет? — Follower не может replicate → falls out of ISR.

Красные флаги (НЕ говорить):

  • «Сообщения удаляются по одному» — целыми сегментами
  • «Без jitter всё работает» — thundering herd rollover при high-throughput
  • «Compact удаляет сегменты целиком» — compact переписывает (rewrite), не delete
  • «retention.bytes=-1 — норма» — disk fill guaranteed

Связанные темы:

  • [[27. Что такое retention policy]]
  • [[2. Что такое партиция (partition) и зачем она нужна]]
  • [[1. Что такое топик (topic) в Kafka]]
  • [[16. Что такое репликация в Kafka]]