Питання 3 · Розділ 15

Як дані розподіляються по партиціях

Дані розподіляються по партиціях на основі ключа повідомлення:

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Основне правило

Дані розподіляються по партиціях на основі ключа повідомлення:

partition = hash(key) % number_of_partitions

Аналогія

Уявіть сортування листів по поштових скриньках:

  • У кожного листа є індекс (ключ)
  • Поштова машина дивиться на індекс і кладе лист у певну скриньку (партицію)
  • Листи з однаковим індексом завжди потрапляють в одну й ту саму скриньку
key="user-1" → завжди партиція 2
key="user-2" → завжди партиція 0
key="user-3" → завжди партиція 1

З ключем і без ключа

З ключем — порядок гарантовано:

// user-123 завжди потрапить в одну й ту саму партицію
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "user-123", "Order created");

Без ключа — рівномірний розподіл (sticky partitioning):

// Потрапить у випадкову партицію, рівномірно
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "Order created");

🟡 Middle Level

Partitioner — як це працює

Kafka використовує Partitioner для визначення цільової партиції.

Стандартний алгоритм:

Якщо key != null → MurmurHash2(key) % numPartitions
Якщо key == null → Sticky Partitioning (батч в одну партицію)

Sticky Partitioning (Kafka 2.4+)

Старий підхід (Round-Robin):
  msg1 → P0, msg2 → P1, msg3 → P2, msg4 → P0 ...
  Кожне message → нова партиція → маленькі батчі

Sticky Partitioning:
  msg1-50 → P0 (поки batch не заповниться)
  msg51-100 → P2
  msg101-150 → P1
  Більше батчі → вища throughput → менша latency

Round-Robin — старий підхід (до Kafka 2.4), який використовувався при key=null. Замінений на Sticky Partitioning.

Custom Partitioner

public class RegionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Math.abs(key.hashCode()) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
props.put("partitioner.class", "com.example.RegionPartitioner");

Hot Partition Problem

Всі повідомлення з key="all" → одна партиція
Hot partition → bottleneck → 100% CPU на одному брокері
Інші партиції простоюють

Порівняння стратегій

Стратегія Порядок Throughput Коли використовувати
Key-based ✅ Всередині партиції Середній Потрібен порядок по ключу
Sticky (no key) Максимальний Порядок не важливий
Round-Robin Середній Рівномірний розподіл
Custom Залежить Залежить Бізнес-логіка

Типові помилки

Помилка Наслідок Рішення
Нерівномірні ключі Hot partitions Додавайте сіль до ключа
Без ключа коли потрібен порядок Повідомлення перемішані Використовуйте ключ = business entity ID
Null key для пов’язаних подій Порядок втрачено Ключ = aggregate ID
Додавання партицій Розподіл ключів змінюється Створюйте новий топик

🔴 Senior Level

Internal Implementation — Default Partitioner

// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {
        // Sticky partitioning
        return stickyPartitionCache.partition(topic, cluster);
    }
    // MurmurHash 2
    int hash = Utils.murmur2(keyBytes);
    return Math.abs(hash) % numPartitions;
}

MurmurHash 2:

  • Некриптографічний, швидкий. Оптимізований під CPU-інструкції (бітові зсуви, XOR), не потребує спеціальних криптографічних операцій. У 5-10x швидший за SHA-256.
  • Гарний рівномірний розподіл (мінімум колізій)
  • 32-bit output → Math.abs → modulo

Sticky Partition Cache:

class StickyPartitionCache {
    int index;      // поточна партиція
    int epoch;      // epoch для invalidation при зміні метаданих

    int partition(String topic, Cluster cluster) {
        // Повертає ту саму партицію поки batch не заповниться
        // Потім вибирає нову (random серед доступних)
    }
}

UniformStickyPartitioner (Kafka 3.3+)

// org.apache.kafka.clients.producer.UniformStickyPartitioner
// Покращена версія sticky partitioning
// Гарантує більш рівномірний розподіл при key == null
// Вибирає наступну партицію через bounded random
// Вирішує проблему нерівномірності при sticky partitioning —
// гарантує більш рівномірний розподіл через bounded random замість простого sticking.
props.put("partitioner.class",
    "org.apache.kafka.clients.producer.UniformStickyPartitioner");

Consistent Hashing для динамічних партицій

Проблема: hash(key) % N змінюється при зміні N
Рішення: Consistent Hashing (ring-based)

Ring: 2^32 позицій
Кожна партиція займає кілька позицій на ring
Ключ → hash → найближча партиція на ring

При додаванні партиції:
  Старий підхід: ~50% ключів переміщуються
  Consistent hashing: ~1/N ключів переміщуються

Бібліотека: ketama, aws-dynamodb-consistent-hash

Edge Cases (3+)

  1. Integer Overflow при MurmurHash: Utils.murmur2() повертає signed int. Math.abs(Integer.MIN_VALUE) повертає Integer.MIN_VALUE (від’ємне). В Kafka це обробляється через hash & 0x7fffffff (бітова маска). Без цього — NegativeArraySizeException або неправильна партиція.

  2. Key Serialization Overhead: Великий ключ (наприклад, JSON-об’єкт 10KB) серіалізується → MurmurHash обробляє 10KB на кожне повідомлення → CPU bottleneck. Рішення: хешуйте ключ до відправки (SHA-256 → перші 8 байт) або використовуйте компактний ключ (UUID, Long).

  3. Partitioner + Transactional Producer: При використанні transactional.id, партиціонер викликається до початку транзакції. Якщо партиціонер залежить від метаданих кластера (Custom Partitioner), а метадані застарілі — повідомлення може потрапити в нелідерну партицію → NotLeaderForPartitionException → abort транзакції.

  4. Custom Partitioner + Key Changes: Якщо кастомний партиціонер використовує зовнішні дані (наприклад, конфігурацію з БД) для маршрутизації, і ці дані змінюються — розподіл стає недетермінованим. Один і той самий ключ у різних батчах потрапляє в різні партиції.

  5. Sticky Partitioning + Small Batches: При batch.size=16KB і маленьких повідомленнях (100 bytes) батч заповнюється за ~160 повідомлень. Але при linger.ms=0 батч відправляється миттєво — sticky партиціонер перемикається на нову партицію кожне повідомлення, перетворюючись на round-robin з додатковим overhead.

Performance Numbers

Метрика Значення Умови
MurmurHash throughput ~2 GB/s Single core, сучасні CPU
Sticky partition overhead ~10-50 ns/message Без серіалізації ключа
Hot partition impact 3-10x latency increase 90% трафіку в 1 партицію з 10
Optimal batch.size 256KB Баланс latency/throughput
Compression ratio (similar keys) 5-10x lz4, схожі ключі в одній партиції

Production War Story

Ситуація: Платіжна система з 20 партиціями на топик transactions. Ключ = merchantId. При тестуванні на синтетичних даних (1000 мерчантів) розподіл був рівномірним. В production — 3 великих мерчанти генерували 70% трафіку.

Проблема: 3 партиції обробляли 70% навантаження. Consumer lag на цих партиціях зростав до 500K повідомлень. Два інших консьюмери простоювали. P99 latency виросла з 10ms до 200ms.

Діагностика:

# JMX: BytesInPerSec per partition
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic transactions --time -1
# Показав: P7=50GB, P12=45GB, P15=42GB, решта < 5GB

Рішення:

  1. Додали «сіль» до ключа: key = merchantId + "_" + ThreadLocalRandom.nextInt(10)
  2. Збільшили партиції до 60 (кожен «засолений» ключ рівномірніше)
  3. Ввели UniformStickyPartitioner для повідомлень без ключа
  4. Налаштували моніторинг per-partition throughput в Grafana

Урок: Завжди тестуйте розподіл на production-like даних з реальними ключами, а не синтетичними.

Моніторинг (JMX + Burrow)

JMX метрики:

kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics,partition=0
kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics,partition=0
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce

Burrow:

  • Per-partition consumer lag
  • Status per consumer group per partition
  • Alert на partition-level stall

Highload Best Practices

  1. Ключ = business entity ID (userId, orderId) — забезпечує порядок і traceability
  2. Рівномірний розподіл критично — перевіряйте гістограму per-partition bytes in
  3. Sticky partitioning за замовчуванням — оптимален для throughput за відсутності ключа
  4. Custom партиціонер для бізнес-логіки — але робіть його детермінованим
  5. Якщо ви використовуєте ключі для гарантій порядку – не додавайте партиції, створюйте новий топик. Якщо порядок не важливий (key=null), додавання партицій безпечне.
  6. Batch tuning: batch.size=256KB, linger.ms=10 для high-throughput
  7. Compression: lz4 — найкращий баланс CPU/compression для partitioned даних
  8. Тестуйте на production-like даних — реальні ключі часто мають skew

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • З ключем: MurmurHash2(key) % numPartitions — детермінований розподіл
  • Без ключа: Sticky Partitioning (Kafka 2.4+) — батч в одну партицію для throughput
  • Порядок гарантовано тільки всередині партиції (один ключ = одна партиція)
  • Hot partition — проблема при нерівномірних ключах; вирішується сіллю до ключа
  • При додаванні партицій hash(key) % N змінюється — порядок порушується
  • UniformStickyPartitioner (Kafka 3.3+) — покращена версія sticky partitioning
  • Custom Partitioner — для бізнес-логіки маршрутизації
  • Тестуйте розподіл на production-like даних, не синтетичних

Часті уточнюючі запитання:

  • Що таке Sticky Partitioning? — При key=null повідомлення групуються в одну партицію поки batch не заповниться, потім перемикання.
  • Як боротися з hot partition? — Додати сіль до ключа: key = entityId + "_" + random(N).
  • Що буде при додаванні партицій? — Ті самі ключі потраплять в інші партиції → порядок порушено.
  • Який хеш використовує Kafka? — MurmurHash 2, швидкий non-cryptographic.

Червоні прапорці (НЕ говорити):

  • «Без ключа порядок гарантовано» — без ключа sticky/random, порядку немає
  • «Додавання партицій не впливає на ключі» — впливає, hash(key) % N змінюється
  • «Round-Robin використовується за замовчуванням» — замінений на Sticky Partitioning
  • «Ключ може бути будь-якого розміру без наслідків» — великий ключ = CPU overhead на хешування

Пов’язані теми:

  • [[2. Що таке партиція (partition) і навіщо вона потрібна]]
  • [[4. Що таке ключ повідомлення і як він впливає на партиціонування]]
  • [[1. Що таке топiк (topic) в Kafka]]
  • [[21. Що таке batch в Kafka producer]]