Question 28 · Section 15

How are old messages deleted from a Kafka topic?


Junior Level

Simple Definition

Kafka deletes old messages by segment when they become older than the configured retention period (for cleanup.policy=delete; for compact — deletion is based on the existence of newer values for the same key). Not one message at a time, but whole files (segments).

Analogy

Imagine a cabinet with folders. Each day gets a new folder. When a folder is older than 7 days, you throw it away entirely, not individual documents from it. This is faster and simpler.

Segment Visualization

Topic "orders", log.segment.bytes = 1GB:

[Segment 0] [Segment 1] [Segment 2] [Segment 3] [Active]
1GB         1GB         1GB         1GB         300MB
offsets:    offsets:    offsets:    offsets:    offsets:
0-500000    500001-     1000001-    1500001-    2000001-
            1000000     1500000     2000000     (writing)

retention.ms = 7 days
Segment 0 was created 8 days ago → DELETED AS A WHOLE
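
A minimal Python sketch of this rule (segment names and ages are invented for illustration):

```python
RETENTION_DAYS = 7

# Hypothetical segments as (name, age_in_days) pairs, mirroring the diagram above
segments = [("segment-0", 8), ("segment-1", 6), ("segment-2", 3), ("active", 0)]

def expired(segment):
    name, age_days = segment
    # The active segment is never deleted, no matter how old it is
    return name != "active" and age_days > RETENTION_DAYS

deleted = [s for s in segments if expired(s)]
kept = [s for s in segments if not expired(s)]
# Only segment-0 is removed, and it is removed as a whole file
```

Nothing inside a surviving segment is touched; deletion happens only at whole-file granularity.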

Each Segment is a Set of Files

00000000000000000000.log       ← message data
00000000000000000000.index     ← offset→position index
00000000000000000000.timeindex ← timestamp→offset index

When This Matters

  • Understanding disk usage — how Kafka manages space
  • Troubleshooting — why data “disappeared”
  • Capacity planning — how much disk is needed
  • Retention tuning — how retention changes affect data

Middle Level

Deletion Process Step by Step

1. Retention task wakes up (every log.retention.check.interval.ms, 5 min by default; the log cleaner thread handles compaction separately)
2. Scans topic segments
3. Finds "deletable" segments:
   - Segment is closed (not active)
   - Segment's max timestamp < now - retention.ms
   - OR partition size > retention.bytes (delete oldest first)
4. Checks log.segment.delete.delay.ms (60s by default)
5. Deletes segment files:
   - .log file
   - .index file
   - .timeindex file
   - .txnindex file (if any)
6. Updates log metadata
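
The eligibility check in step 3 can be sketched in Python (the Segment fields and the find_deletable helper are invented for illustration; a real broker also applies retention.bytes and defers the physical delete):

```python
from dataclasses import dataclass

@dataclass
class Segment:
    base_offset: int
    max_timestamp_ms: int   # timestamp of the newest record in the segment
    is_active: bool

def find_deletable(segments, now_ms, retention_ms):
    # Step 3 above: a segment qualifies only if it is closed AND its newest
    # record is older than the retention window. The physical unlink is then
    # deferred by log.segment.delete.delay.ms (steps 4-5).
    return [
        s for s in segments
        if not s.is_active and s.max_timestamp_ms < now_ms - retention_ms
    ]

segments = [
    Segment(0,         500_000, False),  # expired → deletable
    Segment(500_001,   800_000, False),  # still inside retention → kept
    Segment(1_000_001, 990_000, True),   # active → always kept
]
deletable = find_deletable(segments, now_ms=1_000_000, retention_ms=300_000)
```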

Segment Rollover

A new segment is created when:
  - log.segment.bytes is reached (1GB by default)
  - log.roll.ms / log.roll.hours elapses (7 days by default; segment.ms at the topic level)

Active segment → Closed → Eligible for deletion

Rollover with jitter:

log.roll.ms = 7 days
log.roll.jitter.ms = 1 hour (random)

A segment rolls after 7 days minus a random jitter of up to 1 hour
Jitter prevents simultaneous rollover of all partitions
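
A sketch of jittered roll scheduling, assuming (as in Kafka's log layer) the random jitter is subtracted from the roll interval; the helper name is invented:

```python
import random

def next_roll_ms(created_ms, roll_ms, jitter_ms, rng):
    # A per-segment random jitter in [0, jitter_ms) is subtracted from the
    # roll interval, so identically-aged partitions roll at different times.
    jitter = rng.randrange(jitter_ms) if jitter_ms > 0 else 0
    return created_ms + roll_ms - jitter

WEEK_MS, HOUR_MS = 7 * 24 * 3_600_000, 3_600_000
rng = random.Random(42)  # seeded only to make the sketch reproducible
roll_times = [next_roll_ms(0, WEEK_MS, HOUR_MS, rng) for _ in range(12)]
# All 12 rolls land inside (week - 1h, week], but almost surely not together
```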

Delete vs Compact — How Deletion Works

Aspect        Delete                            Compact
What deletes  Old segments as a whole           Old values of the same keys
When          retention.ms or retention.bytes   When a newer value exists for the same key
Granularity   Segment (1GB)                     Individual records
Speed         Fast (delete file)                Slow (rewrite segment)
Result        Data deleted                      Latest value for each key
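
The compact column can be illustrated with a toy log, using the list index as the offset (a simplification — real compaction works segment by segment and also handles tombstones):

```python
def compact(log):
    # log is a list of (key, value); the list index plays the role of the
    # offset. Compaction keeps only the record holding the latest value
    # for each key.
    latest_offset = {}
    for offset, (key, _value) in enumerate(log):
        latest_offset[key] = offset
    keep = set(latest_offset.values())
    return [(off, rec) for off, rec in enumerate(log) if off in keep]

log = [("user-1", "a"), ("user-2", "b"), ("user-1", "c")]
compacted = compact(log)
# offset 0 ("user-1", "a") is superseded by offset 2 and gets rewritten away
```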

Common Errors Table

Error                      Symptoms                      Cause / Consequence      Solution
Segment not deleted        Disk fills up                 Log cleaner not working  Check log.cleaner.enable
Needed data deleted        Consumer offset out of range  Data loss                retention.ms > consumer processing time
Frequent rollover          Many small segments           I/O overhead             Increase log.segment.bytes
Rare rollover              One huge segment              Deletion has big impact  Decrease log.segment.bytes
Delete delay too large     Slow disk freeing             Disk fills up            Decrease log.segment.delete.delay.ms
Compact on keyless topics  Nothing gets compacted        Wasted CPU               Ensure messages have keys

When Messages Are NOT Deleted

  1. Active segment — always protected
  2. retention.ms = -1 — store forever
  3. retention.bytes = -1 — no limit
  4. Log cleaner disabled — log.cleaner.enable=false
  5. Segment not deletable — timestamp not yet expired
  6. Unclean shutdown — segment may be locked

Senior Level

Common Interview Question

What if the consumer is slower than retention? Answer: the consumer gets OffsetOutOfRange and either starts reading from the earliest available offset (auto.offset.reset=earliest) or skips to the head of the log (latest). retention.ms must be greater than the maximum consumer downtime.
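
A toy model of that behavior (resolve_fetch_offset is an invented helper, not a Kafka API):

```python
def resolve_fetch_offset(requested, log_start, log_end, auto_offset_reset):
    # Simplified model: retention moved log_start past the consumer's
    # committed offset, so the requested offset no longer exists.
    if log_start <= requested <= log_end:
        return requested                    # offset still valid
    if auto_offset_reset == "earliest":
        return log_start                    # reprocess oldest surviving data
    if auto_offset_reset == "latest":
        return log_end                      # jump to the head, losing the gap
    raise LookupError("OffsetOutOfRange")   # auto.offset.reset=none

# Retention deleted offsets 0-499999 while the consumer was down at 120000:
resumed = resolve_fetch_offset(120_000, 500_000, 2_000_000, "earliest")
```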

Deep Internals

Log Cleaner — Detailed Architecture

LogCleaner (kafka.log.LogCleaner):
├── Cleaner threads (log.cleaner.threads)
│   └── Each thread:
│       ├── LogCleanerManager (coordinates work)
│       ├── CleanablePartitions (selects what to clean)
│       └── Cleaner (performs actual cleaning)
│
├── Dedupe Buffer (log.cleaner.dedupe.buffer.size)
│   └── OffsetMap: key → latest offset
│   └── Hash table for compact deduplication
│
└── I/O throttling
    └── log.cleaner.io.buffer.size (512KB)
    └── log.cleaner.io.buffer.load.factor (0.9)

Delete process (for cleanup.policy=delete):

// Simplified algorithm
def deleteOldSegments():
    for each partition:
        segments = partition.logSegments()
        deletable = segments.filter(s =>
            !s.isActive &&                          // never the active segment
            s.maxTimestamp < now - retentionMs      // expired
        )
        for segment in deletable:
            // files are renamed to *.deleted immediately; physical
            // removal happens after log.segment.delete.delay.ms
            asyncDelete(segment)

Compact process (for cleanup.policy=compact):

// Simplified algorithm
def compactPartition():
    // 1. Build clean map (key → latest offset)
    cleanMap = OffsetMap()
    for segment in segments:
        for record in segment:
            if record.key != null:
                cleanMap.put(record.key, record.offset)

    // 2. Determine live records (latest value per key)
    liveOffsets = cleanMap.values()

    // 3. Copy live records to new segment
    newSegment = createCleanSegment()
    for segment in segments:
        for record in segment:
            if record.offset in liveOffsets:
                newSegment.append(record)

    // 4. Swap old segments with new segment
    partition.replaceSegments(segments, newSegment)

    // 5. Delete old segment files
    for segment in segments:
        segment.delete()

Segment Deletion — Async and File System

Async deletion process:
1. Thread marks segment for deletion
2. Waits log.segment.delete.delay.ms (60s default)
3. Deletes .log, .index, .timeindex, .txnindex files
4. Files removed from directory

Async deletion rationale:
  - Consumers may still be reading this segment
  - Delay gives time to finish in-flight reads
  - Prevents "file not found" errors

File system behavior (Linux):
  rm(segment.log) → unlink() system call
  File is not deleted while there are open file descriptors
  Consumers with in-flight reads continue reading
  After all FDs are closed → disk space is freed
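
This unlink() behavior is easy to verify on a POSIX system:

```python
import os
import tempfile

# An unlinked file stays readable through an already-open descriptor —
# which is why a consumer mid-fetch survives segment deletion.
fd, path = tempfile.mkstemp()
os.write(fd, b"segment data")
os.unlink(path)                    # "rm segment.log": the name is gone...
assert not os.path.exists(path)    # ...so no new reader can open it
os.lseek(fd, 0, os.SEEK_SET)
data = os.read(fd, 12)             # ...but the in-flight reader still works
os.close(fd)                       # only now is the disk space reclaimed
```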

Index Files and Their Deletion

Each segment has 2 indexes:

Offset Index (.index):
  Maps offset → byte position in .log file
  Sparse index: every ~4KB of data
  Size: log.index.size.max.bytes = 10MB

Time Index (.timeindex):
  Maps timestamp → offset
  Sparse index: every ~4KB of data
  Size: log.index.size.max.bytes = 10MB

When a segment is deleted:
  All 3 files (.log, .index, .timeindex) are deleted together
  Cannot delete .log without indexes (atomically together)
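
A sketch of how a sparse offset index is searched (entries below are invented; Kafka stores relative offsets and uses a binary search in the same spirit):

```python
import bisect

# Sparse offset index: one (offset, byte_position) entry per ~4KB of log data
index = [(0, 0), (120, 4096), (250, 8192), (380, 12288)]

def floor_entry(index, target_offset):
    # Greatest indexed offset <= target; the reader then scans the .log
    # file forward from that byte position to find the exact record.
    offsets = [off for off, _ in index]
    i = bisect.bisect_right(offsets, target_offset) - 1
    return index[i]

entry = floor_entry(index, 300)   # a lookup for offset 300 starts at byte 8192
```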

Trade-offs

Parameter               Small segments                  Large segments
Deletion granularity    High (less data at a time)      Low (more data at a time)
I/O overhead            Higher (more files)             Lower
Log cleaner efficiency  Lower (more overhead)           Higher
Recovery time           Faster (less data per segment)  Slower
Disk fragmentation      Higher                          Lower
Recommended size        100MB–500MB                     1GB–2GB

Edge Cases

1. Consumer reads a segment being deleted:

Timeline:
  T=0:  Consumer reads offset 500 from Segment-1
  T=1:  retention.ms expired for Segment-1
  T=2:  Log cleaner starts deleting Segment-1
  T=2.1: Consumer is still reading (in-flight read)

What happens:
  log.segment.delete.delay.ms = 60s (default)
  Consumer has 60 seconds to finish reading
  After 60s: files are deleted
  If consumer hasn't finished → OffsetOutOfRange error

Resolution:
  retention.ms > max consumer read time
  consumers read faster than retention expires

2. Unclean broker shutdown during deletion:

Scenario:
  Log cleaner is deleting Segment-5
  .log file deleted, .index not yet
  Broker crashes

On restart:
  Broker sees orphan .index file without .log
  Broker removes .index file (cleanup orphan)
  Data is already deleted, not recoverable

This is safe behavior — segment deletion is idempotent

3. Retention bytes and partition imbalance:

retention.bytes = 10GB per partition
Partition 0: 5GB (low throughput)
Partition 1: 10GB (high throughput)
Partition 2: 2GB (low throughput)

Partition 1 reaches 10GB → starts deleting
Partition 0 and 2 still have old data

Result:
  Partition 1: stores 2 days of data
  Partition 0: stores 7 days of data
  Time-based retention is NOT uniform across partitions!

Resolution:
  retention.ms for uniform time-based retention
  retention.bytes for disk protection, not uniform retention
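
The imbalance is simple arithmetic (throughput figures taken from the example above; the helper is invented):

```python
GB = 1024**3

def effective_retention_days(retention_bytes, bytes_per_day):
    # Under a pure size cap, history length is inversely proportional
    # to throughput — the imbalance shown above, in code.
    return retention_bytes / bytes_per_day

hot_days = effective_retention_days(10 * GB, 5 * GB)       # partition 1
cold_days = effective_retention_days(10 * GB, 5 * GB / 7)  # partition 0
# The hot partition keeps only 2 days; the cold one would take 14 days to
# hit the cap, so its 7-day retention.ms expires first
```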

4. Compacted topic — deletion of all values:

Scenario:
  cleanup.policy = compact
  retention.ms = 86400000 (1 day)
  Key "user-1": last update was 2 days ago

  Log cleaner:
    1. Compact: "user-1" has only one value → nothing to compact
    2. Delete: value is older than 1 day → DELETE

  Result: key "user-1" completely deleted!
  Consumer on restart: key "user-1" = null

This is intended behavior for compact+delete!
  For permanent state: retention.ms=-1
  For changelog with TTL: compact+delete — OK
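
A miniature of compact+delete semantics (compact_then_delete is an invented helper; the timestamps reuse the scenario above):

```python
DAY_MS = 86_400_000

def compact_then_delete(records, now_ms, retention_ms):
    # cleanup.policy=compact,delete in miniature: keep the latest value per
    # key, then drop even that value once it is older than retention.
    latest = {}
    for key, value, ts_ms in records:
        latest[key] = (value, ts_ms)
    return {k: v for k, (v, ts) in latest.items() if ts >= now_ms - retention_ms}

records = [
    ("user-1", "v1", 0),            # last update 2 days ago
    ("user-2", "v1", 1 * DAY_MS),
    ("user-2", "v2", 2 * DAY_MS),   # fresh update
]
state = compact_then_delete(records, now_ms=2 * DAY_MS, retention_ms=DAY_MS)
# "user-1" vanishes entirely — the intended behavior described above
```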

5. Segment deletion and replication:

Leader partition:
  Segment-5 (offsets 1M–1.5M) deleted by retention

Follower partition:
  Segment-5 not yet deleted (clock skew or slower cleaner)

  The leader can no longer serve the deleted offsets
  Follower fetches an offset below the leader's log start offset
  Leader responds: OffsetOutOfRange
  Follower truncates and re-fetches from the leader's log start offset;
  if it lags too long while catching up, it can fall out of ISR

Resolution:
  Same retention.ms on all brokers
  Clock synchronization (NTP)
  retention.ms > max replication lag time

6. Transaction index cleanup:

Topic with transactional messages:
  .txnindex file contains transaction markers
  On segment deletion:
    .txnindex is deleted along with .log

  Transaction coordinator:
    Tracks committed/aborted transactions
    If transaction markers are deleted:
      Committed transaction records are still in consumer-visible segments
      Aborted transaction records are also deleted

  Consumer isolation.level = read_committed:
    Does not see aborted transactions
    Retention deletion doesn't affect transaction semantics

Performance (Production Numbers)

Operation                    Time      I/O                   CPU      Impact
Segment deletion (1GB)       10–100ms  Low (unlink)          Minimal  Negligible
Log cleaner compact pass     5–30 min  High (read+write)     Medium   I/O impact
Segment rollover (1GB)       50–200ms  Medium (file create)  Low      Brief I/O spike
Index rebuild (after crash)  1–5 min   High (read all)       Medium   Startup delay

Production War Story

Situation: IoT platform, sensor data, 2M messages/min. retention.ms = 24 hours, log.segment.bytes = 1GB.

Problem: At regular intervals, all 12 partitions simultaneously created new segments (rollover). The synchronized I/O spike drove consumer fetch latency to 5s → lag spike → alerts → engineer wakeup call.

Investigation:

  • 2M msg/min × 500 bytes/msg ≈ 1GB/min across the whole topic
  • ÷ 12 partitions ≈ 83MB/min per partition
  • log.segment.bytes = 1GB → each partition rolls roughly every 12 minutes
  • Without jitter and with even load → all 12 partitions roll in lockstep
  • I/O pattern: synchronized bursts → latency spikes
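
The arithmetic can be sanity-checked directly (assuming load is spread evenly across partitions; GB here means 10**9 bytes for round numbers):

```python
msgs_per_min = 2_000_000
bytes_per_msg = 500
partitions = 12
segment_bytes = 10**9                                       # log.segment.bytes

topic_bytes_per_min = msgs_per_min * bytes_per_msg          # 1 GB/min total
partition_bytes_per_min = topic_bytes_per_min / partitions  # ~83 MB/min each
minutes_per_roll = segment_bytes / partition_bytes_per_min  # ~12 min/partition
# Without jitter the partitions fill in lockstep, so roughly every 12 minutes
# all 12 roll at once — the synchronized I/O burst behind the latency spikes
```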

Root cause: log.roll.jitter.ms was not configured → all partitions rollover simultaneously.

Resolution:

# server.properties
log.segment.bytes=536870912        # 512MB (more segments, smaller each)
log.roll.ms=3600000                # 1 hour
log.roll.jitter.ms=600000          # 10 minutes jitter

# Result:
# Rollover: every 1 hour ± 10 min
# 12 partitions with distributed rollover
# I/O spike: spread over 20 minutes
# Consumer latency: stable

Post-mortem lesson: jitter is not optional for high-throughput topics with multiple partitions.

Monitoring (JMX, Prometheus, Burrow)

JMX metrics:

kafka.log:type=LogCleaner,name=max-clean-time-secs
  - Max time for a single clean operation

kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
  - Dedupe buffer utilization

kafka.log:type=LogCleaner,name=total-cleans
  - Total number of completed cleans

kafka.log:type=Log,name=NumLogSegments,topic=orders,partition=0
  - Number of segments per partition

kafka.log:type=Log,name=Size,topic=orders,partition=0
  - Size of log in bytes

Prometheus + Grafana:

- record: kafka_partition_segment_count
  expr: kafka_log_num_log_segments

- record: kafka_partition_size_bytes
  expr: kafka_log_size

- alert: KafkaSegmentCountHigh
  expr: kafka_log_num_log_segments > 50
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Partition has > 50 segments; retention may need tuning"

- alert: KafkaLogCleanerSlow
  expr: kafka_log_cleaner_max_clean_time_secs > 1800
  for: 30m
  labels:
    severity: warning
  annotations:
    summary: "Log cleaner taking > 30 min per clean"

- alert: KafkaPartitionSizeHigh
  expr: kafka_log_size > 10737418240  # 10GB
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Partition size exceeded 10GB"

Highload Best Practices

✅ log.segment.bytes = 500MB–1GB (balance granularity vs I/O)
✅ log.roll.jitter.ms = 10–20% of log.roll.ms
✅ log.cleaner.threads = number of disks (1 per disk)
✅ log.cleaner.dedupe.buffer.size = 128MB–256MB
✅ log.segment.delete.delay.ms = 60000 (default OK)
✅ Monitor segment count per partition
✅ Monitor log cleaner throughput
✅ retention.ms >> consumer processing time
✅ Disk space alerting (70%, 85%, 95%)

❌ log.segment.bytes < 100MB (too many files)
❌ log.segment.bytes > 2GB (too large for efficient deletion)
❌ Without jitter (thundering herd rollover)
❌ log.cleaner.enable=false (segments not deleted)
❌ retention.ms < consumer downtime + catch-up
❌ Without monitoring segment count

Architectural Decisions

  1. Segment-based deletion — efficient (file delete), not per-message
  2. Async deletion — delay for in-flight consumers
  3. Jitter — critical for high-throughput, multi-partition topics
  4. Log cleaner — background process, needs monitoring
  5. Index co-deletion — .log + .index + .timeindex always together

Summary for Senior

  • Deletion by segments (not per-message) — efficient file operations
  • Each segment = .log + .index + .timeindex (+ .txnindex) files
  • Log cleaner handles compact (rewrite), delete simply removes expired segments
  • log.segment.delete.delay.ms (60s default) — grace period for in-flight reads
  • Jitter is critical for high-throughput multi-partition topics
  • retention.bytes = per-partition limit, not total topic
  • Compact+delete can delete all values for a key (intended!)
  • Replication: leader deleted segment is unavailable for followers
  • Monitor: segment count, cleaner time, partition size
  • At highload: jitter + proper segment size = stable I/O

🎯 Interview Cheat Sheet

Must know:

  • Deletion by segments (not per-message) — efficient file operations
  • Each segment = .log + .index + .timeindex (+ .txnindex) files
  • Segment is deletable when: not active, timestamp expired, or size exceeded
  • log.segment.delete.delay.ms=60s — grace period for in-flight reads
  • Log Cleaner handles compact (rewrite), delete simply removes expired segments
  • Jitter (log.roll.jitter.ms) is critical for high-throughput — prevents thundering herd
  • Rollover: log.segment.bytes (1GB) or log.segment.ms (7 days)

Common follow-up questions:

  • What if a consumer reads a segment being deleted? — 60s delay gives time to finish reading; after that — OffsetOutOfRange.
  • Can compact+delete delete all data for a key? — Yes, if one value and retention expired.
  • Why is jitter needed? — Without jitter, all partitions rollover simultaneously → I/O spike.
  • Leader deleted a segment, follower didn’t — what happens? — Follower gets OffsetOutOfRange and re-fetches from the leader's log start offset; while catching up it can fall out of ISR.

Red flags (DO NOT say):

  • “Messages are deleted one by one” — whole segments
  • “Everything works without jitter” — thundering herd rollover at high-throughput
  • “Compact deletes segments entirely” — compact rewrites segments, keeping the latest value per key
  • “retention.bytes=-1 is always fine” — with no size cap, a traffic spike can fill the disk faster than time-based retention frees it

Related topics:

  • [[27. What is retention policy]]
  • [[2. What is a partition and why is it needed]]
  • [[1. What is a topic in Kafka]]
  • [[16. What is replication in Kafka]]