How are old messages deleted from a Kafka topic
Junior Level
Simple Definition
Kafka deletes old messages by segment when they become older than the configured retention period (for cleanup.policy=delete; for compact — deletion is based on the existence of newer values for the same key). Not one message at a time, but whole files (segments).
Analogy
Imagine a cabinet with folders. Each day gets a new folder. When a folder is older than 7 days, you throw it away entirely, not individual documents from it. This is faster and simpler.
Segment Visualization
Topic "orders", log.segment.bytes = 1GB:

[Segment 0]   [Segment 1]   [Segment 2]   [Segment 3]   [Active]
    1GB           1GB           1GB           1GB         300MB
 offsets:      offsets:      offsets:      offsets:      offsets:
 0-500000      500001-       1000001-      1500001-      2000001-
               1000000       1500000       2000000       (writing)

retention.ms = 7 days
Segment 0 was created 8 days ago → DELETED AS A WHOLE
Each Segment is a Set of Files
00000000000000000000.log ← message data
00000000000000000000.index ← offset→position index
00000000000000000000.timeindex ← timestamp→offset index
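The file stem encodes the segment's base offset, zero-padded to 20 digits. A quick sketch of that naming scheme (segment_files is a hypothetical helper for illustration, not a Kafka API):

```python
# Hypothetical helper illustrating segment file naming: the stem is the
# segment's base offset, zero-padded to 20 digits.
def segment_files(base_offset: int) -> list[str]:
    stem = f"{base_offset:020d}"
    return [stem + ext for ext in (".log", ".index", ".timeindex")]

print(segment_files(500001))
# ['00000000000000500001.log', '00000000000000500001.index',
#  '00000000000000500001.timeindex']
```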
When This Matters
- Understanding disk usage — how Kafka manages space
- Troubleshooting — why data “disappeared”
- Capacity planning — how much disk is needed
- Retention tuning — how retention changes affect data
Middle Level
Deletion Process Step by Step
1. The retention task wakes up (every log.retention.check.interval.ms, 5 minutes by default; the separate Log Cleaner thread, which handles compaction, is paced by log.cleaner.backoff.ms)
2. Scans topic segments
3. Finds "deletable" segments:
- Segment is closed (not active)
- Segment's max timestamp < now - retention.ms
- OR partition size > retention.bytes (delete oldest first)
4. Checks log.segment.delete.delay.ms (60s by default)
5. Deletes segment files:
- .log file
- .index file
- .timeindex file
- .txnindex file (if any)
6. Updates log metadata
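The eligibility check from step 3 can be sketched in Python (a toy model — the Segment class and parameter names are stand-ins for Kafka's internals, not its actual API):

```python
from dataclasses import dataclass

@dataclass
class Segment:               # stand-in for Kafka's internal segment object
    base_offset: int
    size_bytes: int
    max_timestamp_ms: int
    is_active: bool

def deletable(segments, now_ms, retention_ms, retention_bytes, total_bytes):
    """Return segments eligible for deletion, oldest first."""
    out = []
    for seg in sorted(segments, key=lambda s: s.base_offset):
        if seg.is_active:
            continue                        # active segment is never deleted
        expired = seg.max_timestamp_ms < now_ms - retention_ms
        over_size = retention_bytes > 0 and total_bytes > retention_bytes
        if expired or over_size:
            out.append(seg)
            total_bytes -= seg.size_bytes   # shrink for the next size check
    return out

segs = [Segment(0, 100, 8_000_000, False),
        Segment(100, 100, 9_500_000, False),
        Segment(200, 50, 9_900_000, True)]
# retention.ms = 1_000_000, retention.bytes = -1 (no size limit):
print([s.base_offset for s in deletable(segs, 10_000_000, 1_000_000, -1, 250)])
# [0] — only the segment whose max timestamp has expired
```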
Segment Rollover
A new segment is created when:
- log.segment.bytes is reached (1GB by default)
- log.roll.ms or log.roll.hours is reached (7 days by default)
- topic-level equivalents: segment.bytes and segment.ms
Active segment → Closed → Eligible for deletion
Rollover with jitter:
log.roll.ms = 7 days
log.roll.jitter.ms = 1 hour (random)
Segment is created after 7 days ± 1 hour
Jitter prevents simultaneous rollover of all partitions
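The effect can be simulated: assuming each segment subtracts a random jitter in [0, log.roll.jitter.ms) from its nominal roll deadline, twelve partitions no longer roll at the same instant (roll_deadline is an illustrative model, not broker code):

```python
import random

ROLL_MS = 7 * 24 * 3600 * 1000   # log.roll.ms = 7 days
JITTER_MS = 3600 * 1000          # log.roll.jitter.ms = 1 hour

def roll_deadline(created_ms: int, rng: random.Random) -> int:
    # Assumed model: roll up to JITTER_MS earlier than the nominal deadline.
    return created_ms + ROLL_MS - rng.randrange(JITTER_MS)

rng = random.Random(42)
deadlines = [roll_deadline(0, rng) for _ in range(12)]   # 12 partitions
print(max(deadlines) - min(deadlines))   # spread out instead of one spike
```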
Delete vs Compact — How Deletion Works
| Aspect | Delete | Compact |
|---|---|---|
| What deletes | Old segments as a whole | Old values of the same keys |
| When | retention.ms or retention.bytes | When a newer value exists for the same key |
| Granularity | Segment (1GB) | Individual records |
| Speed | Fast (delete file) | Slow (rewrite segment) |
| Result | Data deleted | Latest value for each key |
Common Errors Table
| Error | Symptoms | Consequences | Solution |
|---|---|---|---|
| Segments not deleted | Disk keeps filling | Disk fill | Check cleanup.policy and retention.ms; for compacted topics — log.cleaner.enable |
| Needed data deleted | Consumer offset out of range | Data loss | retention.ms > consumer processing time |
| Frequent rollover | Many small segments | I/O overhead | Increase log.segment.bytes |
| Rare rollover | One huge segment | Delete = big impact | Decrease log.segment.bytes |
| Delete delay too large | Slow disk freeing | Disk fill | Decrease log.segment.delete.delay.ms |
| Compact for topics without keys | Nothing gets compacted | Wasted CPU | Ensure messages have keys |
When Messages Are NOT Deleted
- Active segment — always protected
- retention.ms = -1 — store forever
- retention.bytes = -1 — no limit
- Log cleaner disabled — log.cleaner.enable=false (blocks compaction only; time-based delete still runs)
- Segment not deletable — timestamp not yet expired
- Unclean shutdown — segment may be locked
Senior Level
Common Interview Question
What if the consumer is slower than retention? Answer: the consumer gets OffsetOutOfRange and, depending on auto.offset.reset, either jumps to the earliest still-available offset (earliest) or skips to the end and loses data (latest). retention.ms must exceed the maximum consumer downtime plus catch-up time.
Deep Internals
Log Cleaner — Detailed Architecture
LogCleaner (kafka.log.LogCleaner):
├── Cleaner threads (log.cleaner.threads)
│ └── Each thread:
│ ├── LogCleanerManager (coordinates work)
│ ├── CleanablePartitions (selects what to clean)
│ └── Cleaner (performs actual cleaning)
│
├── Dedupe Buffer (log.cleaner.dedupe.buffer.size)
│ └── OffsetMap: key → latest offset
│ └── Hash table for compact deduplication
│
└── I/O throttling
└── log.cleaner.io.buffer.size (512KB)
└── log.cleaner.io.buffer.load.factor (0.9)
Delete process (for cleanup.policy=delete):
// Simplified algorithm
def deleteOldSegments():
    for each partition:
        segments = partition.logSegments()
        deletable = segments.filter(s =>
            !s.isActive &&                       // never the active segment
            s.maxTimestamp < now - retentionMs   // retention expired
        )
        for segment in deletable:
            segment.markDeleted()                // renamed, no longer served
            scheduleAsyncDelete(segment)         // files removed after
                                                 // log.segment.delete.delay.ms
Compact process (for cleanup.policy=compact):
// Simplified algorithm
def compactPartition():
    // 1. Build clean map (key → latest offset)
    cleanMap = OffsetMap()
    for segment in segments:
        for record in segment:
            if record.key != null:
                cleanMap.put(record.key, record.offset)
    // 2. Live records = the latest value for each key
    liveOffsets = cleanMap.values()
    // 3. Copy live records to a new segment
    newSegment = createCleanSegment()
    for segment in segments:
        for record in segment:
            if record.offset in liveOffsets:
                newSegment.append(record)
    // 4. Atomically swap the old segments for the new one
    partition.replaceSegments(segments, newSegment)
    // 5. Delete the old segment files
    for segment in segments:
        segment.delete()
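The two-pass logic can be exercised on plain Python tuples (a toy model of compaction, not the broker implementation):

```python
def compact(records):
    """Keep only the latest value per key; records are (offset, key, value)."""
    latest = {}                              # pass 1: key -> newest offset
    for offset, key, _ in records:
        if key is not None:
            latest[key] = offset
    live = set(latest.values())
    # pass 2: copy only the live records, preserving offsets
    return [r for r in records if r[0] in live or r[1] is None]

log = [(0, "user-1", "a"), (1, "user-2", "b"), (2, "user-1", "c")]
print(compact(log))   # [(1, 'user-2', 'b'), (2, 'user-1', 'c')]
```

Note that offsets are preserved: compaction removes records but never renumbers the survivors.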
Segment Deletion — Async and File System
Async deletion process:
1. Thread marks segment for deletion
2. Waits log.segment.delete.delay.ms (60s default)
3. Deletes .log, .index, .timeindex, .txnindex files
4. Files removed from directory
Async deletion rationale:
- Consumers may still be reading this segment
- Delay gives time to finish in-flight reads
- Prevents "file not found" errors
File system behavior (Linux):
rm(segment.log) → unlink() system call
File is not deleted while there are open file descriptors
Consumers with in-flight reads continue reading
After all FDs are closed → disk space is freed
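This unlink-with-open-descriptor behavior is easy to demonstrate on any POSIX system (the file name merely mimics a segment; no Kafka involved):

```python
import os
import tempfile

# Create a fake "segment" file, keep a descriptor open, then unlink it.
path = os.path.join(tempfile.mkdtemp(), "00000000000000000000.log")
with open(path, "w") as f:
    f.write("record-data")

reader = open(path)           # a consumer's in-flight read
os.unlink(path)               # rm segment.log -> directory entry removed
print(os.path.exists(path))   # False: the name is gone
print(reader.read())          # 'record-data': still readable via the FD
reader.close()                # last descriptor closed -> space is freed
```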
Index Files and Their Deletion
Each segment has 2 indexes:
Offset Index (.index):
Maps offset → byte position in .log file
Sparse index: every ~4KB of data
Size: log.index.size.max.bytes = 10MB
Time Index (.timeindex):
Maps timestamp → offset
Sparse index: every ~4KB of data
Size: log.index.size.max.bytes = 10MB
When a segment is deleted:
All its files (.log, .index, .timeindex, and .txnindex if present) are deleted together
The .log file is never left without its indexes — they are removed as a set
Trade-offs
| Parameter | Small segments | Large segments |
|---|---|---|
| Deletion granularity | High (less data at a time) | Low (more data at a time) |
| I/O overhead | Higher (more files) | Lower |
| Log cleaner efficiency | Lower (more overhead) | Higher |
| Recovery time | Faster (less data per segment) | Slower |
| Disk fragmentation | Higher | Lower |
| Recommended | 100MB–500MB | 1GB–2GB |
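A back-of-the-envelope way to apply this table: estimate how many closed segments a partition will hold for a given segment size (the workload numbers below are illustrative assumptions):

```python
# Rough sizing sketch with assumed workload numbers.
def segments_per_partition(throughput_mb_per_min: float,
                           retention_min: float,
                           segment_mb: float) -> int:
    total_mb = throughput_mb_per_min * retention_min
    return max(1, round(total_mb / segment_mb))

# 85 MB/min per partition, 24h retention, 1GB segments:
print(segments_per_partition(85, 24 * 60, 1024))   # 120 segments
```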
Edge Cases
1. Consumer reads a segment being deleted:
Timeline:
T=0: Consumer reads offset 500 from Segment-1
T=1: retention.ms expired for Segment-1
T=2: Log cleaner starts deleting Segment-1
T=2.1: Consumer is still reading (in-flight read)
What happens:
log.segment.delete.delay.ms = 60s (default)
Consumer has 60 seconds to finish reading
After 60s: files are deleted
If consumer hasn't finished → OffsetOutOfRange error
Resolution:
retention.ms > max consumer read time
consumers read faster than retention expires
2. Unclean broker shutdown during deletion:
Scenario:
Log cleaner is deleting Segment-5
.log file deleted, .index not yet
Broker crashes
On restart:
Broker sees orphan .index file without .log
Broker removes .index file (cleanup orphan)
Data is already deleted, not recoverable
This is safe behavior — segment deletion is idempotent
3. Retention bytes and partition imbalance:
retention.bytes = 10GB per partition
Partition 0: 5GB (low throughput)
Partition 1: 10GB (high throughput)
Partition 2: 2GB (low throughput)
Partition 1 reaches 10GB → starts deleting
Partition 0 and 2 still have old data
Result:
Partition 1: stores 2 days of data
Partition 0: stores 7 days of data
Size-based retention (retention.bytes) gives NON-uniform time coverage across partitions!
Resolution:
retention.ms for uniform time-based retention
retention.bytes for disk protection, not uniform retention
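The imbalance can be quantified: under retention.bytes, a partition's time coverage is its size limit divided by its write rate (the per-partition rates below are illustrative):

```python
# How many days of data retention.bytes actually keeps, per write rate.
def coverage_days(retention_gb: float, gb_per_day: float) -> float:
    return retention_gb / gb_per_day

for name, rate_gb_per_day in [("partition-0", 1.4),
                              ("partition-1", 5.0),
                              ("partition-2", 0.6)]:
    print(name, round(coverage_days(10, rate_gb_per_day), 1), "days")
# partition-1 keeps only 2 days while partition-0 keeps ~7
```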
4. Compacted topic — deletion of all values:
Scenario:
cleanup.policy = compact
retention.ms = 86400000 (1 day)
Key "user-1": last update was 2 days ago
Log cleaner:
1. Compact: "user-1" has only one value → nothing to compact
2. Delete: value is older than 1 day → DELETE
Result: key "user-1" completely deleted!
Consumer on restart: key "user-1" = null
This is intended behavior for compact+delete!
For permanent state: retention.ms=-1
For changelog with TTL: compact+delete — OK
5. Segment deletion and replication:
Leader partition:
Segment-5 (offsets 1M–1.5M) deleted by retention
Follower partition:
Segment-5 not yet deleted (clock skew or slower cleaner)
Leader cannot replicate deleted data
Follower tries to read from leader
Leader: "Segment doesn't exist"
Follower: OffsetOutOfRange → falls out of ISR
Resolution:
Same retention.ms on all brokers
Clock synchronization (NTP)
retention.ms > max replication lag time
6. Transaction index cleanup:
Topic with transactional messages:
.txnindex file contains transaction markers
On segment deletion:
.txnindex is deleted along with .log
Transaction coordinator:
Tracks committed/aborted transactions
If transaction markers are deleted:
Committed transaction records are still in consumer-visible segments
Aborted transaction records are also deleted
Consumer isolation.level = read_committed:
Does not see aborted transactions
Retention deletion doesn't affect transaction semantics
Performance (Production Numbers)
| Operation | Time | I/O | CPU | Impact |
|---|---|---|---|---|
| Segment deletion (1GB) | 10–100ms | Low (unlink) | Minimal | Negligible |
| Log cleaner compact pass | 5–30 min | High (read+write) | Medium | I/O impact |
| Segment rollover (1GB) | 50–200ms | Medium (file create) | Low | Brief I/O spike |
| Index rebuild (after crash) | 1–5 min | High (read all) | Medium | Startup delay |
Production War Story
Situation: IoT platform, sensor data, 2M messages/min. retention.ms = 24 hours, log.segment.bytes = 1GB.
Problem: Roughly every 12 minutes, all 12 partitions simultaneously created new segments (rollover). I/O spike → consumer fetch latency 5s → lag spike → alerts → engineer wakeup call.
Investigation:
- 2M msg/min × 500 bytes/msg ≈ 1GB/min across the topic
- ≈ 85MB/min per partition → each 1GB segment fills in ~12 minutes
- 12 evenly loaded partitions hit log.segment.bytes almost in step
- Without jitter → all partitions roll over synchronously
- I/O pattern: burst of 12 segment creations every ~12 minutes → latency spike
Root cause: log.roll.jitter.ms was not configured → all partitions rollover simultaneously.
Resolution:
# server.properties
log.segment.bytes=536870912 # 512MB (more segments, smaller each)
log.roll.ms=3600000 # 1 hour
log.roll.jitter.ms=600000 # 10 minutes jitter
# Result:
# Rollover: every 1 hour ± 10 min
# 12 partitions with distributed rollover
# I/O spike: spread over 20 minutes
# Consumer latency: stable
Post-mortem lesson: jitter is not optional for high-throughput topics with multiple partitions.
Monitoring (JMX, Prometheus, Burrow)
JMX metrics:
kafka.log:type=LogCleaner,name=max-clean-time-secs
- Max time for a single clean operation
kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
- Dedupe buffer utilization
kafka.log:type=LogCleaner,name=total-cleans
- Total number of completed cleans
kafka.log:type=Log,name=NumLogSegments,topic=orders,partition=0
- Number of segments per partition
kafka.log:type=Log,name=Size,topic=orders,partition=0
- Size of log in bytes
Prometheus + Grafana:
- record: kafka_partition_segment_count
expr: kafka_log_num_log_segments
- record: kafka_partition_size_bytes
expr: kafka_log_size
- alert: KafkaSegmentCountHigh
expr: kafka_log_num_log_segments > 50
for: 10m
labels:
severity: warning
annotations:
summary: "Partition has > 50 segments — retention may need tuning"
- alert: KafkaLogCleanerSlow
expr: kafka_log_cleaner_max_clean_time_secs > 1800
for: 30m
labels:
severity: warning
annotations:
summary: "Log cleaner taking > 30 min per clean"
- alert: KafkaPartitionSizeHigh
expr: kafka_log_size > 10737418240 # 10GB
for: 5m
labels:
severity: warning
annotations:
summary: "Partition size exceeded 10GB"
Highload Best Practices
✅ log.segment.bytes = 500MB–1GB (balance granularity vs I/O)
✅ log.roll.jitter.ms = 10–20% of log.roll.ms
✅ log.cleaner.threads = number of disks (1 per disk)
✅ log.cleaner.dedupe.buffer.size = 128MB–256MB
✅ log.segment.delete.delay.ms = 60000 (default OK)
✅ Monitor segment count per partition
✅ Monitor log cleaner throughput
✅ retention.ms >> consumer processing time
✅ Disk space alerting (70%, 85%, 95%)
❌ log.segment.bytes < 100MB (too many files)
❌ log.segment.bytes > 2GB (too large for efficient deletion)
❌ Without jitter (thundering herd rollover)
❌ log.cleaner.enable=false (segments not deleted)
❌ retention.ms < consumer downtime + catch-up
❌ Without monitoring segment count
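The red flags above can be folded into a small config check (check_retention_config is a hypothetical helper; the thresholds come from this checklist, not from official Kafka guidance):

```python
# Toy checker for the checklist above; thresholds are this document's,
# not official Kafka defaults.
def check_retention_config(cfg: dict) -> list[str]:
    warnings = []
    seg = cfg.get("log.segment.bytes", 1 << 30)      # 1GB default
    if seg < 100 * 1024**2:
        warnings.append("segments < 100MB: too many files")
    if seg > 2 * 1024**3:
        warnings.append("segments > 2GB: deletion too coarse")
    if cfg.get("log.roll.jitter.ms", 0) == 0:
        warnings.append("no jitter: thundering-herd rollover risk")
    if not cfg.get("log.cleaner.enable", True):
        warnings.append("cleaner disabled: compacted topics won't compact")
    return warnings

print(check_retention_config({"log.segment.bytes": 50 * 1024**2}))
```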
Architectural Decisions
- Segment-based deletion — efficient (file delete), not per-message
- Async deletion — delay for in-flight consumers
- Jitter — critical for high-throughput, multi-partition topics
- Log cleaner — background process, needs monitoring
- Index co-deletion — .log + .index + .timeindex always together
Summary for Senior
- Deletion by segments (not per-message) — efficient file operations
- Each segment = .log + .index + .timeindex (+ .txnindex) files
- Log cleaner handles compact (rewrite), delete simply removes expired segments
- log.segment.delete.delay.ms (60s default) — grace period for in-flight reads
- Jitter is critical for high-throughput multi-partition topics
- retention.bytes = per-partition limit, not total topic
- Compact+delete can delete all values for a key (intended!)
- Replication: leader deleted segment is unavailable for followers
- Monitor: segment count, cleaner time, partition size
- At highload: jitter + proper segment size = stable I/O
🎯 Interview Cheat Sheet
Must know:
- Deletion by segments (not per-message) — efficient file operations
- Each segment = .log + .index + .timeindex (+ .txnindex) files
- Segment is deletable when: not active, timestamp expired, or size exceeded
- log.segment.delete.delay.ms = 60s — grace period for in-flight reads
- Log Cleaner handles compact (rewrite); delete simply removes expired segments
- Jitter (log.roll.jitter.ms) is critical for high-throughput — prevents thundering herd
- Rollover: log.segment.bytes (1GB) or log.roll.ms (7 days)
Common follow-up questions:
- What if a consumer reads a segment being deleted? — 60s delay gives time to finish reading; after that — OffsetOutOfRange.
- Can compact+delete delete all data for a key? — Yes, if one value and retention expired.
- Why is jitter needed? — Without jitter, all partitions rollover simultaneously → I/O spike.
- Leader deleted a segment, follower didn’t — what happens? — Follower cannot replicate → falls out of ISR.
Red flags (DO NOT say):
- “Messages are deleted one by one” — whole segments
- “Everything works without jitter” — thundering herd rollover at high-throughput
- “Compact deletes segments entirely” — compact rewrites segments, keeping the latest value per key
- “retention.bytes=-1 is always fine” — without a retention.ms limit the disk will eventually fill
Related topics:
- [[27. What is retention policy]]
- [[2. What is a partition and why is it needed]]
- [[1. What is a topic in Kafka]]
- [[16. What is replication in Kafka]]