Вопрос 27 · Раздел 15

Что такое retention policy в Kafka

Представьте DVR (видеорегистратор). Вы настраиваете: "хранить записи 7 дней". Через 7 дней самая старая запись автоматически удаляется. Если диск заполнился раньше — удаляются с...

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Простое определение

Retention policy — правило, определяющее как долго Kafka хранит сообщения в топике. После истечения retention периода сообщения удаляются автоматически.

Аналогия

Представьте DVR (видеорегистратор). Вы настраиваете: “хранить записи 7 дней”. Через 7 дней самая старая запись автоматически удаляется. Если диск заполнился раньше — удаляются старые записи чтобы освободить место.

Основные параметры

# Время хранения (миллисекунды)
retention.ms=604800000        # 7 дней (7 × 24 × 60 × 60 × 1000)

# Размер хранения (байты)
retention.bytes=10737418240   # 10GB на партицию (НЕ на весь топик!)

# Для compact топиков
cleanup.policy=compact        # хранить последнее значение по ключу
cleanup.policy=delete         # удалять по времени/размеру (default)
cleanup.policy=compact,delete # оба режима одновременно

Пример

// Создание топика с retention policy
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// AdminClient — программный интерфейс для управления топиками.
// NewTopic создаёт конфигурацию нового топика, .configs() добавляет параметры.

NewTopic topic = new NewTopic("orders", 6, (short) 3)
    .configs(Map.of(
        "retention.ms", "604800000",      // 7 дней
        "retention.bytes", "5368709120",   // 5GB
        "cleanup.policy", "delete"
    ));

admin.createTopics(List.of(topic)).all().get();

Когда использовать

  • Любой production topic — всегда настраивать retention
  • Event streaming — delete policy (хранить N дней)
  • State/Configuration data — compact policy (последнее значение)
  • Compliance/Audit — long retention (30–90 дней)

🟡 Middle Level

Политики очистки

1. Delete (по умолчанию)

Сообщения удаляются когда:
  - Прошло retention.ms времени, ИЛИ
  - Топик превысил retention.bytes

retention.ms=-1  → хранить вечно (dangerous!)
retention.bytes=-1 → без лимита размера (dangerous!)

Как работает:

Log Segments:
  [Segment-1] [Segment-2] [Segment-3] [Segment-4] [Active]
  1GB         1GB         1GB         1GB         500MB

retention.ms = 7 дней
Segment-1 создан 8 дней назад → ELIGIBLE FOR DELETION
Log cleaner удаляет Segment-1 целиком (не отдельные messages)

2. Compact

Для каждого ключа хранится только ПОСЛЕДНЕЕ значение

До compact:
  key=A, value=1 (offset 0)
  key=B, value=2 (offset 1)
  key=A, value=3 (offset 2)
  key=C, value=4 (offset 3)
  key=B, value=5 (offset 4)

После compact:
  key=A, value=3 (offset 2)  ← latest
  key=B, value=5 (offset 4)  ← latest
  key=C, value=4 (offset 3)  ← latest

Use cases:

  • User profiles (текущее состояние)
  • Configuration (latest config)
  • Inventory (текущий stock level)
  • Cache backing store

3. Compact + Delete

Комбинация обоих:
  - Compact: хранит последнее значение для каждого ключа
  - Delete: удаляет сообщения старше retention.ms

Используется для:
  - Changelog topics (компактные, но старые changelog не нужны)
  - State stores с ограниченным lifetime

Настройка retention

# Создать topic с retention
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 6 --replication-factor 3 \
  --config retention.ms=86400000 \
  --config retention.bytes=10737418240

# Изменить retention существующего топика
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.ms=259200000  # 3 дня

# Проверить текущие настройки
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders --describe

Таблица типичных ошибок

Ошибка Симптомы Последствия Решение
retention.ms=-1 Данные хранятся вечно Disk fill → broker crash Всегда ставить лимит
retention.bytes=-1 Без лимита размера Disk fill при burst traffic Установить разумный лимит
retention.bytes на partition Лимит на partition, не topic 6 partitions × 10GB = 60GB Учитывать # partitions
Compact без ключей Все сообщения = разные ключи Compact не работает Убедиться что keys есть
retention.ms < processing time Данные удалены до обработки Data loss retention.ms > max processing delay
Разные retention на replicas Inconsistent cleanup Replication issues Одинаковый retention на все brokers

Сравнение политик

Политика Когда удаляет Что удаляет Use case
delete По времени/размеру Старые сегменты Events, logs, metrics
compact При наличии newer value Старые значения того же ключа State, profiles, config
compact,delete По времени ИЛИ newer value Старые segments И старые keys Changelog с expiry
нет политики Никогда Ничего ⚠️ Dangerous!

Когда retention НЕ нужен

  • Infinite replay — нужно хранить все сообщения навсегда (редко)
  • Audit/compliance — legal requirement хранить всё (но тогда monitor disk!)
  • Тестирование — временные topics, удалятся при cleanup

🔴 Senior Level

Глубокие внутренности

Log Cleaner — как работает удаление

Log Cleaner Thread (log.cleaner.threads=1 по умолчанию):

1. Log Cleaner сканирует active log segments
2. Определяет "cleanable" segments:
   - segment.lastModified < now - retention.ms
   - segment.size > log.segment.bytes (для compact)
3. Создаёт "clean map" (ключ → latest offset) для compact
4. Копирует live records в новый .clean сегмент
5. Атомарно заменяет старый сегмент новым
6. Удаляет старый сегмент (.log, .index, .timeindex)

File structure:
  00000000000000000000.log      ← segment data
  00000000000000000000.index    ← offset→position index
  00000000000000000000.timeindex ← timestamp→offset index
  00000000000000000000.txnindex ← transaction index (если есть)

Log Cleaner configuration:

# server.properties
log.cleaner.enable=true
log.cleaner.threads=2                     # параллельные cleaner threads
log.cleaner.io.buffer.size=524288         # 512KB buffer per cleaner
log.cleaner.io.buffer.load.factor=0.9     # hash table load factor
log.cleaner.dedupe.buffer.size=134217728  # 128MB dedupe buffer (compact)
log.cleaner.backoff.ms=15000             # backoff между passes
log.cleaner.min.cleanable.ratio=0.5      # min dead data ratio to trigger clean
log.segment.delete.delay.ms=60000        # delay before deleting segment

Segment Rollover и Retention

log.roll.ms=7 дней (default)
log.roll.jitter.ms=random(0–1 час)

Каждые 7 дней (±jitter) создаётся новый active segment:
  [Segment-N]   [Active Segment]
  1GB (closed)  500MB (writing)

Jitter prevents "thundering herd":
  Все partitions rollover одновременно → I/O spike
  Jitter distributes rollover over time

Segment deletion logic:

// Упрощённая логика
def isSegmentDeletable(segment):
    if segment.isActive:
        return false
    
    if cleanupPolicy.contains("delete"):
        if retentionMs > 0 && segment.maxTimestamp < now - retentionMs:
            return true
        if retentionBytes > 0 && topicSize > retentionBytes:
            return true  // delete oldest first
    
    if cleanupPolicy.contains("compact"):
        if segmentHasKeysWithNewerValuesInLaterSegments(segment):
            return true  // compact этот segment
    
    return false

Retention и Replication

Leader partition:
  retention.ms = 7 дней
  Segment удаляется на leader после 7 дней

Follower partitions:
  Тоже удаляют segments после 7 дней
  НЕ зависят от leader retention status
  Каждый broker applies retention locally

Это важно:
  Если follower отстал на 8 дней → segment удалён на leader
  Follower не может replicate deleted segment
  → Follower falls out of ISR
  → Under-replicated partition

Trade-offs

Параметр Большой retention Маленький retention
Disk usage Высокий Низкий
Replay capability Долгая история Короткая история
Consumer flexibility Медленные консьюмеры OK Нужно успевать
Recovery options Больше данных для recovery Меньше данных
Broker cost Выше (disk, I/O) Ниже
Compliance Легче compliance Может не хватить

Edge Cases

1. Retention меньше чем consumer processing time:

Сценарий:
  retention.ms = 86400000 (24 часа)
  Consumer processing time = 36 часов (очень медленный)
  Consumer читает offset 1000 (created 30 hours ago)
  Broker уже удалил этот segment!
  
  Consumer: OffsetOutOfRange error
  auto.offset.reset → earliest/none/latest
  Данные потеряны для этого consumer!

Решение:
  retention.ms >> max consumer processing delay
  retention.ms >= max consumer downtime + catch-up time

2. Retention и compact: удаление всех данных:

Сценарий:
  cleanup.policy = compact
  retention.ms = 3600000 (1 час)
  Ключ A: последнее значение 2 часа назад
  
  Log cleaner:
    - Compact: A имеет только одно значение → нельзя compact
    - Delete: значение старше 1 часа → удалить
  
  Результат: ключ A удалён полностью!
  При restart consumer: key A не существует

Это может быть unexpected!
  Для state stores: retention.ms=-1 или очень большой
  Для changelog: retention.ms = reasonable window

3. Partition-level retention ≠ topic-level:

retention.bytes = 10GB на topic
Topic has 10 partitions
→ Each partition gets 1GB (10GB / 10)

Это per-partition лимит, не total topic!

Для topic-level enforcement:
  retention.bytes = desired_total / num_partitions
  
  Example: want 10GB total, 10 partitions
  retention.bytes = 10GB / 10 = 1GB per partition

4. Retention при broker restart:

Сценарий:
  retention.ms = 604800000 (7 дней)
  Broker down 10 дней
  При restart: все segments старше 7 дней удалены
  
  Если follower был down 10 дней:
  - Leader удалил segments 3+ дней назад
  - Follower restarts → не может replicate deleted data
  - Follower needs full re-replication from leader
  - Это может занять часы для больших topics

Решение:
  retention.ms > max expected broker downtime
  ИЛИ: отключить retention для критичных topics

5. Retention и transactional messages:

Сценарий:
  Transaction T1: offsets 1000-1050, committed
  Transaction T2: offsets 1051-1100, aborted
  retention.ms expires для offset 1000

Log cleaner:
  - Aborted transaction data (T2) удаляется
  - Committed transaction data (T1) удаляется по retention
  
  Consumer с isolation.level=read_committed:
  Видит только committed messages
  Aborted transaction data не виден никогда
  Retention не влияет на visibility aborted transactions

6. Negative retention.bytes и overflow:

retention.bytes должен быть > 0
retention.bytes = -1 → unlimited
retention.bytes = 0 → удалять немедленно (practically useless)

Integer overflow:
  retention.bytes = 10GB = 10737418240
  В 32-bit int: overflow → negative number
  Kafka использует 64-bit long → safe до 9 EB

Производительность (production numbers)

Config Disk usage (100K msg/s, 1KB/msg, RF=3) Cleaner CPU I/O impact
retention.ms = 1 час 1GB Low Low
retention.ms = 24 часа 24GB Low Low
retention.ms = 7 дней 168GB Medium Medium (cleaner)
retention.ms = 30 дней 720GB High High (cleaner + disk)
compact, 1M keys 100MB High (dedupe) High
retention.ms = -1 ∞ (disk fill!) N/A N/A

Production War Story

Ситуация: Социальная сеть, activity feed, 1M events/min. retention.ms = 7 дней, retention.bytes = -1 (unlimited).

Проблема: После product launch, event rate вырос до 5M/min. retention.bytes = -1 → disk fill. За 5 дней brokers заполнили disk → broker crashes → cascade failure → cluster down.

Timeline:

Day 0:  Launch, event rate 1M → 5M/min
Day 1:  Disk usage: 20% → 40% (no alert set)
Day 2:  Disk usage: 60% (engineer noticed, didn't act)
Day 3:  Disk usage: 80% (still no action)
Day 4:  Disk usage: 95% (alerts finally fired)
Day 5:  Broker 1 disk full → crash
         Replication to brokers 2,3 → overload
         Broker 2 crash
         Broker 3: under-replicated, can't serve
         → Cluster DOWN
         → 15M events lost
         → 3 hours downtime

Root cause:

  1. retention.bytes = -1 (no limit!)
  2. No disk usage alerting
  3. No capacity planning for launch
  4. No emergency retention change procedure

Post-mortem actions:

# Emergency retention change (на работающем кластере)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name activity-feed \
  --alter --add-config retention.ms=86400000  # 1 день, не 7

# Monitor disk usage after change
watch -n 5 'du -sh /var/kafka-logs/'

# Set up alerts
# disk_usage > 70% → warning
# disk_usage > 85% → critical
# disk_usage > 95% → page on-call

Long-term fixes:

  • retention.bytes set per topic (не -1!)
  • Disk usage monitoring + alerting
  • Capacity planning для anticipated traffic spikes
  • Emergency runbook: reduce retention in 5 minutes

Monitoring (JMX, Prometheus, Burrow)

JMX метрики:

kafka.log:type=Log,name=LogEndOffset,topic=orders,partition=0
  - Log end offset

kafka.log:type=LogCleaner,name=max-clean-time-secs
  - Time spent in log cleaning

kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
  - Cleaner buffer usage

kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
  - Network I/O (affected by retention)

Prometheus + Grafana:

- record: kafka_topic_disk_usage_bytes
  expr: sum(kafka_log_size_bytes) by (topic)

- record: kafka_topic_disk_usage_percent
  expr: kafka_topic_disk_usage_bytes / kafka_disk_capacity_bytes

- alert: KafkaDiskUsageHigh
  expr: kafka_topic_disk_usage_percent > 0.8
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Kafka disk usage > 80%"

- alert: KafkaRetentionTooLarge
  expr: |
    kafka_topic_config{config="retention.bytes"} < 0
    or
    kafka_topic_config{config="retention.ms"} > 2592000000  # > 30 дней
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Topic retention is very large or unlimited"

- alert: KafkaLogCleanerBacklog
  expr: kafka_log_cleaner_max_clean_time_secs > 3600
  for: 30m
  labels:
    severity: warning
  annotations:
    summary: "Log cleaner taking > 1 hour  cleaning backlog"

Disk usage dashboard:

Per-broker disk usage:
  Broker 1: 450GB / 500GB (90%) ⚠️
  Broker 2: 420GB / 500GB (84%) ⚠️
  Broker 3: 430GB / 500GB (86%) ⚠️

Per-topic disk usage:
  orders:       120GB (retention: 7 days, 10GB limit)
  activity:     250GB (retention: -1, unlimited!) ⚠️
  notifications: 80GB (retention: 3 days, 5GB limit)

Action items:
  - Set retention.bytes on activity topic
  - Reduce retention.ms on orders topic
  - Monitor disk usage trend

Highload Best Practices

✅ retention.ms под бизнес-требования (не бесконечный)
✅ retention.bytes для disk protection (не -1!)
✅ retention.bytes = desired_total / num_partitions
✅ Disk usage alerting (70%, 85%, 95%)
✅ Compact для state topics (user profiles, config)
✅ Delete для event topics (logs, metrics, events)
✅ Monitor log cleaner throughput
✅ Test retention change procedure (emergency runbook)
✅ retention.ms > max consumer downtime + catch-up time
✅ Regular capacity planning review

❌ retention.ms=-1 (infinite — disk fill guaranteed)
❌ retention.bytes=-1 (unlimited — disk fill guaranteed)
❌ retention.bytes без учёта num_partitions
❌ retention.ms < consumer processing time
❌ Без disk usage monitoring
❌ Без emergency retention change procedure
❌ Compact для topics без keys (wasted CPU)

Архитектурные решения

  1. Delete vs Compact — events vs state, определяет policy
  2. Retention = business decision — не technical, compliance + cost trade-off
  3. Per-partition limits — retention.bytes делится на partitions
  4. Log cleaner — background process, monitor its health
  5. Emergency reduction — must be able to reduce retention in minutes

Резюме для Senior

  • Retention policy: delete (by time/size) или compact (latest value per key)
  • Log cleaner handles actual deletion — monitor its health and throughput
  • retention.bytes = per-partition limit, not total topic
  • retention.ms=-1 или retention.bytes=-1 = guaranteed disk fill
  • Compact для state, delete для events, compact+delete для changelog
  • retention.ms > max consumer downtime + catch-up time
  • Disk usage monitoring + alerting обязательна
  • Emergency retention change procedure must be tested
  • At highload: capacity planning + burst traffic consideration
  • Retention change on running cluster — possible, no restart needed

🎯 Шпаргалка для интервью

Обязательно знать:

  • Retention policy: delete (по времени/размеру) или compact (последнее значение по ключу)
  • retention.ms — время хранения; retention.bytes — лимит на партицию (не на топик!)
  • retention.ms=-1 или retention.bytes=-1 — хранить вечно → disk fill guaranteed
  • Compact + Delete одновременно возможны: cleanup.policy=compact,delete
  • Log Cleaner — background процесс удаления; monitor его health и throughput
  • retention.ms > max consumer downtime + catch-up time — иначе data loss
  • Retention change на работающем кластере — возможно без рестарта

Частые уточняющие вопросы:

  • retention.bytes на топик или партицию? — На партицию! 10 partitions × 10GB = 100GB total.
  • Что будет при retention.ms < processing time? — Consumer получит OffsetOutOfRange, data loss.
  • Compact удаляет все значения ключа? — Если только одно значение и retention.ms expired — да.
  • Как работает Log Cleaner? — Сканирует сегменты, удаляет expired, compact переписывает с latest values.

Красные флаги (НЕ говорить):

  • «retention.ms=-1 — безопасный выбор» — disk fill guaranteed
  • «retention.bytes — лимит на весь топик» — на каждую партицию
  • «Compact для topics без ключей работает» — все сообщения = разные ключи, compact бесполезен
  • «Retention можно игнорировать в production» — disk fill → broker crash

Связанные темы:

  • [[28. Как удаляются старые сообщения из топика]]
  • [[1. Что такое топик (topic) в Kafka]]
  • [[2. Что такое партиция (partition) и зачем она нужна]]
  • [[26. Как мониторить lag консьюмера]]