Question 13 · Section 15

How does offset commit work

Junior Level

Definition

Offset commit is saving the offset of the last processed message in Kafka.

Where it’s saved: to a special internal Kafka topic (__consumer_offsets). On restart, the consumer reads its last position from there and continues from it. Without a commit — it starts from the beginning.

// Automatic commit
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");  // every 5 seconds

// Manual commit
props.put("enable.auto.commit", "false");
consumer.commitSync();  // after processing

Why commit?

Without commit:
  Consumer crashes → on restart reads everything from the start
  → Duplicates, wasted processing

With commit:
  Consumer crashes → on restart continues from last commit
  → Minimal duplicates
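The two cases above can be sketched as a tiny in-memory simulation (plain Java, no broker; `OffsetResumeSketch` and its method names are illustrative, not a Kafka API):

```java
import java.util.List;

// Illustrative sketch: how a committed offset changes what is re-read after a restart.
public class OffsetResumeSketch {
    // With no commit, the consumer restarts from the beginning of the partition.
    static List<Integer> restartWithoutCommit(List<Integer> log) {
        return log; // everything is processed again
    }

    // With a commit, it resumes from the committed offset (the next record to read).
    static List<Integer> restartWithCommit(List<Integer> log, int committedOffset) {
        return log.subList(committedOffset, log.size());
    }

    public static void main(String[] args) {
        List<Integer> partition = List.of(0, 1, 2, 3, 4); // offsets 0..4
        System.out.println(restartWithoutCommit(partition).size()); // 5
        System.out.println(restartWithCommit(partition, 3).size()); // 2
    }
}
```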

Example

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (var record : records) {
        process(record);  // processing
    }
    consumer.commitSync();  // commit after processing
}

Middle Level

Sync vs Async Commit

Sync Commit:

// Waits for broker confirmation; blocks the calling thread
consumer.commitSync();
// Typical latency: 5-50ms per commit. At 100 commits/sec that is
// 0.5-5s of blocking time per second, so commit per batch, not per record.
// On error — throws an exception (e.g. CommitFailedException)

Async Commit:

// Doesn't wait for confirmation
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed", exception);
    }
});
// Faster, but a failed commit is not retried automatically → possible duplicates after restart

When to use which?

| Commit type | When to use |
| --- | --- |
| commitSync() | Critical data, low throughput |
| commitAsync() | High throughput, rare duplicates acceptable |
| auto commit | Prototypes, non-production |

Batch Commit

// Commit every N messages
int count = 0;
for (var record : records) {
    process(record);
    if (++count % 100 == 0) {
        consumer.commitAsync();
    }
}
consumer.commitAsync();  // don't forget the tail of the batch

Committable Offsets

// Commit specific offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("orders", 0), new OffsetAndMetadata(1000));
offsets.put(new TopicPartition("orders", 1), new OffsetAndMetadata(2000));
consumer.commitSync(offsets);

Common mistakes

  1. Commit before processing:
    consumer.commitSync();  // ❌ commit first
    process(records);       // then process
    // If processing crashes → messages are skipped on restart (data loss)
    
  2. Commit inside processing loop:
    for (var record : records) {
        process(record);
        consumer.commitSync();  // ❌ too frequent: one blocking round-trip per message
    }
    

Senior Level

Internal Implementation

Commit Process:

1. Consumer → CommitOffset request → Broker
2. Broker → writes to __consumer_offsets topic
3. Broker → confirms commit
4. Consumer → receives confirmation

__consumer_offsets topic:

- Internal Kafka topic
- Compacted (log compaction)
- Stores offsets for all consumer groups
- Key: group.id + topic + partition
- Value: offset + metadata + timestamp
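The compaction semantics can be sketched as a keyed store where the latest commit per (group.id, topic, partition) key wins (a plain-Java illustration; `OffsetsTopicSketch` is a made-up name, not a Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of how __consumer_offsets behaves logically: a compacted topic keyed by
// (group.id, topic, partition) where only the latest commit per key is retained.
public class OffsetsTopicSketch {
    private final Map<String, Long> latest = new HashMap<>();

    static String key(String groupId, String topic, int partition) {
        return groupId + "/" + topic + "/" + partition;
    }

    // A new commit for the same key overwrites the previous one (compaction semantics).
    void commit(String groupId, String topic, int partition, long offset) {
        latest.put(key(groupId, topic, partition), offset);
    }

    long committed(String groupId, String topic, int partition) {
        return latest.getOrDefault(key(groupId, topic, partition), -1L);
    }

    public static void main(String[] args) {
        OffsetsTopicSketch store = new OffsetsTopicSketch();
        store.commit("billing", "orders", 0, 1000);
        store.commit("billing", "orders", 0, 1500); // overwrites 1000
        System.out.println(store.committed("billing", "orders", 0)); // 1500
    }
}
```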

Offset Commit Latency

Sync commit latency:
  Normal: 5-20ms
  With issues: >100ms
  Impact: blocks processing

Async commit latency:
  Normal: <5ms (non-blocking)
  Impact: minimal

Failure Scenarios

1. Commit failure:

Consumer → commit → network error → broker didn't receive
On restart:
  - Committed offset is old
  - Messages will be processed again
  - Idempotent processing solves the problem

2. Broker failure during commit:

Consumer → commit → broker wrote → broker crashed before confirmation
Consumer → retry commit → new leader
Commit may be lost or repeated
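Both failure scenarios lead to redelivery, which is why idempotent processing matters. A minimal sketch of deduplication by a stable record id (plain Java; `IdempotentProcessor` and the id format are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of idempotent processing: if a commit is lost and records are redelivered,
// a dedup set keyed by a stable record id makes reprocessing a no-op.
public class IdempotentProcessor {
    private final Set<String> seen = new HashSet<>();
    private int effects = 0; // counts side effects actually applied

    // Returns true only the first time a given record id is processed.
    boolean process(String recordId) {
        if (!seen.add(recordId)) {
            return false; // duplicate after redelivery: skip
        }
        effects++; // apply the side effect exactly once
        return true;
    }

    int effects() { return effects; }

    public static void main(String[] args) {
        IdempotentProcessor p = new IdempotentProcessor();
        p.process("order-42");
        p.process("order-42"); // redelivered because the commit was lost
        System.out.println(p.effects()); // 1
    }
}
```

In production the `seen` set would live in a durable store (database unique key, cache with TTL), not in memory.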

Offset Management Patterns

1. Per-Record Commit:

// Commit each message (maximum reliability)
for (var record : records) {
    process(record);
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    consumer.commitSync(offsets);
}

2. Batch Commit with Error Handling:

try {
    for (var record : records) {
        process(record);
    }
    consumer.commitSync();
} catch (Exception e) {
    // Don't commit → messages will be read again
    log.error("Processing failed, will retry", e);
}

3. Async Commit with Retry:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Sync commit — single retry attempt. If it also fails — log error.
        // Don't do infinite retries to avoid blocking processing.
        try {
            consumer.commitSync(offsets);
        } catch (Exception e) {
            log.error("Commit retry failed", e);
        }
    }
});

Offset Metadata

// Save metadata along with the offset
TopicPartition partition = new TopicPartition("orders", 0);
OffsetAndMetadata metadata = new OffsetAndMetadata(
    offset,
    "processed-at=" + System.currentTimeMillis()
);
consumer.commitSync(Map.of(partition, metadata));

Performance Optimization

Commit frequency trade-offs:
  More frequent commits → fewer duplicates, more overhead
  Less frequent commits → more throughput, more duplicates on failure

Recommendations:
  - Critical data: commit after each batch
  - High throughput: commit every N messages
  - Balanced: commitAsync with retry on error
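The trade-off can be quantified with simple arithmetic (plain Java; `CommitTradeoffSketch` is an illustrative name): committing every N records divides the number of commit requests by N, at the cost of up to N records being reprocessed after a crash.

```java
// Sketch of the commit-interval trade-off: larger batches mean fewer commits
// but more worst-case duplicates on failure.
public class CommitTradeoffSketch {
    // Worst case: everything processed since the last commit is reprocessed.
    static long maxReprocessedOnCrash(long batchSize) {
        return batchSize;
    }

    // Commit requests needed to get through `total` records.
    static long commitCount(long total, long batchSize) {
        return (total + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        // 1,000,000 records, commit every 100:
        System.out.println(commitCount(1_000_000, 100));  // 10000
        System.out.println(maxReprocessedOnCrash(100));   // 100
        // Per-record commit: minimal duplicates, but a commit per message:
        System.out.println(commitCount(1_000_000, 1));    // 1000000
    }
}
```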

Monitoring Offset Commit

Key metrics:

kafka.consumer:commit-latency-avg
kafka.consumer:commit-latency-max
kafka.consumer:commit-rate
kafka.consumer:commit-total

Alerts:

- Commit latency > 100ms → warning
- Failed commit rate > 1% → critical
- No commits for > 5 minutes → critical

Best Practices

✅ commitSync() after batch processing
✅ commitAsync() for high-throughput
✅ Handle commit errors
✅ Idempotent processing for duplicate protection
✅ Monitor commit latency

❌ Auto commit for critical data
❌ Commit before processing
❌ Commit in loop without need
❌ Ignoring commit errors
❌ No monitoring of commit health

Architectural decisions

  1. Sync commit for reliability — critical data
  2. Async commit for throughput — high-volume systems
  3. Error handling is mandatory — retry or alert
  4. Idempotent processing — duplicate protection on retry

Summary for Senior

  • Commit process: consumer → __consumer_offsets → broker acknowledgment
  • Sync vs async — trade-off between reliability and performance
  • Failure scenarios require retry handling
  • Metadata can store additional information
  • Monitoring commit latency is critical for production health

🎯 Interview Cheat Sheet

Must know:

  • Commit offset — saving the number of the last processed message in __consumer_offsets
  • Sync commit: reliable, 5-50ms, blocks; Async commit: <5ms, non-blocking, possible loss
  • Commit AFTER processing = at-least-once; commit BEFORE = data loss on crash
  • Batch commit: every N messages — balance between overhead and duplicates
  • Auto commit commits offsets on a timer during poll(), regardless of whether the batch was fully processed — not for production
  • On failure: committed offset is old → messages will be processed again (idempotent processing)
  • Offset metadata allows storing additional info (timestamp, processing state)

Common follow-up questions:

  • When to use sync vs async? — Sync for critical data, async for high-throughput.
  • What happens on async commit failure? — Fallback to sync commit (single retry attempt).
  • How to commit specific offsets? — commitSync(Map<TopicPartition, OffsetAndMetadata>).
  • Why is auto commit bad for production? — Commits on a timer regardless of processing outcome; no error control.

Red flags (DO NOT say):

  • “Auto commit is standard for production” — commits unprocessed messages
  • “Commit in every message loop is fine” — huge overhead
  • “Async commit without error handling is OK” — offset loss on failure
  • “Offset is committed automatically by the broker” — it’s client-side logic

Related topics:

  • [[14. What is the difference between auto commit and manual commit]]
  • [[12. What is offset in Kafka]]
  • [[24. How to handle errors when reading messages]]
  • [[15. What is rebalancing and when does it happen]]