Как удаляются старые сообщения из топика Kafka
Kafka удаляет старые сообщения по сегментам, когда они становятся старше заданного retention периода (для cleanup.policy=delete; для compact — удаление по наличию более новых зн...
🟢 Junior Level
Простое определение
Kafka удаляет старые сообщения по сегментам, когда они становятся старше заданного retention периода (для cleanup.policy=delete; для compact — удаление по наличию более новых значений того же ключа). Не по одному сообщению, а целыми файлами (сегментами).
Аналогия
Представьте шкаф с папками. Каждый день — новая папка. Когда папка становится старше 7 дней, вы выбрасываете её целиком, а не отдельные документы из неё. Это быстрее и проще.
Визуализация сегментов
Топик "orders", log.segment.bytes = 1GB:
[Segment 0] [Segment 1] [Segment 2] [Segment 3] [Active]
1GB 1GB 1GB 1GB 300MB
offsets: offsets: offsets: offsets: offsets:
0-500000 500001- 1000001- 1500001- 2000001-
1000000 1500000 2000000 (writing)
retention.ms = 7 дней
Segment 0 создан 8 дней назад → УДАЛЯЕТСЯ ЦЕЛИКОМ
Каждый сегмент — это файлы
00000000000000000000.log ← данные сообщений
00000000000000000000.index ← индекс offset→position
00000000000000000000.timeindex ← индекс timestamp→offset
Когда это важно
- Понимание disk usage — как Kafka управляет местом
- Troubleshooting — почему данные “пропали”
- Capacity planning — сколько disk нужно
- Retention tuning — как изменение retention влияет на данные
🟡 Middle Level
Процесс удаления по шагам
1. Log Cleaner thread просыпается (каждые log.cleaner.backoff.ms)
2. Сканирует сегменты топика
3. Находит "deletable" сегменты:
- Сегмент закрыт (не active)
- Максимальный timestamp сегмента < now - retention.ms
- ИЛИ размер топика > retention.bytes (удаляем oldest first)
4. Проверяет log.segment.delete.delay.ms (60s по умолчанию)
5. Удаляет файлы сегмента:
- .log file
- .index file
- .timeindex file
- .txnindex file (если есть)
6. Обновляет log metadata
Сегмент rollover
Новый сегмент создаётся когда:
- log.segment.bytes достигнут (1GB по умолчанию)
- log.segment.ms достигнут (7 дней по умолчанию)
- log.roll.ms (alias для log.segment.ms)
Active segment → Closed → Eligible for deletion
Rollover с jitter:
log.roll.ms = 7 дней
log.roll.jitter.ms = 1 час (random)
Сегмент создаётся через 7 дней ± 1 час
Jitter предотвращает одновременный rollover всех partitions
Delete vs Compact — как удаляется
| Аспект | Delete | Compact |
|---|---|---|
| Что удаляет | Старые сегменты целиком | Старые значения тех же ключей |
| Когда | retention.ms или retention.bytes | Когда есть newer value для того же ключа |
| Granularity | Сегмент (1GB) | Отдельные records |
| Speed | Быстро (удалить файл) | Медленно (rewrite сегмент) |
| Result | Данные удалены | Последнее значение для каждого ключа |
Таблица типичных ошибок
| Ошибка | Симптомы | Последствия | Решение |
|---|---|---|---|
| Segment не удаляется | Disk fill | Log cleaner не работает | Проверить log.cleaner.enable |
| Удаление нужных данных | Consumer offset out of range | Data loss | retention.ms > consumer processing time |
| Частый rollover | Много маленьких сегментов | I/O overhead | Увеличить log.segment.bytes |
| Редкий rollover | Один огромный сегмент | Delete = большой impact | Уменьшить log.segment.bytes |
| Delete delay слишком большой | Медленное освобождение disk | Disk fill | Уменьшить log.segment.delete.delay.ms |
| Compact для topics без ключей | Ничего не compactится | Wasted CPU | Проверить что messages имеют keys |
Когда сообщения НЕ удаляются
- Active segment — всегда protected
- retention.ms = -1 — хранить вечно
- retention.bytes = -1 — без лимита
- Log cleaner disabled — log.cleaner.enable=false
- Segment не deletable — timestamp ещё не expired
- Unclean shutdown — segment может быть locked
🔴 Senior Level
Частый вопрос на собеседовании
Что если консьюмер медленнее retention? Ответ: консьюмер получит OffsetOutOfRange и начнёт читать сначала (auto.offset.reset=earliest) или пропустит данные (latest). retention.ms должен быть больше максимального времени простоя консьюмера.
Глубокие внутренности
Log Cleaner — детальная архитектура
LogCleaner (kafka.log.LogCleaner):
├── Cleaner threads (log.cleaner.threads)
│ └── Каждый thread:
│ ├── LogCleanerManager (координирует работу)
│ ├── CleanablePartitions (выбирает что чистить)
│ └── Cleaner (выполняет actual cleaning)
│
├── Dedupe Buffer (log.cleaner.dedupe.buffer.size)
│ └── OffsetMap: key → latest offset
│ └── Hash table для compact deduplication
│
└── I/O throttling
└── log.cleaner.io.buffer.size (512KB)
└── log.cleaner.io.buffer.load.factor (0.9)
Delete process (для cleanup.policy=delete):
// Упрощённый алгоритм
def deleteOldSegments():
for each partition:
segments = partition.logSegments()
deletable = segments.filter(s =>
!s.isActive && // не active
s.maxTimestamp < now - retentionMs && // expired
s.age > logSegmentDeleteDelayMs // delay passed
)
for segment in deletable:
segment.delete() // async delete
asyncDelete(segment)
Compact process (для cleanup.policy=compact):
// Упрощённый алгоритм
def compactPartition():
// 1. Build clean map (key → latest offset)
cleanMap = OffsetMap()
for segment in segments:
for record in segment:
if record.key != null:
cleanMap.put(record.key, record.offset)
// 2. Determine live records (latest value per key)
liveOffsets = cleanMap.values()
// 3. Copy live records to new segment
newSegment = createCleanSegment()
for record in segments:
if record.offset in liveOffsets:
newSegment.append(record)
// 4. Swap old segments with new segment
partition.replaceSegments(segments, newSegment)
// 5. Delete old segment files
for segment in segments:
segment.delete()
Segment deletion — async и file system
Async deletion process:
1. Thread marks segment for deletion
2. Waits log.segment.delete.delay.ms (60s default)
3. Deletes .log, .index, .timeindex, .txnindex files
4. Files removed from directory
Async deletion rationale:
- Consumers могут ещё читать этот сегмент
- Delay дает время finish in-flight reads
- Prevents "file not found" errors
File system behavior (Linux):
rm(segment.log) → unlink() system call
File не удаляется пока есть open file descriptors
Consumers с in-flight reads продолжают читать
После close всех FDs → disk space freed
Index files и их удаление
Каждый сегмент имеет 2 индекса:
Offset Index (.index):
Maps offset → byte position in .log file
Sparse index: каждые ~4KB данных
Размер: log.index.size.max.bytes = 10MB
Time Index (.timeindex):
Maps timestamp → offset
Sparse index: каждые ~4KB данных
Размер: log.index.size.max.bytes = 10MB
При удалении сегмента:
Все 3 файла (.log, .index, .timeindex) удаляются вместе
Нельзя удалить .log без индексов (atomically together)
Trade-offs
| Параметр | Маленькие сегменты | Большие сегменты |
|---|---|---|
| Deletion granularity | Высокая (меньше данных за раз) | Низкая (больше данных за раз) |
| I/O overhead | Выше (больше файлов) | Ниже |
| Log cleaner efficiency | Ниже (больше overhead) | Выше |
| Recovery time | Быстрее (меньше данных per segment) | Медленнее |
| Disk fragmentation | Выше | Ниже |
| Recommended | 100MB–500MB | 1GB–2GB |
Edge Cases
1. Consumer читает сегмент который удаляется:
Timeline:
T=0: Consumer читает offset 500 из Segment-1
T=1: retention.ms expired для Segment-1
T=2: Log cleaner начинает удаление Segment-1
T=2.1: Consumer всё ещё читает (in-flight read)
Что происходит:
log.segment.delete.delay.ms = 60s (default)
Consumer имеет 60 секунд finish read
После 60s: файлы удаляются
Consumer если не дочитал → OffsetOutOfRange error
Решение:
retention.ms > max consumer read time
consumers читают быстрее чем retention expires
2. Unclean broker shutdown во время deletion:
Сценарий:
Log cleaner удаляет Segment-5
.log файл удалён, .index ещё нет
Broker crash
При restart:
Broker видит orphan .index файл без .log
Broker удаляет .index файл (cleanup orphan)
Данные уже удалены, не recoverable
Это safe behavior — segment deletion is idempotent
3. Retention bytes и partition imbalance:
retention.bytes = 10GB на partition
Partition 0: 5GB (низкий throughput)
Partition 1: 10GB (высокий throughput)
Partition 2: 2GB (низкий throughput)
Partition 1 достигает 10GB → начинает удалять
Partition 0 и 2 всё ещё имеют старые данные
Результат:
Partition 1: хранит 2 дня данных
Partition 0: хранит 7 дней данных
Retention по времени НЕ одинаковый across partitions!
Решение:
retention.ms для uniform time-based retention
retention.bytes для disk protection, не для uniform retention
4. Compacted topic — удаление всех значений:
Сценарий:
cleanup.policy = compact
retention.ms = 86400000 (1 день)
Key "user-1": последнее обновление 2 дня назад
Log cleaner:
1. Compact: у "user-1" только одно значение → нечего compact
2. Delete: значение старше 1 дня → DELETE
Результат: ключ "user-1" полностью удалён!
Consumer при restart: key "user-1" = null
Это intended behavior для compact+delete!
Для permanent state: retention.ms=-1
Для changelog с TTL: compact+delete — OK
5. Segment deletion и replication:
Leader partition:
Segment-5 (offsets 1M–1.5M) удалён по retention
Follower partition:
Segment-5 ещё не удалён (clock skew или slower cleaner)
Leader не может replicate deleted data
Follower пытается читать с leader
Leader: "Segment doesn't exist"
Follower: OffsetOutOfRange → falls out of ISR
Решение:
retention.ms одинаковый на всех brokers
Clock synchronization (NTP)
retention.ms > max replication lag time
6. Transaction index cleanup:
Топик с transactional messages:
.txnindex файл содержит transaction markers
При deletion сегмента:
.txnindex удаляется вместе с .log
Transaction coordinator:
Отслеживает committed/aborted transactions
Если transaction markers удалены:
Committed transaction records всё ещё в consumer-visible segments
Aborted transaction records тоже удалены
Consumer isolation.level = read_committed:
Не видит aborted transactions
Retention deletion не влияет на transaction semantics
Производительность (production numbers)
| Operation | Time | I/O | CPU | Impact |
|---|---|---|---|---|
| Segment deletion (1GB) | 10–100ms | Low (unlink) | Minimal | Negligible |
| Log cleaner compact pass | 5–30 min | High (read+write) | Medium | I/O impact |
| Segment rollover (1GB) | 50–200ms | Medium (file create) | Low | Brief I/O spike |
| Index rebuild (after crash) | 1–5 min | High (read all) | Medium | Startup delay |
Production War Story
Ситуация: IoT platform, sensor data, 2M messages/min. retention.ms = 24 часа, log.segment.bytes = 1GB.
Проблема: Каждые 2 часа все 12 partitions одновременно создавали новые сегменты (rollover). I/O spike → consumer fetch latency 5s → lag spike → alerts → engineer wakeup call.
Расследование:
- 2M msg/min × 500 bytes/msg = 1GB/min
- log.segment.bytes = 1GB → rollover каждые 1 минуту
- 12 partitions × rollover/min = 12 file creations/min
- Без jitter → все partitions rollover синхронно
- I/O pattern: burst every minute → latency spike
Root cause: log.roll.jitter.ms не был настроен → все partitions rollover одновременно.
Решение:
# server.properties
log.segment.bytes=536870912 # 512MB (больше сегментов, меньше каждый)
log.roll.ms=3600000 # 1 час
log.roll.jitter.ms=600000 # 10 минут jitter
# Результат:
# Rollover: каждые 1 час ± 10 мин
# 12 partitions с distributed rollover
# I/O spike: размазан по 20 минут
# Consumer latency: stable
Post-mortem lesson: jitter — не optional для high-throughput topics с множеством partitions.
Monitoring (JMX, Prometheus, Burrow)
JMX метрики:
kafka.log:type=LogCleaner,name=max-clean-time-secs
- Max time for a single clean operation
kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
- Dedupe buffer utilization
kafka.log:type=LogCleaner,name=total-cleans
- Total number of completed cleans
kafka.log:type=Log,name=NumLogSegments,topic=orders,partition=0
- Number of segments per partition
kafka.log:type=Log,name=Size,topic=orders,partition=0
- Size of log in bytes
Prometheus + Grafana:
- record: kafka_partition_segment_count
expr: kafka_log_num_log_segments
- record: kafka_partition_size_bytes
expr: kafka_log_size
- alert: KafkaSegmentCountHigh
expr: kafka_log_num_log_segments > 50
for: 10m
labels:
severity: warning
annotations:
summary: "Partition has > 50 segments — retention may need tuning"
- alert: KafkaLogCleanerSlow
expr: kafka_log_cleaner_max_clean_time_secs > 1800
for: 30m
labels:
severity: warning
annotations:
summary: "Log cleaner taking > 30 min per clean"
- alert: KafkaPartitionSizeHigh
expr: kafka_log_size > 10737418240 # 10GB
for: 5m
labels:
severity: warning
annotations:
summary: "Partition size exceeded 10GB"
Highload Best Practices
✅ log.segment.bytes = 500MB–1GB (balance granularity vs I/O)
✅ log.roll.jitter.ms = 10–20% of log.roll.ms
✅ log.cleaner.threads = number of disks (1 per disk)
✅ log.cleaner.dedupe.buffer.size = 128MB–256MB
✅ log.segment.delete.delay.ms = 60000 (default OK)
✅ Monitor segment count per partition
✅ Monitor log cleaner throughput
✅ retention.ms >> consumer processing time
✅ Disk space alerting (70%, 85%, 95%)
❌ log.segment.bytes < 100MB (too many files)
❌ log.segment.bytes > 2GB (too large for efficient deletion)
❌ Без jitter (thundering herd rollover)
❌ log.cleaner.enable=false (segments не удаляются)
❌ retention.ms < consumer downtime + catch-up
❌ Без monitoring segment count
Архитектурные решения
- Segment-based deletion — efficient (file delete), not per-message
- Async deletion — delay для in-flight consumers
- Jitter — critical для high-throughput, multi-partition topics
- Log cleaner — background process, needs monitoring
- Index co-deletion — .log + .index + .timeindex всегда вместе
Резюме для Senior
- Удаление по сегментам (не per-message) — efficient file operations
- Каждый сегмент = .log + .index + .timeindex (+ .txnindex) файлы
- Log cleaner handles compact, delete просто удаляет expired segments
- log.segment.delete.delay.ms (60s default) — grace period для in-flight reads
- Jitter критичен для high-throughput multi-partition topics
- retention.bytes = per-partition limit, не total topic
- Compact+delete может удалить все значения ключа (intended!)
- Replication: leader deleted segment unavailable для followers
- Monitor: segment count, cleaner time, partition size
- At highload: jitter + proper segment size = stable I/O
🎯 Шпаргалка для интервью
Обязательно знать:
- Удаление по сегментам (не per-message) — efficient file operations
- Каждый сегмент = .log + .index + .timeindex (+ .txnindex) файлы
- Сегмент deletable когда: не active, timestamp expired, или размер превышен
log.segment.delete.delay.ms=60s— grace period для in-flight reads- Log Cleaner handles compact (rewrite), delete просто удаляет expired segments
- Jitter (
log.roll.jitter.ms) критичен для high-throughput — предотвращает thundering herd - Rollover:
log.segment.bytes(1GB) илиlog.segment.ms(7 дней)
Частые уточняющие вопросы:
- Что если consumer читает удаляемый сегмент? — 60s delay даёт время finish read; после — OffsetOutOfRange.
- Compact+delete может удалить все данные ключа? — Да, если одно значение и retention expired.
- Зачем нужен jitter? — Без jitter все partitions rollover одновременно → I/O spike.
- Leader удалил сегмент, follower нет — что будет? — Follower не может replicate → falls out of ISR.
Красные флаги (НЕ говорить):
- «Сообщения удаляются по одному» — целыми сегментами
- «Без jitter всё работает» — thundering herd rollover при high-throughput
- «Compact удаляет сегменты целиком» — compact переписывает (rewrite), не delete
- «retention.bytes=-1 — норма» — disk fill guaranteed
Связанные темы:
- [[27. Что такое retention policy]]
- [[2. Что такое партиция (partition) и зачем она нужна]]
- [[1. Что такое топик (topic) в Kafka]]
- [[16. Что такое репликация в Kafka]]