Питання 28 · Розділ 15

Як видаляються старі повідомлення з топика Kafka

Kafka видаляє старі повідомлення по сегментах, коли вони стають старшими за заданий retention період (для cleanup.policy=delete; для compact — видалення за наявністю новіших зна...

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Просте визначення

Kafka видаляє старі повідомлення по сегментах, коли вони стають старшими за заданий retention період (для cleanup.policy=delete; для compact — видалення за наявністю новіших значень того ж ключа). Не по одному повідомленню, а цілими файлами (сегментами).

Аналогія

Уявіть шафу з папками. Кожен день — нова папка. Коли папка стає старшою 7 днів, ви викидаєте її цілком, а не окремі документи з неї. Це швидше і простіше.

Візуалізація сегментів

Топик "orders", log.segment.bytes = 1GB:

[Segment 0] [Segment 1] [Segment 2] [Segment 3] [Active]
1GB         1GB         1GB         1GB         300MB

retention.ms = 7 днів
Segment 0 створено 8 днів тому → ВИДАЛЯЄТЬСЯ ЦІЛКОМ

Кожен сегмент — це файли

00000000000000000000.log       ← дані повідомлень
00000000000000000000.index     ← індекс offset→position
00000000000000000000.timeindex ← індекс timestamp→offset

Коли це важливо

  • Розуміння disk usage — як Kafka управляє місцем
  • Troubleshooting — чому дані “пропали”
  • Capacity planning — скільки disk потрібно
  • Retention tuning — як зміна retention впливає на дані

🟡 Middle Level

Процес видалення по кроках

1. Log Cleaner thread прокидається (кожні log.cleaner.backoff.ms)
2. Сканує сегменти топика
3. Знаходить "deletable" сегменти:
   - Сегмент закритий (не active)
   - Максимальний timestamp сегмента < now - retention.ms
   - АБО розмір топика > retention.bytes (видаляємо oldest first)
4. Перевіряє log.segment.delete.delay.ms (60s за замовчуванням)
5. Видаляє файли сегмента: .log, .index, .timeindex, .txnindex
6. Оновлює log metadata

Delete vs Compact — як видаляється

Аспект Delete Compact
Що видаляє Старі сегменти цілком Старі значення тих самих ключів
Коли retention.ms або retention.bytes Коли є newer value для того ж ключа
Granularity Сегмент (1GB) Окремі records
Speed Швидко (видалити файл) Повільно (rewrite сегмент)

Коли повідомлення НЕ видаляються

  1. Active segment — завжди protected
  2. retention.ms = -1 — зберігати вічно
  3. retention.bytes = -1 — без ліміту
  4. Log cleaner disabled — log.cleaner.enable=false
  5. Segment не deletable — timestamp ще не expired

🔴 Senior Level

Log Cleaner — детальна архітектура

LogCleaner:
├── Cleaner threads (log.cleaner.threads)
├── Dedupe Buffer (log.cleaner.dedupe.buffer.size)
└── I/O throttling

Delete process (для cleanup.policy=delete):

def deleteOldSegments():
    for each partition:
        deletable = segments.filter(s =>
            !s.isActive &&
            s.maxTimestamp < now - retentionMs &&
            s.age > logSegmentDeleteDelayMs
        )
        for segment in deletable:
            asyncDelete(segment)

Compact process (для cleanup.policy=compact):

def compactPartition():
    // 1. Build clean map (key → latest offset)
    // 2. Determine live records
    // 3. Copy live records to new segment
    // 4. Swap old segments with new segment
    // 5. Delete old segment files

Segment deletion — async і file system

Async deletion process:
1. Thread marks segment for deletion
2. Waits log.segment.delete.delay.ms (60s default)
3. Deletes .log, .index, .timeindex, .txnindex files

Rationale:
  - Consumers можуть ще читати цей сегмент
  - Delay дає час finish in-flight reads
  - Prevents "file not found" errors

Linux: rm() → unlink(). File не видаляється поки є open file descriptors.

Edge Cases

1. Consumer читає сегмент який видаляється:

log.segment.delete.delay.ms = 60s (default)
Consumer має 60 секунд finish read
Після 60s: файли видаляються → OffsetOutOfRange error

2. Retention bytes і partition imbalance:

retention.bytes = 10GB на partition
Partition 1: досягає 10GB → починає видаляти (2 дні даних)
Partition 0: все ще має 7 днів даних
Retention по часу НЕ однаковий across partitions!

3. Compact+delete — видалення всіх значень:

cleanup.policy = compact, retention.ms = 1 день
Key "user-1": останнє оновлення 2 дні тому
→ Compact: тільки одне значення → нічого compact
→ Delete: значення старше 1 дня → DELETE
→ Ключ "user-1" повністю видалений!

Highload Best Practices

✅ log.segment.bytes = 500MB–1GB (balance granularity vs I/O)
✅ log.roll.jitter.ms = 10–20% of log.roll.ms
✅ log.cleaner.threads = number of disks
✅ log.segment.delete.delay.ms = 60000 (default OK)
✅ Monitor segment count per partition
✅ retention.ms >> consumer processing time
✅ Disk space alerting (70%, 85%, 95%)

❌ log.segment.bytes < 100MB (too many files)
❌ Без jitter (thundering herd rollover)
❌ log.cleaner.enable=false (segments не видаляються)
❌ retention.ms < consumer downtime + catch-up

Архітектурні рішення

  1. Segment-based deletion — efficient (file delete), not per-message
  2. Async deletion — delay для in-flight consumers
  3. Jitter — critical для high-throughput, multi-partition topics
  4. Log cleaner — background process, needs monitoring
  5. Index co-deletion — .log + .index + .timeindex завжди разом

Резюме для Senior

  • Видалення по сегментах (не per-message) — efficient file operations
  • Кожен сегмент = .log + .index + .timeindex (+ .txnindex) файли
  • Log cleaner handles compact (rewrite), delete просто видаляє expired segments
  • log.segment.delete.delay.ms=60s — grace period для in-flight reads
  • Jitter критичний для high-throughput multi-partition topics
  • retention.bytes = per-partition limit, не total topic
  • Compact+delete може видалити всі значення ключа (intended!)
  • Monitor: segment count, cleaner time, partition size

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Видалення по сегментах (не per-message) — efficient file operations
  • Кожен сегмент = .log + .index + .timeindex (+ .txnindex) файли
  • Сегмент deletable коли: не active, timestamp expired, або розмір перевищений
  • log.segment.delete.delay.ms=60s — grace period для in-flight reads
  • Log Cleaner handles compact (rewrite), delete просто видаляє expired segments
  • Jitter (log.roll.jitter.ms) критичний для high-throughput — запобігає thundering herd
  • Rollover: log.segment.bytes (1GB) або log.segment.ms (7 днів)

Часті уточнюючі запитання:

  • Що якщо consumer читає сегмент який видаляється? — 60s delay дає час finish read; після — OffsetOutOfRange.
  • Compact+delete може видалити всі дані ключа? — Так, якщо одне значення і retention expired.
  • Навіщо потрібен jitter? — Без jitter всі partitions rollover одночасно → I/O spike.
  • Leader видалив сегмент, follower ні — що буде? — Follower не може replicate → falls out of ISR.

Червоні прапорці (НЕ говорити):

  • «Повідомлення видаляються по одному» — цілими сегментами
  • «Без jitter все працює» — thundering herd rollover при high-throughput
  • «Compact видаляє сегменти цілком» — compact переписує (rewrite), не delete
  • «retention.bytes=-1 — норма» — disk fill guaranteed

Пов’язані теми:

  • [[27. Що таке retention policy]]
  • [[2. Що таке партиція (partition) і навіщо вона потрібна]]
  • [[1. Що таке топiк (topic) в Kafka]]
  • [[16. Що таке реплікація в Kafka]]