Что такое retention policy в Kafka
Представьте DVR (видеорегистратор). Вы настраиваете: "хранить записи 7 дней". Через 7 дней самая старая запись автоматически удаляется. Если диск заполнился раньше — удаляются с...
🟢 Junior Level
Простое определение
Retention policy — правило, определяющее как долго Kafka хранит сообщения в топике. После истечения retention периода сообщения удаляются автоматически.
Аналогия
Представьте DVR (видеорегистратор). Вы настраиваете: “хранить записи 7 дней”. Через 7 дней самая старая запись автоматически удаляется. Если диск заполнился раньше — удаляются старые записи чтобы освободить место.
Основные параметры
# Время хранения (миллисекунды)
retention.ms=604800000 # 7 дней (7 × 24 × 60 × 60 × 1000)
# Размер хранения (байты)
retention.bytes=10737418240 # 10GB на партицию (НЕ на весь топик!)
# Для compact топиков
cleanup.policy=compact # хранить последнее значение по ключу
cleanup.policy=delete # удалять по времени/размеру (default)
cleanup.policy=compact,delete # оба режима одновременно
Пример
// Создание топика с retention policy
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// AdminClient — программный интерфейс для управления топиками.
// NewTopic создаёт конфигурацию нового топика, .configs() добавляет параметры.
NewTopic topic = new NewTopic("orders", 6, (short) 3)
.configs(Map.of(
"retention.ms", "604800000", // 7 дней
"retention.bytes", "5368709120", // 5GB
"cleanup.policy", "delete"
));
admin.createTopics(List.of(topic)).all().get();
Когда использовать
- Любой production topic — всегда настраивать retention
- Event streaming — delete policy (хранить N дней)
- State/Configuration data — compact policy (последнее значение)
- Compliance/Audit — long retention (30–90 дней)
🟡 Middle Level
Политики очистки
1. Delete (по умолчанию)
Сообщения удаляются когда:
- Прошло retention.ms времени, ИЛИ
- Топик превысил retention.bytes
retention.ms=-1 → хранить вечно (dangerous!)
retention.bytes=-1 → без лимита размера (dangerous!)
Как работает:
Log Segments:
[Segment-1] [Segment-2] [Segment-3] [Segment-4] [Active]
1GB 1GB 1GB 1GB 500MB
retention.ms = 7 дней
Segment-1 создан 8 дней назад → ELIGIBLE FOR DELETION
Log cleaner удаляет Segment-1 целиком (не отдельные messages)
2. Compact
Для каждого ключа хранится только ПОСЛЕДНЕЕ значение
До compact:
key=A, value=1 (offset 0)
key=B, value=2 (offset 1)
key=A, value=3 (offset 2)
key=C, value=4 (offset 3)
key=B, value=5 (offset 4)
После compact:
key=A, value=3 (offset 2) ← latest
key=B, value=5 (offset 4) ← latest
key=C, value=4 (offset 3) ← latest
Use cases:
- User profiles (текущее состояние)
- Configuration (latest config)
- Inventory (текущий stock level)
- Cache backing store
3. Compact + Delete
Комбинация обоих:
- Compact: хранит последнее значение для каждого ключа
- Delete: удаляет сообщения старше retention.ms
Используется для:
- Changelog topics (компактные, но старые changelog не нужны)
- State stores с ограниченным lifetime
Настройка retention
# Создать topic с retention
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic orders --partitions 6 --replication-factor 3 \
--config retention.ms=86400000 \
--config retention.bytes=10737418240
# Изменить retention существующего топика
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders \
--alter --add-config retention.ms=259200000 # 3 дня
# Проверить текущие настройки
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders --describe
Таблица типичных ошибок
| Ошибка | Симптомы | Последствия | Решение |
|---|---|---|---|
| retention.ms=-1 | Данные хранятся вечно | Disk fill → broker crash | Всегда ставить лимит |
| retention.bytes=-1 | Без лимита размера | Disk fill при burst traffic | Установить разумный лимит |
| retention.bytes на partition | Лимит на partition, не topic | 6 partitions × 10GB = 60GB | Учитывать # partitions |
| Compact без ключей | Все сообщения = разные ключи | Compact не работает | Убедиться что keys есть |
| retention.ms < processing time | Данные удалены до обработки | Data loss | retention.ms > max processing delay |
| Разные retention на replicas | Inconsistent cleanup | Replication issues | Одинаковый retention на все brokers |
Сравнение политик
| Политика | Когда удаляет | Что удаляет | Use case |
|---|---|---|---|
| delete | По времени/размеру | Старые сегменты | Events, logs, metrics |
| compact | При наличии newer value | Старые значения того же ключа | State, profiles, config |
| compact,delete | По времени ИЛИ newer value | Старые segments И старые keys | Changelog с expiry |
| нет политики | Никогда | Ничего | ⚠️ Dangerous! |
Когда retention НЕ нужен
- Infinite replay — нужно хранить все сообщения навсегда (редко)
- Audit/compliance — legal requirement хранить всё (но тогда monitor disk!)
- Тестирование — временные topics, удалятся при cleanup
🔴 Senior Level
Глубокие внутренности
Log Cleaner — как работает удаление
Log Cleaner Thread (log.cleaner.threads=1 по умолчанию):
1. Log Cleaner сканирует active log segments
2. Определяет "cleanable" segments:
- segment.lastModified < now - retention.ms
- segment.size > log.segment.bytes (для compact)
3. Создаёт "clean map" (ключ → latest offset) для compact
4. Копирует live records в новый .clean сегмент
5. Атомарно заменяет старый сегмент новым
6. Удаляет старый сегмент (.log, .index, .timeindex)
File structure:
00000000000000000000.log ← segment data
00000000000000000000.index ← offset→position index
00000000000000000000.timeindex ← timestamp→offset index
00000000000000000000.txnindex ← transaction index (если есть)
Log Cleaner configuration:
# server.properties
log.cleaner.enable=true
log.cleaner.threads=2 # параллельные cleaner threads
log.cleaner.io.buffer.size=524288 # 512KB buffer per cleaner
log.cleaner.io.buffer.load.factor=0.9 # hash table load factor
log.cleaner.dedupe.buffer.size=134217728 # 128MB dedupe buffer (compact)
log.cleaner.backoff.ms=15000 # backoff между passes
log.cleaner.min.cleanable.ratio=0.5 # min dead data ratio to trigger clean
log.segment.delete.delay.ms=60000 # delay before deleting segment
Segment Rollover и Retention
log.roll.ms=7 дней (default)
log.roll.jitter.ms=random(0–1 час)
Каждые 7 дней (±jitter) создаётся новый active segment:
[Segment-N] [Active Segment]
1GB (closed) 500MB (writing)
Jitter prevents "thundering herd":
Все partitions rollover одновременно → I/O spike
Jitter distributes rollover over time
Segment deletion logic:
// Упрощённая логика
def isSegmentDeletable(segment):
if segment.isActive:
return false
if cleanupPolicy.contains("delete"):
if retentionMs > 0 && segment.maxTimestamp < now - retentionMs:
return true
if retentionBytes > 0 && topicSize > retentionBytes:
return true // delete oldest first
if cleanupPolicy.contains("compact"):
if segmentHasKeysWithNewerValuesInLaterSegments(segment):
return true // compact этот segment
return false
Retention и Replication
Leader partition:
retention.ms = 7 дней
Segment удаляется на leader после 7 дней
Follower partitions:
Тоже удаляют segments после 7 дней
НЕ зависят от leader retention status
Каждый broker applies retention locally
Это важно:
Если follower отстал на 8 дней → segment удалён на leader
Follower не может replicate deleted segment
→ Follower falls out of ISR
→ Under-replicated partition
Trade-offs
| Параметр | Большой retention | Маленький retention |
|---|---|---|
| Disk usage | Высокий | Низкий |
| Replay capability | Долгая история | Короткая история |
| Consumer flexibility | Медленные консьюмеры OK | Нужно успевать |
| Recovery options | Больше данных для recovery | Меньше данных |
| Broker cost | Выше (disk, I/O) | Ниже |
| Compliance | Легче compliance | Может не хватить |
Edge Cases
1. Retention меньше чем consumer processing time:
Сценарий:
retention.ms = 86400000 (24 часа)
Consumer processing time = 36 часов (очень медленный)
Consumer читает offset 1000 (created 30 hours ago)
Broker уже удалил этот segment!
Consumer: OffsetOutOfRange error
auto.offset.reset → earliest/none/latest
Данные потеряны для этого consumer!
Решение:
retention.ms >> max consumer processing delay
retention.ms >= max consumer downtime + catch-up time
2. Retention и compact: удаление всех данных:
Сценарий:
cleanup.policy = compact
retention.ms = 3600000 (1 час)
Ключ A: последнее значение 2 часа назад
Log cleaner:
- Compact: A имеет только одно значение → нельзя compact
- Delete: значение старше 1 часа → удалить
Результат: ключ A удалён полностью!
При restart consumer: key A не существует
Это может быть unexpected!
Для state stores: retention.ms=-1 или очень большой
Для changelog: retention.ms = reasonable window
3. Partition-level retention ≠ topic-level:
retention.bytes = 10GB на topic
Topic has 10 partitions
→ Each partition gets 1GB (10GB / 10)
Это per-partition лимит, не total topic!
Для topic-level enforcement:
retention.bytes = desired_total / num_partitions
Example: want 10GB total, 10 partitions
retention.bytes = 10GB / 10 = 1GB per partition
4. Retention при broker restart:
Сценарий:
retention.ms = 604800000 (7 дней)
Broker down 10 дней
При restart: все segments старше 7 дней удалены
Если follower был down 10 дней:
- Leader удалил segments 3+ дней назад
- Follower restarts → не может replicate deleted data
- Follower needs full re-replication from leader
- Это может занять часы для больших topics
Решение:
retention.ms > max expected broker downtime
ИЛИ: отключить retention для критичных topics
5. Retention и transactional messages:
Сценарий:
Transaction T1: offsets 1000-1050, committed
Transaction T2: offsets 1051-1100, aborted
retention.ms expires для offset 1000
Log cleaner:
- Aborted transaction data (T2) удаляется
- Committed transaction data (T1) удаляется по retention
Consumer с isolation.level=read_committed:
Видит только committed messages
Aborted transaction data не виден никогда
Retention не влияет на visibility aborted transactions
6. Negative retention.bytes и overflow:
retention.bytes должен быть > 0
retention.bytes = -1 → unlimited
retention.bytes = 0 → удалять немедленно (practically useless)
Integer overflow:
retention.bytes = 10GB = 10737418240
В 32-bit int: overflow → negative number
Kafka использует 64-bit long → safe до 9 EB
Производительность (production numbers)
| Config | Disk usage (100K msg/s, 1KB/msg, RF=3) | Cleaner CPU | I/O impact |
|---|---|---|---|
| retention.ms = 1 час | 1GB | Low | Low |
| retention.ms = 24 часа | 24GB | Low | Low |
| retention.ms = 7 дней | 168GB | Medium | Medium (cleaner) |
| retention.ms = 30 дней | 720GB | High | High (cleaner + disk) |
| compact, 1M keys | 100MB | High (dedupe) | High |
| retention.ms = -1 | ∞ (disk fill!) | N/A | N/A |
Production War Story
Ситуация: Социальная сеть, activity feed, 1M events/min. retention.ms = 7 дней, retention.bytes = -1 (unlimited).
Проблема: После product launch, event rate вырос до 5M/min. retention.bytes = -1 → disk fill. За 5 дней brokers заполнили disk → broker crashes → cascade failure → cluster down.
Timeline:
Day 0: Launch, event rate 1M → 5M/min
Day 1: Disk usage: 20% → 40% (no alert set)
Day 2: Disk usage: 60% (engineer noticed, didn't act)
Day 3: Disk usage: 80% (still no action)
Day 4: Disk usage: 95% (alerts finally fired)
Day 5: Broker 1 disk full → crash
Replication to brokers 2,3 → overload
Broker 2 crash
Broker 3: under-replicated, can't serve
→ Cluster DOWN
→ 15M events lost
→ 3 hours downtime
Root cause:
- retention.bytes = -1 (no limit!)
- No disk usage alerting
- No capacity planning for launch
- No emergency retention change procedure
Post-mortem actions:
# Emergency retention change (на работающем кластере)
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name activity-feed \
--alter --add-config retention.ms=86400000 # 1 день, не 7
# Monitor disk usage after change
watch -n 5 'du -sh /var/kafka-logs/'
# Set up alerts
# disk_usage > 70% → warning
# disk_usage > 85% → critical
# disk_usage > 95% → page on-call
Long-term fixes:
- retention.bytes set per topic (не -1!)
- Disk usage monitoring + alerting
- Capacity planning для anticipated traffic spikes
- Emergency runbook: reduce retention in 5 minutes
Monitoring (JMX, Prometheus, Burrow)
JMX метрики:
kafka.log:type=Log,name=LogEndOffset,topic=orders,partition=0
- Log end offset
kafka.log:type=LogCleaner,name=max-clean-time-secs
- Time spent in log cleaning
kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
- Cleaner buffer usage
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
- Network I/O (affected by retention)
Prometheus + Grafana:
- record: kafka_topic_disk_usage_bytes
expr: sum(kafka_log_size_bytes) by (topic)
- record: kafka_topic_disk_usage_percent
expr: kafka_topic_disk_usage_bytes / kafka_disk_capacity_bytes
- alert: KafkaDiskUsageHigh
expr: kafka_topic_disk_usage_percent > 0.8
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka disk usage > 80%"
- alert: KafkaRetentionTooLarge
expr: |
kafka_topic_config{config="retention.bytes"} < 0
or
kafka_topic_config{config="retention.ms"} > 2592000000 # > 30 дней
for: 10m
labels:
severity: warning
annotations:
summary: "Topic retention is very large or unlimited"
- alert: KafkaLogCleanerBacklog
expr: kafka_log_cleaner_max_clean_time_secs > 3600
for: 30m
labels:
severity: warning
annotations:
summary: "Log cleaner taking > 1 hour — cleaning backlog"
Disk usage dashboard:
Per-broker disk usage:
Broker 1: 450GB / 500GB (90%) ⚠️
Broker 2: 420GB / 500GB (84%) ⚠️
Broker 3: 430GB / 500GB (86%) ⚠️
Per-topic disk usage:
orders: 120GB (retention: 7 days, 10GB limit)
activity: 250GB (retention: -1, unlimited!) ⚠️
notifications: 80GB (retention: 3 days, 5GB limit)
Action items:
- Set retention.bytes on activity topic
- Reduce retention.ms on orders topic
- Monitor disk usage trend
Highload Best Practices
✅ retention.ms под бизнес-требования (не бесконечный)
✅ retention.bytes для disk protection (не -1!)
✅ retention.bytes = desired_total / num_partitions
✅ Disk usage alerting (70%, 85%, 95%)
✅ Compact для state topics (user profiles, config)
✅ Delete для event topics (logs, metrics, events)
✅ Monitor log cleaner throughput
✅ Test retention change procedure (emergency runbook)
✅ retention.ms > max consumer downtime + catch-up time
✅ Regular capacity planning review
❌ retention.ms=-1 (infinite — disk fill guaranteed)
❌ retention.bytes=-1 (unlimited — disk fill guaranteed)
❌ retention.bytes без учёта num_partitions
❌ retention.ms < consumer processing time
❌ Без disk usage monitoring
❌ Без emergency retention change procedure
❌ Compact для topics без keys (wasted CPU)
Архитектурные решения
- Delete vs Compact — events vs state, определяет policy
- Retention = business decision — не technical, compliance + cost trade-off
- Per-partition limits — retention.bytes делится на partitions
- Log cleaner — background process, monitor its health
- Emergency reduction — must be able to reduce retention in minutes
Резюме для Senior
- Retention policy: delete (by time/size) или compact (latest value per key)
- Log cleaner handles actual deletion — monitor its health and throughput
- retention.bytes = per-partition limit, not total topic
- retention.ms=-1 или retention.bytes=-1 = guaranteed disk fill
- Compact для state, delete для events, compact+delete для changelog
- retention.ms > max consumer downtime + catch-up time
- Disk usage monitoring + alerting обязательна
- Emergency retention change procedure must be tested
- At highload: capacity planning + burst traffic consideration
- Retention change on running cluster — possible, no restart needed
🎯 Шпаргалка для интервью
Обязательно знать:
- Retention policy: delete (по времени/размеру) или compact (последнее значение по ключу)
retention.ms— время хранения;retention.bytes— лимит на партицию (не на топик!)retention.ms=-1илиretention.bytes=-1— хранить вечно → disk fill guaranteed- Compact + Delete одновременно возможны:
cleanup.policy=compact,delete - Log Cleaner — background процесс удаления; monitor его health и throughput
- retention.ms > max consumer downtime + catch-up time — иначе data loss
- Retention change на работающем кластере — возможно без рестарта
Частые уточняющие вопросы:
- retention.bytes на топик или партицию? — На партицию! 10 partitions × 10GB = 100GB total.
- Что будет при retention.ms < processing time? — Consumer получит OffsetOutOfRange, data loss.
- Compact удаляет все значения ключа? — Если только одно значение и retention.ms expired — да.
- Как работает Log Cleaner? — Сканирует сегменты, удаляет expired, compact переписывает с latest values.
Красные флаги (НЕ говорить):
- «retention.ms=-1 — безопасный выбор» — disk fill guaranteed
- «retention.bytes — лимит на весь топик» — на каждую партицию
- «Compact для topics без ключей работает» — все сообщения = разные ключи, compact бесполезен
- «Retention можно игнорировать в production» — disk fill → broker crash
Связанные темы:
- [[28. Как удаляются старые сообщения из топика]]
- [[1. Что такое топик (topic) в Kafka]]
- [[2. Что такое партиция (partition) и зачем она нужна]]
- [[26. Как мониторить lag консьюмера]]