Питання 27 · Розділ 15

Що таке retention policy в Kafka

Уявіть DVR (відеореєстратор). Ви налаштовуєте: "зберігати записи 7 днів". Через 7 днів найстаріший запис автоматично видаляється. Якщо диск заповнився раніше — видаляються старі...

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Просте визначення

Retention policy — правило, що визначає як довго Kafka зберігає повідомлення в топику. Після закінчення retention періоду повідомлення видаляються автоматично.

Аналогія

Уявіть DVR (відеореєстратор). Ви налаштовуєте: “зберігати записи 7 днів”. Через 7 днів найстаріший запис автоматично видаляється. Якщо диск заповнився раніше — видаляються старі записи щоб звільнити місце.

Основні параметри

# Час зберігання (мілісекунди)
retention.ms=604800000        # 7 днів (7 × 24 × 60 × 60 × 1000)

# Розмір зберігання (байти)
retention.bytes=10737418240   # 10GB на партицію (НЕ на весь топик!)

# Для compact топиків
cleanup.policy=compact        # зберігати останнє значення по ключу
cleanup.policy=delete         # видаляти по часу/розміру (default)
cleanup.policy=compact,delete # обидва режими одночасно

Приклад

// Створення топика з retention policy
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);

NewTopic topic = new NewTopic("orders", 6, (short) 3)
    .configs(Map.of(
        "retention.ms", "604800000",      // 7 днів
        "retention.bytes", "5368709120",   // 5GB
        "cleanup.policy", "delete"
    ));

admin.createTopics(List.of(topic)).all().get();

Коли використовувати

  • Будь-який production topic — завжди налаштовувати retention
  • Event streaming — delete policy (зберігати N днів)
  • State/Configuration data — compact policy (останнє значення)
  • Compliance/Audit — long retention (30–90 днів)

🟡 Middle Level

Політики очищення

1. Delete (за замовчуванням)

Повідомлення видаляються коли:
  - Минуло retention.ms часу, АБО
  - Топик перевищив retention.bytes

retention.ms=-1  → зберігати вічно (dangerous!)
retention.bytes=-1 → без ліміту розміру (dangerous!)

Як працює:

Log Segments:
  [Segment-1] [Segment-2] [Segment-3] [Segment-4] [Active]
  1GB         1GB         1GB         1GB         500MB

retention.ms = 7 днів
Segment-1 створено 8 днів тому → ELIGIBLE FOR DELETION
Log cleaner видаляє Segment-1 цілком (не окремі messages)

2. Compact

Для кожного ключа зберігається тільки ОСТАННЄ значення

До compact:
  key=A, value=1 (offset 0)
  key=B, value=2 (offset 1)
  key=A, value=3 (offset 2)

Після compact:
  key=A, value=3 (offset 2)  ← latest
  key=B, value=2 (offset 1)  ← latest

Use cases: User profiles, Configuration, Inventory, Cache backing store

3. Compact + Delete

Комбінація обох:
  - Compact: зберігає останнє значення для кожного ключа
  - Delete: видаляє повідомлення старіші за retention.ms

Налаштування retention

# Створити topic з retention
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 6 --replication-factor 3 \
  --config retention.ms=86400000

# Змінити retention існуючого топика
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.ms=259200000  # 3 дні

Порівняння політик

Політика Коли видаляє Що видаляє Use case
delete По часу/розміру Старі сегменти Events, logs, metrics
compact При наявності newer value Старі значення того ж ключа State, profiles
compact,delete По часу АБО newer value Старі segments І старі keys Changelog з expiry
немає політики Ніколи Нічого ⚠️ Dangerous!

🔴 Senior Level

Log Cleaner — як працює видалення

Log Cleaner Thread (log.cleaner.threads=1 за замовчуванням):

1. Log Cleaner сканує active log segments
2. Визначає "cleanable" segments
3. Створює "clean map" (ключ → latest offset) для compact
4. Копіює live records в новий .clean сегмент
5. Атомарно замінює старий сегмент новим
6. Видаляє старий сегмент (.log, .index, .timeindex)

Log Cleaner configuration:

log.cleaner.enable=true
log.cleaner.threads=2
log.cleaner.dedupe.buffer.size=134217728  # 128MB dedupe buffer (compact)
log.segment.delete.delay.ms=60000        # delay before deleting segment

Segment Rollover і Retention

log.roll.ms=7 днів (default)
log.roll.jitter.ms=random(0–1 година)

Кожні 7 днів (±jitter) створюється новий active segment
Jitter prevents "thundering herd" — всі partitions rollover одночасно → I/O spike

Retention і Replication

Leader partition: retention.ms = 7 днів → Segment видаляється на leader
Follower partitions: тоже видаляють segments після 7 днів локально

Якщо follower відстав на 8 днів → segment видалено на leader
→ Follower не може replicate deleted segment
→ Follower falls out of ISR → Under-replicated partition

Edge Cases

1. Retention менше ніж consumer processing time:

retention.ms = 24h, consumer processing = 36h
→ Consumer: OffsetOutOfRange error, дані втрачені!
Рішення: retention.ms >> max consumer processing delay

2. Partition-level retention ≠ topic-level:

retention.bytes = 10GB — це per-partition ліміт!
10 partitions × 10GB = 100GB total

Для topic-level: retention.bytes = desired_total / num_partitions

Highload Best Practices

✅ retention.ms під бізнес-вимоги (не нескінченний)
✅ retention.bytes для disk protection (не -1!)
✅ retention.bytes = desired_total / num_partitions
✅ Disk usage alerting (70%, 85%, 95%)
✅ Compact для state topics, Delete для event topics
✅ retention.ms > max consumer downtime + catch-up time
✅ Log cleaner monitoring

❌ retention.ms=-1 (disk fill guaranteed)
❌ retention.bytes=-1 (disk fill guaranteed)
❌ retention.bytes без урахування num_partitions
❌ Compact для topics без keys (wasted CPU)

Архітектурні рішення

  1. Delete vs Compact — events vs state, визначає policy
  2. Retention = business decision — compliance + cost trade-off
  3. Per-partition limits — retention.bytes ділиться на partitions
  4. Log cleaner — background process, monitor its health
  5. Emergency reduction — must be able to reduce retention in minutes

Резюме для Senior

  • Retention policy: delete (by time/size) або compact (latest value per key)
  • Log cleaner handles actual deletion — monitor its health and throughput
  • retention.bytes = per-partition limit, not total topic
  • retention.ms=-1 або retention.bytes=-1 = guaranteed disk fill
  • Compact для state, delete для events, compact+delete для changelog
  • retention.ms > max consumer downtime + catch-up time
  • Disk usage monitoring + alerting обов’язкова
  • Emergency retention change procedure must be tested
  • Retention change on running cluster — possible, no restart needed

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Retention policy: delete (по часу/розміру) або compact (останнє значення по ключу)
  • retention.ms — час зберігання; retention.bytes — ліміт на партицію (не на топик!)
  • retention.ms=-1 або retention.bytes=-1 — зберігати вічно → disk fill guaranteed
  • Compact + Delete одночасно можливі: cleanup.policy=compact,delete
  • Log Cleaner — background процес видалення; monitor його health і throughput
  • retention.ms > max consumer downtime + catch-up time — інакше data loss
  • Retention change на працюючому кластері — можливе без рестарту

Часті уточнюючі запитання:

  • retention.bytes на топик чи партицію? — На партицію! 10 partitions × 10GB = 100GB total.
  • Що буде при retention.ms < processing time? — Consumer отримає OffsetOutOfRange, data loss.
  • Compact видаляє всі значення ключа? — Якщо тільки одне значення і retention.ms expired — так.
  • Як працює Log Cleaner? — Сканує сегменти, видаляє expired, compact переписує з latest values.

Червоні прапорці (НЕ говорити):

  • «retention.ms=-1 — безпечний вибір» — disk fill guaranteed
  • «retention.bytes — ліміт на весь топик» — на кожну партицію
  • «Compact для topics без ключів працює» — всі повідомлення = різні ключі, compact марний
  • «Retention можна ігнорувати в production» — disk fill → broker crash

Пов’язані теми:

  • [[28. Як видаляються старі повідомлення з топика]]
  • [[1. Що таке топiк (topic) в Kafka]]
  • [[2. Що таке партиція (partition) і навіщо вона потрібна]]
  • [[26. Як моніторити lag консьюмера]]