What is retention policy in Kafka
Junior Level
Simple Definition
Retention policy — a rule defining how long Kafka stores messages in a topic. After the retention period expires, messages are automatically deleted.
Analogy
Imagine a DVR (video recorder). You configure it: “keep recordings for 7 days.” After 7 days, the oldest recording is automatically deleted. If the disk fills up sooner — old recordings are deleted to free up space.
Key Parameters
# Storage time (milliseconds)
retention.ms=604800000 # 7 days (7 × 24 × 60 × 60 × 1000)
# Storage size (bytes)
retention.bytes=10737418240 # 10GB per partition (NOT per entire topic!)
# For compacted topics
cleanup.policy=compact # keep the latest value per key
cleanup.policy=delete # delete by time/size (default)
cleanup.policy=compact,delete # both modes simultaneously
Example
// Creating a topic with retention policy
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// AdminClient — programmatic interface for topic management.
// NewTopic describes the new topic; .configs() attaches per-topic settings.
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("orders", 6, (short) 3) // 6 partitions, RF=3
        .configs(Map.of(
            "retention.ms", "604800000",     // 7 days
            "retention.bytes", "5368709120", // 5GB per partition
            "cleanup.policy", "delete"
        ));
    admin.createTopics(List.of(topic)).all().get();
}
When to Use
- Any production topic — always configure retention
- Event streaming — delete policy (store for N days)
- State/Configuration data — compact policy (latest value)
- Compliance/Audit — long retention (30–90 days)
Middle Level
Cleanup Policies
1. Delete (default)
Messages are deleted when:
- retention.ms time has passed, OR
- Topic exceeded retention.bytes
retention.ms=-1 → store forever (dangerous!)
retention.bytes=-1 → no size limit (dangerous!)
How it works:
Log Segments:
[Segment-1] [Segment-2] [Segment-3] [Segment-4] [Active]
1GB 1GB 1GB 1GB 500MB
retention.ms = 7 days
Segment-1 was created 8 days ago → ELIGIBLE FOR DELETION
Kafka deletes Segment-1 as a whole segment, never individual messages: retention always operates at segment granularity
2. Compact
For each key, only the LATEST value is kept
Before compact:
key=A, value=1 (offset 0)
key=B, value=2 (offset 1)
key=A, value=3 (offset 2)
key=C, value=4 (offset 3)
key=B, value=5 (offset 4)
After compact:
key=A, value=3 (offset 2) ← latest
key=B, value=5 (offset 4) ← latest
key=C, value=4 (offset 3) ← latest
Use cases:
- User profiles (current state)
- Configuration (latest config)
- Inventory (current stock level)
- Cache backing store
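The before/after example above can be simulated without Kafka at all: a plain Java map that overwrites on key collision reproduces exactly what the compacted log keeps. A sketch for illustration, not broker code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    // A record as it sits in the log: key, value, offset.
    record LogRecord(String key, int value, long offset) {}

    // Keep only the latest record per key; later offsets overwrite earlier ones,
    // which is exactly the guarantee a compacted topic provides.
    static Map<String, LogRecord> compact(List<LogRecord> log) {
        Map<String, LogRecord> latest = new LinkedHashMap<>();
        for (LogRecord r : log) {
            latest.put(r.key(), r);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<LogRecord> log = List.of(
            new LogRecord("A", 1, 0),
            new LogRecord("B", 2, 1),
            new LogRecord("A", 3, 2),
            new LogRecord("C", 4, 3),
            new LogRecord("B", 5, 4));
        compact(log).values().forEach(r ->
            System.out.printf("key=%s, value=%d (offset %d)%n",
                r.key(), r.value(), r.offset()));
    }
}
```

Note that the offsets of surviving records do not change: compaction removes old records but never renumbers the ones it keeps.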
3. Compact + Delete
Combination of both:
- Compact: keeps the latest value for each key
- Delete: deletes messages older than retention.ms
Used for:
- Changelog topics (compacted, but old changelog not needed)
- State stores with limited lifetime
Configuring Retention
# Create topic with retention
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic orders --partitions 6 --replication-factor 3 \
--config retention.ms=86400000 \
--config retention.bytes=10737418240
# Change retention on existing topic
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders \
--alter --add-config retention.ms=259200000 # 3 days
# Check current settings
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders --describe
Common Errors Table
| Error | Symptoms | Consequences | Solution |
|---|---|---|---|
| retention.ms=-1 | Data stored forever | Disk fill → broker crash | Always set a limit |
| retention.bytes=-1 | No size limit | Disk fill on burst traffic | Set a reasonable limit |
| retention.bytes on partition | Limit is per partition, not topic | 6 partitions × 10GB = 60GB | Account for # partitions |
| Compact without keys | All messages = different keys | Compact doesn’t work | Ensure messages have keys |
| retention.ms < processing time | Data deleted before processing | Data loss | retention.ms > max processing delay |
| Different retention on replicas | Inconsistent cleanup | Replication issues | Same retention on all brokers |
Policy Comparison
| Policy | When It Deletes | What It Deletes | Use Case |
|---|---|---|---|
| delete | By time/size | Old segments | Events, logs, metrics |
| compact | When a newer value exists | Old values of the same key | State, profiles, config |
| compact,delete | By time OR newer value | Old segments AND old keys | Changelog with expiry |
| no policy | Never | Nothing | ⚠️ Dangerous! |
When Retention is NOT Needed
- Infinite replay — need to store all messages forever (rare)
- Audit/compliance — legal requirement to store everything (but then monitor disk!)
- Testing — temporary topics, deleted on cleanup
Senior Level
Deep Internals
Log Cleaner — How Deletion Works
Log Cleaner Thread (log.cleaner.threads=1 by default):
(Strictly speaking, time/size-based deletion runs in a separate broker retention task; the cleaner thread itself performs compaction. Both are described together here.)
1. Log Cleaner scans closed (non-active) log segments
2. Identifies "cleanable" segments:
- delete: segment's max timestamp < now - retention.ms
- compact: dirty-to-total ratio exceeds log.cleaner.min.cleanable.ratio
3. Builds an offset map (key → latest offset) for compact
4. Copies live records to a new .clean segment
5. Atomically replaces the old segment with the new one
6. Deletes the old segment (.log, .index, .timeindex)
File structure:
00000000000000000000.log ← segment data
00000000000000000000.index ← offset→position index
00000000000000000000.timeindex ← timestamp→offset index
00000000000000000000.txnindex ← transaction index (if any)
Log Cleaner configuration:
# server.properties
log.cleaner.enable=true
log.cleaner.threads=2 # parallel cleaner threads
log.cleaner.io.buffer.size=524288 # 512KB buffer per cleaner
log.cleaner.io.buffer.load.factor=0.9 # hash table load factor
log.cleaner.dedupe.buffer.size=134217728 # 128MB dedupe buffer (compact)
log.cleaner.backoff.ms=15000 # backoff between passes
log.cleaner.min.cleanable.ratio=0.5 # min dead data ratio to trigger clean
log.segment.delete.delay.ms=60000 # delay before deleting segment
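A useful capacity check on the dedupe buffer: the compaction offset map stores roughly one hash-plus-offset entry per unique key, commonly described as about 24 bytes (16-byte MD5 digest plus 8-byte offset). Assuming that entry size, the key capacity of one cleaning pass is:

```java
public class DedupeBufferCapacity {
    // Approximate bytes per offset-map entry: 16-byte MD5 key digest + 8-byte offset.
    static final int BYTES_PER_ENTRY = 24;

    // How many unique keys one cleaning pass can deduplicate,
    // given the dedupe buffer size and the hash table load factor.
    static long maxKeysPerPass(long dedupeBufferBytes, double loadFactor) {
        return (long) (dedupeBufferBytes * loadFactor) / BYTES_PER_ENTRY;
    }

    public static void main(String[] args) {
        long buffer = 134_217_728L; // log.cleaner.dedupe.buffer.size = 128MB
        double load = 0.9;          // log.cleaner.io.buffer.load.factor
        // roughly 5 million unique keys per pass
        System.out.println(maxKeysPerPass(buffer, load));
    }
}
```

If a partition holds far more live keys than this, a single pass cannot fully deduplicate it and the cleaner needs several passes; raising the buffer (or adding cleaner threads) helps.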
Segment Rollover and Retention
log.roll.ms = 604800000 (7 days; the default, via log.roll.hours=168)
log.roll.jitter.ms = random jitter up to the configured value (default 0; e.g. set up to 1 hour)
Every 7 days (±jitter) a new active segment is created:
[Segment-N] [Active Segment]
1GB (closed) 500MB (writing)
Jitter prevents "thundering herd":
All partitions rolling over simultaneously → I/O spike
Jitter distributes rollover over time
Segment deletion logic:
# Simplified logic
def isSegmentDeletable(segment):
    if segment.isActive:
        return false
    if cleanupPolicy.contains("delete"):
        if retentionMs > 0 and segment.maxTimestamp < now - retentionMs:
            return true
        if retentionBytes > 0 and partitionSize > retentionBytes:
            return true  # delete oldest segments first
    if cleanupPolicy.contains("compact"):
        if segmentHasKeysWithNewerValuesInLaterSegments(segment):
            return true  # compact this segment
    return false
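The same check as compilable Java. Segment and its fields are hypothetical stand-ins for broker internals, not a real Kafka API, and the compact branch is omitted since it needs cross-segment state:

```java
import java.util.Set;

public class RetentionCheck {
    // Hypothetical stand-in for a log segment's metadata (not a Kafka API).
    record Segment(boolean active, long maxTimestampMs) {}

    static boolean isSegmentDeletable(Segment segment, Set<String> cleanupPolicy,
                                      long retentionMs, long retentionBytes,
                                      long partitionSizeBytes, long nowMs) {
        if (segment.active()) {
            return false; // the active segment is never deleted
        }
        if (cleanupPolicy.contains("delete")) {
            if (retentionMs > 0 && segment.maxTimestampMs() < nowMs - retentionMs) {
                return true; // expired by time
            }
            if (retentionBytes > 0 && partitionSizeBytes > retentionBytes) {
                return true; // partition over its size budget; oldest segments go first
            }
        }
        return false;
    }
}
```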
Retention and Replication
Leader partition:
retention.ms = 7 days
Segment is deleted on the leader after 7 days
Follower partitions:
Also delete segments after 7 days
NOT dependent on leader retention status
Each broker applies retention locally
This matters:
If a follower is 8 days behind → segment is deleted on the leader
Follower cannot replicate a deleted segment
→ Follower falls out of ISR
→ Under-replicated partition
Trade-offs
| Parameter | Large retention | Small retention |
|---|---|---|
| Disk usage | High | Low |
| Replay capability | Long history | Short history |
| Consumer flexibility | Slow consumers are OK | Need to keep up |
| Recovery options | More data for recovery | Less data |
| Broker cost | Higher (disk, I/O) | Lower |
| Compliance | Easier compliance | May not be enough |
Edge Cases
1. Retention less than consumer processing time:
Scenario:
retention.ms = 86400000 (24 hours)
Consumer processing time = 36 hours (very slow)
Consumer reads offset 1000 (created 30 hours ago)
Broker has already deleted that segment!
Consumer: OffsetOutOfRange error
auto.offset.reset → earliest/none/latest
Data is lost for this consumer!
Resolution:
retention.ms >> max consumer processing delay
retention.ms >= max consumer downtime + catch-up time
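That rule of thumb can be made concrete with back-of-the-envelope arithmetic; the rates below are illustrative, not from a real deployment:

```java
public class SafeRetention {
    // Minimum safe retention: worst-case consumer downtime plus the time needed
    // to drain the backlog accumulated during that downtime, times a safety margin.
    static long minRetentionMs(long maxDowntimeMs, double produceRatePerSec,
                               double consumeRatePerSec, double safetyFactor) {
        if (consumeRatePerSec <= produceRatePerSec) {
            throw new IllegalArgumentException("consumer can never catch up");
        }
        double backlogMsgs = produceRatePerSec * (maxDowntimeMs / 1000.0);
        double catchUpMs = backlogMsgs / (consumeRatePerSec - produceRatePerSec) * 1000.0;
        return (long) ((maxDowntimeMs + catchUpMs) * safetyFactor);
    }

    public static void main(String[] args) {
        // 24h worst-case outage, 1000 msg/s produced, consumer drains at 1500 msg/s,
        // 2x safety margin → 6 days of retention needed, not 24 hours
        System.out.println(minRetentionMs(86_400_000L, 1000, 1500, 2.0));
    }
}
```

The catch-up term is the part people forget: a consumer that is only 1.5x faster than the producer needs two extra days to drain a one-day backlog.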
2. Retention and compact: deletion of all data:
Scenario:
cleanup.policy = compact,delete (time-based deletion only happens when the delete policy is active)
retention.ms = 3600000 (1 hour)
Key A: last value was 2 hours ago
Log cleaner:
- Compact: A has only one value → nothing to compact
- Delete: value is older than 1 hour → delete
Result: key A is completely deleted!
On consumer restart: key A does not exist
This can be unexpected!
For state stores: retention.ms=-1 or very large
For changelog: retention.ms = reasonable window
3. Partition-level retention ≠ topic-level:
retention.bytes = 10GB, topic has 10 partitions
→ Each partition may grow to 10GB
→ Total topic size: up to 100GB (10 × 10GB), not 10GB!
The limit applies per partition, not per topic.
For a topic-level cap:
retention.bytes = desired_total / num_partitions
Example: want 10GB total across 10 partitions
retention.bytes = 10GB / 10 = 1GB per partition
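This division is worth encoding once rather than recomputing by hand (a trivial sketch):

```java
public class RetentionBytesMath {
    // retention.bytes applies per partition, so a desired topic-wide cap
    // must be divided by the partition count before being set.
    static long perPartitionRetentionBytes(long desiredTopicTotalBytes, int numPartitions) {
        return desiredTopicTotalBytes / numPartitions;
    }

    public static void main(String[] args) {
        long tenGb = 10L * 1024 * 1024 * 1024; // 10GB desired topic total
        // 10 partitions → set retention.bytes = 1073741824 (1GB) on the topic
        System.out.println(perPartitionRetentionBytes(tenGb, 10));
    }
}
```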
4. Retention on broker restart:
Scenario:
retention.ms = 604800000 (7 days)
Broker down for 10 days
On restart: all segments older than 7 days are deleted
If a follower was down for 10 days:
- Leader deleted segments 3+ days ago
- Follower restarts → cannot replicate deleted data
- Follower needs full re-replication from leader
- This can take hours for large topics
Resolution:
retention.ms > max expected broker downtime
OR: disable retention for critical topics
5. Retention and transactional messages:
Scenario:
Transaction T1: offsets 1000-1050, committed
Transaction T2: offsets 1051-1100, aborted
retention.ms expires for offset 1000
Log cleaner:
- Aborted transaction data (T2) is deleted
- Committed transaction data (T1) is deleted by retention
Consumer with isolation.level=read_committed:
Sees only committed messages
Aborted transaction data is never visible
Retention doesn't affect visibility of aborted transactions
6. Negative retention.bytes and overflow:
retention.bytes > 0 → per-partition size limit
retention.bytes = -1 → unlimited (the default!)
retention.bytes = 0 → every closed segment is immediately eligible for deletion (practically useless)
Integer overflow:
retention.bytes = 10GB = 10737418240
In 32-bit int: overflow → negative number
Kafka uses 64-bit long → safe up to 9 EB
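The wrap-around is easy to demonstrate in Java: a narrowing cast keeps only the low 32 bits of the long, which for 10GB comes out negative:

```java
public class RetentionOverflow {
    public static void main(String[] args) {
        long tenGb = 10_737_418_240L;  // 10GB fits easily in a 64-bit long
        int wrapped = (int) tenGb;     // narrowing cast keeps only the low 32 bits
        System.out.println(wrapped);                   // -2147483648: wrapped negative
        System.out.println(tenGb > Integer.MAX_VALUE); // true: 10GB exceeds int range
    }
}
```

This is why size configs in Kafka (and in your own tooling around it) should always be handled as 64-bit longs.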
Performance (Production Numbers)
| Config | Disk usage (≈100 msg/s, 1KB/msg, RF=3 ≈ 1GB/hour) | Cleaner CPU | I/O impact |
|---|---|---|---|
| retention.ms = 1 hour | 1GB | Low | Low |
| retention.ms = 24 hours | 24GB | Low | Low |
| retention.ms = 7 days | 168GB | Medium | Medium (cleaner) |
| retention.ms = 30 days | 720GB | High | High (cleaner + disk) |
| compact, 1M keys | 100MB | High (dedupe) | High |
| retention.ms = -1 | ∞ (disk fill!) | N/A | N/A |
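Figures like those in the table come from simple steady-state arithmetic: ingest rate, held for the retention window, times the replication factor (the example rate is illustrative):

```java
public class DiskUsageEstimate {
    // Steady-state disk usage: ingest held for the retention window,
    // times the replication factor (every replica stores a full copy).
    static long steadyStateBytes(double msgPerSec, int msgSizeBytes,
                                 long retentionSec, int replicationFactor) {
        return (long) (msgPerSec * msgSizeBytes * retentionSec * replicationFactor);
    }

    public static void main(String[] args) {
        // ~100 msg/s at 1KB, 24h retention, RF=3 → ~26.5GB on disk
        System.out.println(steadyStateBytes(100, 1024, 86_400L, 3));
    }
}
```

At 100K msg/s the same formula gives roughly 26.5TB for 24 hours, so always rerun the arithmetic for your own rates before trusting any table.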
Production War Story
Situation: Social network, activity feed, 1M events/min. retention.ms = 7 days, retention.bytes = -1 (unlimited).
Problem: After a product launch, event rate grew to 5M/min. retention.bytes = -1 → disk fill. Within 5 days, brokers filled their disks → broker crashes → cascade failure → cluster down.
Timeline:
Day 0: Launch, event rate 1M → 5M/min
Day 1: Disk usage: 20% → 40% (no alert set)
Day 2: Disk usage: 60% (engineer noticed, didn't act)
Day 3: Disk usage: 80% (still no action)
Day 4: Disk usage: 95% (alerts finally fired)
Day 5: Broker 1 disk full → crash
Replication to brokers 2,3 → overload
Broker 2 crash
Broker 3: under-replicated, can't serve
→ Cluster DOWN
→ 15M events lost
→ 3 hours downtime
Root cause:
- retention.bytes = -1 (no limit!)
- No disk usage alerting
- No capacity planning for launch
- No emergency retention change procedure
Post-mortem actions:
# Emergency retention change (on a running cluster)
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name activity-feed \
--alter --add-config retention.ms=86400000 # 1 day, not 7
# Monitor disk usage after change
watch -n 5 'du -sh /var/kafka-logs/'
# Set up alerts
# disk_usage > 70% → warning
# disk_usage > 85% → critical
# disk_usage > 95% → page on-call
Long-term fixes:
- retention.bytes set per topic (not -1!)
- Disk usage monitoring + alerting
- Capacity planning for anticipated traffic spikes
- Emergency runbook: reduce retention in 5 minutes
Monitoring (JMX, Prometheus, Burrow)
JMX metrics:
kafka.log:type=Log,name=LogEndOffset,topic=orders,partition=0
- Log end offset
kafka.log:type=LogCleaner,name=max-clean-time-secs
- Time spent in log cleaning
kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
- Cleaner buffer usage
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
- Network I/O (affected by retention)
Prometheus + Grafana:
- record: kafka_topic_disk_usage_bytes
  expr: sum(kafka_log_size_bytes) by (topic)
- record: kafka_topic_disk_usage_percent
  expr: kafka_topic_disk_usage_bytes / kafka_disk_capacity_bytes
- alert: KafkaDiskUsageHigh
  expr: kafka_topic_disk_usage_percent > 0.8
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Kafka disk usage > 80%"
- alert: KafkaRetentionTooLarge
  expr: |
    kafka_topic_config{config="retention.bytes"} < 0
    or
    kafka_topic_config{config="retention.ms"} > 2592000000 # > 30 days
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Topic retention is very large or unlimited"
- alert: KafkaLogCleanerBacklog
  expr: kafka_log_cleaner_max_clean_time_secs > 3600
  for: 30m
  labels:
    severity: warning
  annotations:
    summary: "Log cleaner taking > 1 hour — cleaning backlog"
Disk usage dashboard:
Per-broker disk usage:
Broker 1: 450GB / 500GB (90%) ⚠️
Broker 2: 420GB / 500GB (84%) ⚠️
Broker 3: 430GB / 500GB (86%) ⚠️
Per-topic disk usage:
orders: 120GB (retention: 7 days, 10GB limit)
activity: 250GB (retention: -1, unlimited!) ⚠️
notifications: 80GB (retention: 3 days, 5GB limit)
Action items:
- Set retention.bytes on activity topic
- Reduce retention.ms on orders topic
- Monitor disk usage trend
Highload Best Practices
✅ retention.ms based on business requirements (not infinite)
✅ retention.bytes for disk protection (not -1!)
✅ retention.bytes = desired_total / num_partitions
✅ Disk usage alerting (70%, 85%, 95%)
✅ Compact for state topics (user profiles, config)
✅ Delete for event topics (logs, metrics, events)
✅ Monitor log cleaner throughput
✅ Test retention change procedure (emergency runbook)
✅ retention.ms > max consumer downtime + catch-up time
✅ Regular capacity planning review
❌ retention.ms=-1 (infinite — disk fill guaranteed)
❌ retention.bytes=-1 (unlimited — disk fill guaranteed)
❌ retention.bytes without considering num_partitions
❌ retention.ms < consumer processing time
❌ Without disk usage monitoring
❌ Without emergency retention change procedure
❌ Compact for topics without keys (wasted CPU)
Architectural Decisions
- Delete vs Compact — events vs state, determines the policy
- Retention is a business decision — not technical, compliance + cost trade-off
- Per-partition limits — retention.bytes is divided across partitions
- Log cleaner — background process, monitor its health
- Emergency reduction — must be able to reduce retention in minutes
Summary for Senior
- Retention policy: delete (by time/size) or compact (latest value per key)
- Log cleaner handles actual deletion — monitor its health and throughput
- retention.bytes = per-partition limit, not total topic
- retention.ms=-1 or retention.bytes=-1 = guaranteed disk fill
- Compact for state, delete for events, compact+delete for changelog
- retention.ms > max consumer downtime + catch-up time
- Disk usage monitoring + alerting is mandatory
- Emergency retention change procedure must be tested
- At highload: capacity planning + burst traffic consideration
- Retention change on a running cluster — possible, no restart needed
🎯 Interview Cheat Sheet
Must know:
- Retention policy: delete (by time/size) or compact (latest value per key)
- retention.ms — storage time; retention.bytes — limit per partition (not per topic!)
- retention.ms=-1 or retention.bytes=-1 — store forever → disk fill guaranteed
- Compact + Delete simultaneously is possible: cleanup.policy=compact,delete
- Log Cleaner — background deletion process; monitor its health and throughput
- retention.ms > max consumer downtime + catch-up time — otherwise data loss
- Retention change on a running cluster — possible without restart
Common follow-up questions:
- Is retention.bytes per topic or per partition? — Per partition! 10 partitions × 10GB = 100GB total.
- What happens if retention.ms < processing time? — Consumer gets OffsetOutOfRange, data loss.
- Can compact delete all values for a key? — If there is only one value and retention.ms expired — yes.
- How does Log Cleaner work? — Scans segments, deletes expired ones, compact rewrites with latest values.
Red flags (DO NOT say):
- “retention.ms=-1 is a safe choice” — disk fill guaranteed
- “retention.bytes is a limit for the entire topic” — it is per partition
- “Compact works for topics without keys” — all messages are different keys, compact is useless
- “Retention can be ignored in production” — disk fill → broker crash
Related topics:
- [[28. How are old messages deleted from a topic]]
- [[1. What is a topic in Kafka]]
- [[2. What is a partition and why is it needed]]
- [[26. How to monitor consumer lag]]