Question 3 · Section 15

How is data distributed across partitions

🟢 Junior Level

Basic rule

Data is distributed across partitions based on the message key:

partition = hash(key) % number_of_partitions
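The formula can be sketched directly in Java. This uses `String.hashCode()` as a stand-in for Kafka's MurmurHash2, so the actual partition numbers will differ from a real producer, but the modulo logic is the same:

```java
public class PartitionRule {
    // Sketch of key-based partition selection (hashCode() stands in for MurmurHash2)
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition
        System.out.println(partitionFor("user-123", 3));
        System.out.println(partitionFor("user-123", 3)); // identical output
    }
}
```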

Analogy

Think of sorting mail into mailboxes:

  • Each letter has a zip code (key)
  • The mail machine looks at the code and puts the letter in a specific mailbox (partition)
  • Letters with the same code always go to the same mailbox
key="user-1" → always partition 2
key="user-2" → always partition 0
key="user-3" → always partition 1

With key and without key

With key — ordering is guaranteed:

// user-123 always lands in the same partition
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "user-123", "Order created");

Without key — even distribution (sticky partitioning):

// Will land in a random partition, evenly distributed
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "Order created");

🟡 Middle Level

Partitioner — how it works

Kafka uses a Partitioner to determine the target partition.

Standard algorithm:

If key != null → MurmurHash2(key) % numPartitions
If key == null → Sticky Partitioning (batch to one partition)

Sticky Partitioning (Kafka 2.4+)

Old approach (Round-Robin):
  msg1 → P0, msg2 → P1, msg3 → P2, msg4 → P0 ...
  Each message → new partition → small batches

Sticky Partitioning:
  msg1-50 → P0 (until batch is full)
  msg51-100 → P2
  msg101-150 → P1
  Larger batches → higher throughput → lower latency

Round-Robin — the old approach (before Kafka 2.4), used when key=null. Replaced by Sticky Partitioning.
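The batching difference is easy to see in a toy simulation. Here the "flush" is counted per message number rather than per byte (real batching is driven by batch.size and linger.ms), and the batch size of 30 messages is a made-up number for illustration:

```java
public class BatchingDemo {
    // Count batches sent when the producer flushes every `flushEvery`
    // messages (a stand-in for linger.ms expiring).
    static int batchesSent(int messages, int partitions, int flushEvery, boolean sticky) {
        int batches = 0;
        java.util.Set<Integer> touched = new java.util.HashSet<>();
        int stickyPartition = 0;
        for (int i = 0; i < messages; i++) {
            int p = sticky ? stickyPartition : i % partitions;
            touched.add(p);
            if ((i + 1) % flushEvery == 0) {
                batches += touched.size(); // one batch per touched partition
                touched.clear();
                stickyPartition = (stickyPartition + 1) % partitions; // sticky switches on flush
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // 300 keyless messages, 3 partitions, flush every 30 messages
        System.out.println("sticky:      " + batchesSent(300, 3, 30, true));  // 10 batches of 30
        System.out.println("round-robin: " + batchesSent(300, 3, 30, false)); // 30 batches of 10
    }
}
```

Same message volume, three times fewer (and three times larger) batches under sticky partitioning: fewer produce requests, better compression per batch.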

Custom Partitioner

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;

public class RegionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) {
            throw new InvalidRecordException("RegionPartitioner requires a key");
        }
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Mask the sign bit instead of Math.abs:
        // Math.abs(Integer.MIN_VALUE) stays negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}

props.put("partitioner.class", "com.example.RegionPartitioner");

Hot Partition Problem

All messages with key="all" → one partition
Hot partition → bottleneck → 100% CPU on one broker
Other partitions are idle
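A quick way to catch this before production is to hash a sample of real keys and count per-partition load. A minimal sketch, again using `String.hashCode()` as a stand-in for MurmurHash2:

```java
import java.util.List;

public class SkewCheck {
    // Count how many keys land in each partition
    static int[] distribution(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[(key.hashCode() & 0x7fffffff) % numPartitions]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Every message carries the same key -> one hot partition
        List<String> hot = java.util.Collections.nCopies(1000, "all");
        int[] counts = distribution(hot, 10);
        for (int c : counts) System.out.print(c + " "); // all 1000 in one slot
    }
}
```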

Strategy comparison

| Strategy | Ordering | Throughput | When to use |
|---|---|---|---|
| Key-based | ✅ Within partition | Medium | Need ordering by key |
| Sticky (no key) | ❌ | Maximum | Ordering not important |
| Round-Robin | ❌ | Medium | Even distribution |
| Custom | Depends | Depends | Business logic |

Common mistakes

| Mistake | Consequence | Solution |
|---|---|---|
| Uneven keys | Hot partitions | Add salt to keys |
| No key when ordering is needed | Messages are mixed up | Use key = business entity ID |
| Null key for related events | Ordering lost | Key = aggregate ID |
| Adding partitions | Key distribution changes | Create a new topic |
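The "salt" fix from the table can be sketched as a small key wrapper. The entity name and salt range here are hypothetical; note that salting deliberately trades away per-entity ordering, since one entity's messages now spread across up to N partitions:

```java
import java.util.concurrent.ThreadLocalRandom;

public class SaltedKey {
    // Spread one hot key across `saltRange` sub-keys
    static String salt(String entityId, int saltRange) {
        return entityId + "_" + ThreadLocalRandom.current().nextInt(saltRange);
    }

    public static void main(String[] args) {
        // "merchant-42" now maps to one of merchant-42_0 .. merchant-42_9,
        // so its traffic spreads over up to 10 partitions
        System.out.println(salt("merchant-42", 10));
    }
}
```

The consumer side must be prepared to aggregate or reorder across sub-keys.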

🔴 Senior Level

Internal Implementation — Default Partitioner

// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {
        // Sticky partitioning
        return stickyPartitionCache.partition(topic, cluster);
    }
    // MurmurHash 2; toPositive masks the sign bit (hash & 0x7fffffff),
    // because Math.abs(Integer.MIN_VALUE) would stay negative
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

MurmurHash 2:

  • Non-cryptographic, fast. Optimized for CPU instructions (bit shifts, XOR), doesn’t require special cryptographic operations. 5-10x faster than SHA-256.
  • Good uniform distribution (minimum collisions)
  • 32-bit output → Math.abs → modulo

Sticky Partition Cache:

class StickyPartitionCache {
    int index;      // current partition
    int epoch;      // epoch for invalidation on metadata change

    int partition(String topic, Cluster cluster) {
        // Returns the same partition until batch is full
        // Then picks a new one (random among available)
    }
}

Uniform sticky partitioning (Kafka 3.3+)

// KIP-794 (Kafka 3.3) reworked the built-in default partitioner:
// with key == null it switches the sticky partition after batch.size
// bytes, and with partitioner.adaptive.partitioning.enable=true
// (the default) it favors faster brokers.
// This fixes the unevenness of the original sticky implementation.
// The standalone UniformStickyPartitioner class (added in Kafka 2.4
// alongside DefaultPartitioner) is deprecated since 3.3;
// leaving partitioner.class unset gives the improved behavior.

Consistent Hashing for dynamic partitions

Problem: hash(key) % N changes when N changes
Solution: Consistent Hashing (ring-based)

Ring: 2^32 positions
Each partition occupies several positions on the ring
Key → hash → nearest partition on ring

When adding a partition:
  Plain hash % N: most keys move (≈90% when going from 10 to 11 partitions)
  Consistent hashing: only ~1/N of keys move

Libraries: ketama, aws-dynamodb-consistent-hash
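The remapping cost of plain hash % N is easy to measure. A small sketch (synthetic "key-i" keys, `String.hashCode()` as the hash) counts how many keys change partition when the count grows from 10 to 11:

```java
public class RemapFraction {
    // Fraction of keys whose partition changes when N grows
    static double movedFraction(int keyCount, int oldN, int newN) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            int h = ("key-" + i).hashCode() & 0x7fffffff;
            if (h % oldN != h % newN) moved++;
        }
        return (double) moved / keyCount;
    }

    public static void main(String[] args) {
        // With hash % N, roughly 90% of keys move going 10 -> 11;
        // consistent hashing would move only ~1/11 of them
        System.out.println(movedFraction(100_000, 10, 11));
    }
}
```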

Edge Cases (3+)

  1. Integer Overflow with MurmurHash: Utils.murmur2() returns a signed int. Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE (negative). Kafka handles this via hash & 0x7fffffff (bitmask). Without this — NegativeArraySizeException or wrong partition.

  2. Key Serialization Overhead: Large key (e.g., JSON object 10KB) gets serialized → MurmurHash processes 10KB per message → CPU bottleneck. Solution: hash the key before sending (SHA-256 → first 8 bytes) or use a compact key (UUID, Long).

  3. Partitioner + Transactional Producer: When using transactional.id, the partitioner is called before the transaction begins. If the partitioner depends on cluster metadata (Custom Partitioner) and metadata is stale — message may land in a non-leader partition → NotLeaderForPartitionException → transaction abort.

  4. Custom Partitioner + Key Changes: If a custom partitioner uses external data (e.g., config from a database) for routing, and that data changes — distribution becomes non-deterministic. The same key in different batches lands in different partitions.

  5. Sticky Partitioning + Small Batches: With batch.size=16KB and small messages (100 bytes), batch fills up in ~160 messages. But with linger.ms=0, batch is sent instantly — sticky partitioner switches to a new partition every message, effectively becoming round-robin with extra overhead.
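Edge case 1 above can be demonstrated in a few lines; Kafka's Utils.toPositive applies the same bitmask:

```java
public class AbsPitfall {
    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE; // a possible murmur2 output

        // Math.abs overflows: abs(Integer.MIN_VALUE) is still negative
        System.out.println(Math.abs(hash));      // -2147483648
        System.out.println(Math.abs(hash) % 10); // -8: invalid partition index

        // Masking the sign bit clears it instead of negating
        System.out.println((hash & 0x7fffffff) % 10); // 0: valid partition
    }
}
```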

Performance Numbers

| Metric | Value | Conditions |
|---|---|---|
| MurmurHash throughput | ~2 GB/s | Single core, modern CPU |
| Sticky partition overhead | ~10-50 ns/message | Without key serialization |
| Hot partition impact | 3-10x latency increase | 90% traffic in 1 partition out of 10 |
| Optimal batch.size | 256KB | Latency/throughput balance |
| Compression ratio (similar keys) | 5-10x | lz4, similar keys in same partition |

Production War Story

Situation: Payment system with 20 partitions on topic transactions. Key = merchantId. During testing with synthetic data (1000 merchants), distribution was even. In production — 3 large merchants generated 70% of traffic.

Problem: 3 partitions handled 70% of the load. Consumer lag on these partitions grew to 500K messages. Two other consumers were idle. P99 latency grew from 10ms to 200ms.

Diagnosis:

# Per-partition end offsets (≈ message counts) via GetOffsetShell
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic transactions --time -1
# Plus JMX BytesInPerSec per partition
# Showed: P7=50GB, P12=45GB, P15=42GB, rest < 5GB

Solution:

  1. Added “salt” to keys: key = merchantId + "_" + ThreadLocalRandom.nextInt(10)
  2. Increased partitions to 60 (each “salted” key distributes more evenly)
  3. Introduced UniformStickyPartitioner for messages without keys
  4. Set up per-partition throughput monitoring in Grafana

Lesson: Always test distribution on production-like data with real keys, not synthetic ones.

Monitoring (JMX + Burrow)

JMX metrics:

kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=transactions
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=transactions
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.log:type=Log,name=Size,topic=transactions,partition=0   # per-partition size
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce

Burrow:

  • Per-partition consumer lag
  • Status per consumer group per partition
  • Alert on partition-level stall

Highload Best Practices

  1. Key = business entity ID (userId, orderId) — ensures ordering and traceability
  2. Even distribution is critical — check per-partition bytes in histogram
  3. Sticky partitioning by default — optimal for throughput when no key
  4. Custom partitioner for business logic — but make it deterministic
  5. If you use keys for ordering guarantees — do not add partitions, create a new topic. If ordering doesn’t matter (key=null), adding partitions is safe.
  6. Batch tuning: batch.size=256KB, linger.ms=10 for high-throughput
  7. Compression: lz4 — best CPU/compression balance for partitioned data
  8. Test on production-like data — real keys often have skew
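Items 6 and 7 from the list above as producer properties (a sketch; the broker address is a placeholder):

```java
import java.util.Properties;

public class ProducerTuning {
    // High-throughput batch tuning from the best-practices list
    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("batch.size", 262144);      // 256KB batches
        props.put("linger.ms", 10);           // wait up to 10ms to fill a batch
        props.put("compression.type", "lz4"); // best CPU/compression balance
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps());
    }
}
```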

🎯 Interview Cheat Sheet

Must know:

  • With key: MurmurHash2(key) % numPartitions — deterministic distribution
  • Without key: Sticky Partitioning (Kafka 2.4+) — batching to one partition for throughput
  • Ordering is guaranteed only within a partition (one key = one partition)
  • Hot partition — problem with uneven keys; solved by salting the key
  • When adding partitions, hash(key) % N changes — ordering broken
  • Kafka 3.3+ (KIP-794) — built-in uniform sticky partitioning; the UniformStickyPartitioner class is deprecated
  • Custom Partitioner — for business routing logic
  • Test distribution on production-like data, not synthetic

Common follow-up questions:

  • What is Sticky Partitioning? — With key=null, messages batch to one partition until full, then switch.
  • How to deal with hot partition? — Salt the key: key = entityId + "_" + random(N).
  • What happens when you add partitions? — Same keys land in different partitions → ordering broken.
  • What hash does Kafka use? — MurmurHash 2, fast non-cryptographic.

Red flags (DO NOT say):

  • “Without key, ordering is guaranteed” — without key it’s sticky/random, no ordering
  • “Adding partitions doesn’t affect keys” — it does, hash(key) % N changes
  • “Round-Robin is used by default” — replaced by Sticky Partitioning
  • “Key can be any size without consequences” — large key = CPU overhead on hashing

Related topics:

  • [[2. What is partition and why is it needed]]
  • [[4. What is message key and how does it affect partitioning]]
  • [[1. What is topic in Kafka]]
  • [[21. What is batch in Kafka producer]]