Question 27 · Section 15

What is retention policy in Kafka

Junior Level

Simple Definition

Retention policy — a rule defining how long Kafka stores messages in a topic. After the retention period expires, messages are automatically deleted.

Analogy

Imagine a DVR (video recorder). You configure it: “keep recordings for 7 days.” After 7 days, the oldest recording is automatically deleted. If the disk fills up sooner — old recordings are deleted to free up space.

Key Parameters

# Storage time (milliseconds)
retention.ms=604800000        # 7 days (7 × 24 × 60 × 60 × 1000)

# Storage size (bytes)
retention.bytes=10737418240   # 10GB per partition (NOT per entire topic!)

# For compacted topics
cleanup.policy=compact        # keep the latest value per key
cleanup.policy=delete         # delete by time/size (default)
cleanup.policy=compact,delete # both modes simultaneously
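These magic numbers are easy to get wrong by an order of magnitude. A quick sanity check (an illustrative snippet, not part of any Kafka API):

```java
import java.time.Duration;

public class RetentionMath {
    public static void main(String[] args) {
        // 7 days expressed as milliseconds, matching retention.ms above
        System.out.println(Duration.ofDays(7).toMillis());  // 604800000
        // 10GB expressed as bytes, matching retention.bytes above
        System.out.println(10L * 1024 * 1024 * 1024);       // 10737418240
    }
}
```

Note the `10L`: without the long suffix the multiplication overflows a 32-bit int.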

Example

// Creating a topic with retention policy
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// AdminClient — programmatic interface for topic management.
// NewTopic creates the configuration for a new topic, .configs() adds parameters.

NewTopic topic = new NewTopic("orders", 6, (short) 3)
    .configs(Map.of(
        "retention.ms", "604800000",      // 7 days
        "retention.bytes", "5368709120",   // 5GB
        "cleanup.policy", "delete"
    ));

admin.createTopics(List.of(topic)).all().get();
admin.close(); // release the client's threads and sockets

When to Use

  • Any production topic — always configure retention
  • Event streaming — delete policy (store for N days)
  • State/Configuration data — compact policy (latest value)
  • Compliance/Audit — long retention (30–90 days)

Middle Level

Cleanup Policies

1. Delete (default)

Messages are deleted when:
  - retention.ms time has passed, OR
  - Topic exceeded retention.bytes

retention.ms=-1  → store forever (dangerous!)
retention.bytes=-1 → no size limit (dangerous!)

How it works:

Log Segments:
  [Segment-1] [Segment-2] [Segment-3] [Segment-4] [Active]
  1GB         1GB         1GB         1GB         500MB

retention.ms = 7 days
Segment-1's newest record is 8 days old → ELIGIBLE FOR DELETION
The broker's retention task deletes Segment-1 as a whole (not individual messages)

2. Compact

For each key, only the LATEST value is kept

Before compact:
  key=A, value=1 (offset 0)
  key=B, value=2 (offset 1)
  key=A, value=3 (offset 2)
  key=C, value=4 (offset 3)
  key=B, value=5 (offset 4)

After compact:
  key=A, value=3 (offset 2)  ← latest
  key=B, value=5 (offset 4)  ← latest
  key=C, value=4 (offset 3)  ← latest
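The before/after picture can be reproduced with a tiny simulation: a map keyed by message key, where a later write overwrites an earlier one (illustrative only, not how the broker actually stores data):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactSketch {
    // Replays the log into a map: a later value for the same key
    // overwrites the earlier one, which is the guarantee compaction gives.
    static Map<String, Integer> compact(String[] keys, int[] values) {
        Map<String, Integer> latest = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            latest.put(keys[i], values[i]);
        }
        return latest;
    }

    public static void main(String[] args) {
        String[] keys  = {"A", "B", "A", "C", "B"};
        int[] values   = {1, 2, 3, 4, 5};
        System.out.println(compact(keys, values)); // {A=3, B=5, C=4}
    }
}
```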

Use cases:

  • User profiles (current state)
  • Configuration (latest config)
  • Inventory (current stock level)
  • Cache backing store

3. Compact + Delete

Combination of both:
  - Compact: keeps the latest value for each key
  - Delete: deletes messages older than retention.ms

Used for:
  - Changelog topics (compacted, but old changelog not needed)
  - State stores with limited lifetime

Configuring Retention

# Create topic with retention
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 6 --replication-factor 3 \
  --config retention.ms=86400000 \
  --config retention.bytes=10737418240

# Change retention on existing topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.ms=259200000  # 3 days

# Check current settings
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders --describe

Common Errors Table

| Error | Symptoms | Consequences | Solution |
|---|---|---|---|
| retention.ms=-1 | Data stored forever | Disk fill → broker crash | Always set a limit |
| retention.bytes=-1 | No size limit | Disk fill on burst traffic | Set a reasonable limit |
| retention.bytes per partition | Limit is per partition, not topic | 6 partitions × 10GB = 60GB | Account for # partitions |
| Compact without keys | Null-key records are rejected | Compaction impossible | Ensure messages have keys |
| retention.ms < processing time | Data deleted before processing | Data loss | retention.ms > max processing delay |
| Different retention on replicas | Inconsistent cleanup | Replication issues | Same retention on all brokers |

Policy Comparison

| Policy | When It Deletes | What It Deletes | Use Case |
|---|---|---|---|
| delete | By time/size | Old segments | Events, logs, metrics |
| compact | When a newer value exists | Old values of the same key | State, profiles, config |
| compact,delete | By time OR newer value | Old segments AND old keys | Changelog with expiry |
| no policy | Never | Nothing | ⚠️ Dangerous! |

When Retention is NOT Needed

  • Infinite replay — need to store all messages forever (rare)
  • Audit/compliance — legal requirement to store everything (but then monitor disk!)
  • Testing — temporary topics, deleted on cleanup

Senior Level

Deep Internals

How Deletion and Compaction Work

Two background mechanisms share the work:

Retention (delete) — a scheduled broker task (log.retention.check.interval.ms, 5 minutes by default), not the log cleaner:
1. Scans closed (non-active) log segments
2. Marks a segment deletable when its newest record is older than
   now - retention.ms, or when the partition exceeds retention.bytes
3. Deletes whole segments (oldest first), never individual messages

Log Cleaner (compact; log.cleaner.threads=1 by default):
1. Picks the "dirtiest" cleanable log (dirty ratio above min.cleanable.dirty.ratio)
2. Builds an offset map (key → latest offset) over the dirty section
3. Copies surviving records into a new .cleaned segment
4. Atomically swaps the new segment in for the old ones
5. Deletes the old segment files (.log, .index, .timeindex)

File structure:
  00000000000000000000.log      ← segment data
  00000000000000000000.index    ← offset→position index
  00000000000000000000.timeindex ← timestamp→offset index
  00000000000000000000.txnindex ← transaction index (if any)

Log Cleaner configuration:

# server.properties
log.cleaner.enable=true
log.cleaner.threads=2                     # parallel cleaner threads
log.cleaner.io.buffer.size=524288         # 512KB buffer per cleaner
log.cleaner.io.buffer.load.factor=0.9     # hash table load factor
log.cleaner.dedupe.buffer.size=134217728  # 128MB dedupe buffer (compact)
log.cleaner.backoff.ms=15000             # backoff between passes
log.cleaner.min.cleanable.ratio=0.5      # min dead data ratio to trigger clean
log.segment.delete.delay.ms=60000        # delay before deleting segment

Segment Rollover and Retention

log.roll.ms=604800000 (7 days, default; equivalent to log.roll.hours=168)
log.roll.jitter.ms=0 by default (set e.g. up to 1 hour to spread rollovers)

Every 7 days (±jitter) a new active segment is created:
  [Segment-N]   [Active Segment]
  1GB (closed)  500MB (writing)

Jitter prevents "thundering herd":
  All partitions rolling over simultaneously → I/O spike
  Jitter distributes rollover over time

Segment deletion logic:

# Simplified pseudocode (not actual broker code)
def is_segment_deletable(segment):
    if segment.is_active:
        return False              # the active segment is never touched

    if "delete" in cleanup_policy:
        if retention_ms > 0 and segment.max_timestamp < now - retention_ms:
            return True
        if retention_bytes > 0 and partition_size > retention_bytes:
            return True           # oldest segments go first

    if "compact" in cleanup_policy:
        if has_keys_with_newer_values_in_later_segments(segment):
            return True           # segment is rewritten (compacted), not simply dropped

    return False
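The time-based branch of this check can be made runnable (a hedged sketch with invented names, not Kafka internals):

```java
public class RetentionCheck {
    // Simplified time-based eligibility rule; all names here are invented.
    // retentionMs < 0 models retention.ms=-1 (keep forever).
    static boolean deletableByTime(long segmentMaxTimestampMs, long nowMs, long retentionMs) {
        if (retentionMs < 0) return false;
        return segmentMaxTimestampMs < nowMs - retentionMs;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long now = 100 * day;
        // Segment whose newest record is 8 days old, retention 7 days: eligible
        System.out.println(deletableByTime(now - 8 * day, now, 7 * day)); // true
        // Segment only 6 days old: kept
        System.out.println(deletableByTime(now - 6 * day, now, 7 * day)); // false
    }
}
```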

Retention and Replication

Leader partition:
  retention.ms = 7 days
  Segment is deleted on the leader after 7 days

Follower partitions:
  Also delete segments after 7 days
  NOT dependent on leader retention status
  Each broker applies retention locally

This matters:
  If a follower is 8 days behind → segment is deleted on the leader
  Follower cannot replicate a deleted segment
  → Follower falls out of ISR
  → Under-replicated partition

Trade-offs

| Parameter | Large retention | Small retention |
|---|---|---|
| Disk usage | High | Low |
| Replay capability | Long history | Short history |
| Consumer flexibility | Slow consumers are OK | Need to keep up |
| Recovery options | More data for recovery | Less data |
| Broker cost | Higher (disk, I/O) | Lower |
| Compliance | Easier compliance | May not be enough |

Edge Cases

1. Retention less than consumer processing time:

Scenario:
  retention.ms = 86400000 (24 hours)
  Consumer processing time = 36 hours (very slow)
  Consumer reads offset 1000 (created 30 hours ago)
  Broker has already deleted that segment!

  Consumer: OffsetOutOfRange error
  auto.offset.reset → earliest/none/latest
  Data is lost for this consumer!

Resolution:
  retention.ms >> max consumer processing delay
  retention.ms >= max consumer downtime + catch-up time
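The sizing rule above is just arithmetic, which can be captured in a helper (all numbers here are illustrative assumptions):

```java
public class RetentionSizing {
    // Rule of thumb: retention must cover the worst-case consumer outage
    // plus the time needed to catch up afterwards, with a safety margin.
    static long requiredRetentionMs(long maxDowntimeMs, long catchUpMs, double safetyFactor) {
        return (long) ((maxDowntimeMs + catchUpMs) * safetyFactor);
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        // e.g. 24h worst-case downtime + 6h catch-up, doubled for safety
        System.out.println(requiredRetentionMs(24 * hour, 6 * hour, 2.0)); // 216000000
    }
}
```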

2. Retention and compact: deletion of all data:

Scenario:
  cleanup.policy = compact
  retention.ms = 3600000 (1 hour)
  Key A: last value was 2 hours ago

  Log cleaner:
    - Compact: A has only one value → nothing to compact
    - Delete: value is older than 1 hour → delete

  Result: key A is completely deleted!
  On consumer restart: key A does not exist

This can be unexpected!
  For state stores: retention.ms=-1 or very large
  For changelog: retention.ms = reasonable window

3. Partition-level retention ≠ topic-level:

retention.bytes = 10GB, topic has 10 partitions
→ the limit applies to EACH partition
→ the topic can grow to 10 × 10GB = 100GB on disk

This is a per-partition limit, not total topic!

For topic-level enforcement:
  retention.bytes = desired_total / num_partitions

  Example: want 10GB total, 10 partitions
  retention.bytes = 10GB / 10 = 1GB per partition
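The division Kafka does NOT do for you, as a one-line helper (illustrative, names invented):

```java
public class PerPartitionRetention {
    // retention.bytes caps EACH partition, so a topic-wide budget must be
    // divided by the partition count before being applied as config.
    static long perPartitionRetentionBytes(long desiredTotalBytes, int numPartitions) {
        return desiredTotalBytes / numPartitions;
    }

    public static void main(String[] args) {
        long desiredTotal = 10L * 1024 * 1024 * 1024; // want 10GB for the whole topic
        System.out.println(perPartitionRetentionBytes(desiredTotal, 10)); // 1073741824
    }
}
```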

4. Retention on broker restart:

Scenario:
  retention.ms = 604800000 (7 days)
  Broker down for 10 days
  On restart: all segments older than 7 days are deleted

  If a follower was down for 10 days:
  - Leader deleted segments 3+ days ago
  - Follower restarts → cannot replicate deleted data
  - Follower needs full re-replication from leader
  - This can take hours for large topics

Resolution:
  retention.ms > max expected broker downtime
  OR: set a much larger retention for critical topics (and monitor disk!)

5. Retention and transactional messages:

Scenario:
  Transaction T1: offsets 1000-1050, committed
  Transaction T2: offsets 1051-1100, aborted
  retention.ms expires for offset 1000

Log cleaner:
  - Aborted transaction data (T2) is deleted
  - Committed transaction data (T1) is deleted by retention

  Consumer with isolation.level=read_committed:
  Sees only committed messages
  Aborted transaction data is never visible
  Retention doesn't affect visibility of aborted transactions

6. Negative retention.bytes and overflow:

retention.bytes is -1 (unlimited, the default) or a positive byte count
retention.bytes = -1 → unlimited
retention.bytes = 0 → every closed segment is immediately deletable (practically useless)

Integer overflow:
  retention.bytes = 10GB = 10737418240
  In 32-bit int: overflow → negative number
  Kafka uses 64-bit long → safe up to 9 EB
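The overflow is easy to demonstrate (illustrative snippet):

```java
public class OverflowDemo {
    public static void main(String[] args) {
        long tenGiB = 10L * 1024 * 1024 * 1024;
        System.out.println(tenGiB);         // 10737418240 (fits in a long)
        System.out.println((int) tenGiB);   // -2147483648 (same bits truncated to 32-bit int)
        System.out.println(Long.MAX_VALUE); // 9223372036854775807 bytes, i.e. ~9.2 EB
    }
}
```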

Performance (Production Numbers)

Workload: ~100 msg/s, 1KB/msg, RF=3 (≈ 1GB/hour on disk)

| Config | Disk usage | Cleaner CPU | I/O impact |
|---|---|---|---|
| retention.ms = 1 hour | 1GB | Low | Low |
| retention.ms = 24 hours | 24GB | Low | Low |
| retention.ms = 7 days | 168GB | Medium | Medium (cleaner) |
| retention.ms = 30 days | 720GB | High | High (cleaner + disk) |
| compact, 1M keys | 100MB | High (dedupe) | High |
| retention.ms = -1 | ∞ (disk fill!) | N/A | N/A |

Production War Story

Situation: Social network, activity feed, 1M events/min. retention.ms = 7 days, retention.bytes = -1 (unlimited).

Problem: After a product launch, event rate grew to 5M/min. retention.bytes = -1 → disk fill. Within 5 days, brokers filled their disks → broker crashes → cascade failure → cluster down.
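The arithmetic behind the incident can be sketched with hypothetical numbers (free capacity of ~108TB and 3KB per event on disk, i.e. 1KB payload × RF=3, are assumptions chosen to match the timeline):

```java
public class DiskFillProjection {
    // Back-of-envelope estimate of time until disk fill at a steady write rate.
    static double daysUntilFull(double freeBytes, double bytesPerEventOnDisk,
                                double eventsPerMinute) {
        double bytesPerDay = eventsPerMinute * 60 * 24 * bytesPerEventOnDisk;
        return freeBytes / bytesPerDay;
    }

    public static void main(String[] args) {
        // 5M events/min × 3KB on disk ≈ 21.6TB/day; ~108TB free
        System.out.println(daysUntilFull(1.08e14, 3_000, 5_000_000)); // 5.0
    }
}
```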

Timeline:

Day 0:  Launch, event rate 1M → 5M/min
Day 1:  Disk usage: 20% → 40% (no alert set)
Day 2:  Disk usage: 60% (engineer noticed, didn't act)
Day 3:  Disk usage: 80% (still no action)
Day 4:  Disk usage: 95% (alerts finally fired)
Day 5:  Broker 1 disk full → crash
         Replication to brokers 2,3 → overload
         Broker 2 crash
         Broker 3: under-replicated, can't serve
         → Cluster DOWN
         → 15M events lost
         → 3 hours downtime

Root cause:

  1. retention.bytes = -1 (no limit!)
  2. No disk usage alerting
  3. No capacity planning for launch
  4. No emergency retention change procedure

Post-mortem actions:

# Emergency retention change (on a running cluster)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name activity-feed \
  --alter --add-config retention.ms=86400000  # 1 day, not 7

# Monitor disk usage after change
watch -n 5 'du -sh /var/kafka-logs/'

# Set up alerts
# disk_usage > 70% → warning
# disk_usage > 85% → critical
# disk_usage > 95% → page on-call

Long-term fixes:

  • retention.bytes set per topic (not -1!)
  • Disk usage monitoring + alerting
  • Capacity planning for anticipated traffic spikes
  • Emergency runbook: reduce retention in 5 minutes

Monitoring (JMX, Prometheus, Burrow)

JMX metrics:

kafka.log:type=Log,name=LogEndOffset,topic=orders,partition=0
  - Log end offset

kafka.log:type=LogCleaner,name=max-clean-time-secs
  - Time spent in log cleaning

kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
  - Cleaner buffer usage

kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
  - Network I/O (affected by retention)

Prometheus + Grafana:

- record: kafka_topic_disk_usage_bytes
  expr: sum(kafka_log_size_bytes) by (topic)

- record: kafka_topic_disk_usage_percent
  expr: kafka_topic_disk_usage_bytes / kafka_disk_capacity_bytes

- alert: KafkaDiskUsageHigh
  expr: kafka_topic_disk_usage_percent > 0.8
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Kafka disk usage > 80%"

- alert: KafkaRetentionTooLarge
  expr: |
    kafka_topic_config{config="retention.bytes"} < 0
    or
    kafka_topic_config{config="retention.ms"} > 2592000000  # > 30 days
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Topic retention is very large or unlimited"

- alert: KafkaLogCleanerBacklog
  expr: kafka_log_cleaner_max_clean_time_secs > 3600
  for: 30m
  labels:
    severity: warning
  annotations:
    summary: "Log cleaner pass taking > 1 hour (cleaning backlog)"

Disk usage dashboard:

Per-broker disk usage:
  Broker 1: 450GB / 500GB (90%) ⚠️
  Broker 2: 420GB / 500GB (84%) ⚠️
  Broker 3: 430GB / 500GB (86%) ⚠️

Per-topic disk usage:
  orders:       120GB (retention: 7 days, 10GB limit)
  activity:     250GB (retention: -1, unlimited!) ⚠️
  notifications: 80GB (retention: 3 days, 5GB limit)

Action items:
  - Set retention.bytes on activity topic
  - Reduce retention.ms on orders topic
  - Monitor disk usage trend

Highload Best Practices

✅ retention.ms based on business requirements (not infinite)
✅ retention.bytes for disk protection (not -1!)
✅ retention.bytes = desired_total / num_partitions
✅ Disk usage alerting (70%, 85%, 95%)
✅ Compact for state topics (user profiles, config)
✅ Delete for event topics (logs, metrics, events)
✅ Monitor log cleaner throughput
✅ Test retention change procedure (emergency runbook)
✅ retention.ms > max consumer downtime + catch-up time
✅ Regular capacity planning review

❌ retention.ms=-1 (infinite — disk fill guaranteed)
❌ retention.bytes=-1 (unlimited — disk fill guaranteed)
❌ retention.bytes without considering num_partitions
❌ retention.ms < consumer processing time
❌ Without disk usage monitoring
❌ Without emergency retention change procedure
❌ Compact for topics without keys (null-key records are rejected)

Architectural Decisions

  1. Delete vs Compact — events vs state, determines the policy
  2. Retention is a business decision — not technical, compliance + cost trade-off
  3. Per-partition limits — retention.bytes is divided across partitions
  4. Log cleaner — background process, monitor its health
  5. Emergency reduction — must be able to reduce retention in minutes

Summary for Senior

  • Retention policy: delete (by time/size) or compact (latest value per key)
  • Log cleaner handles actual deletion — monitor its health and throughput
  • retention.bytes = per-partition limit, not total topic
  • retention.ms=-1 or retention.bytes=-1 = guaranteed disk fill
  • Compact for state, delete for events, compact+delete for changelog
  • retention.ms > max consumer downtime + catch-up time
  • Disk usage monitoring + alerting is mandatory
  • Emergency retention change procedure must be tested
  • At highload: capacity planning + burst traffic consideration
  • Retention change on a running cluster — possible, no restart needed

🎯 Interview Cheat Sheet

Must know:

  • Retention policy: delete (by time/size) or compact (latest value per key)
  • retention.ms — storage time; retention.bytes — limit per partition (not per topic!)
  • retention.ms=-1 or retention.bytes=-1 — store forever → disk fill guaranteed
  • Compact + Delete simultaneously is possible: cleanup.policy=compact,delete
  • Log Cleaner — background deletion process; monitor its health and throughput
  • retention.ms > max consumer downtime + catch-up time — otherwise data loss
  • Retention change on a running cluster — possible without restart

Common follow-up questions:

  • Is retention.bytes per topic or per partition? — Per partition! 10 partitions × 10GB = 100GB total.
  • What happens if retention.ms < processing time? — Consumer gets OffsetOutOfRange, data loss.
  • Can compact delete all values for a key? — If there is only one value and retention.ms expired — yes.
  • How does Log Cleaner work? — Scans segments, deletes expired ones, compact rewrites with latest values.

Red flags (DO NOT say):

  • “retention.ms=-1 is a safe choice” — disk fill guaranteed
  • “retention.bytes is a limit for the entire topic” — it is per partition
  • “Compact works for topics without keys” — it does not; compaction requires keys, and null-key records are rejected on compacted topics
  • “Retention can be ignored in production” — disk fill → broker crash

Related topics:

  • [[28. How are old messages deleted from a topic]]
  • [[1. What is a topic in Kafka]]
  • [[2. What is a partition and why is it needed]]
  • [[26. How to monitor consumer lag]]