Вопрос 3 · Раздел 15

Как данные распределяются по партициям

Данные распределяются по партициям на основе ключа сообщения:

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Основное правило

Данные распределяются по партициям на основе ключа сообщения:

partition = hash(key) % number_of_partitions

Аналогия

Представьте сортировку писем по почтовым ящикам:

  • У каждого письма есть индекс (ключ)
  • Почтовая машина смотрит на индекс и кладёт письмо в определённый ящик (партицию)
  • Письма с одинаковым индексом всегда попадают в один и тот же ящик
key="user-1" → всегда партиция 2
key="user-2" → всегда партиция 0
key="user-3" → всегда партиция 1

С ключом и без ключа

С ключом — порядок гарантирован:

// user-123 всегда попадёт в одну и ту же партицию
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "user-123", "Order created");

Без ключа — равномерное распределение (sticky partitioning):

// Попадёт в случайную партицию, равномерно
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "Order created");

🟡 Middle Level

Partitioner — как это работает

Kafka использует Partitioner для определения целевой партиции.

Стандартный алгоритм:

Если key != null → MurmurHash2(key) % numPartitions
Если key == null → Sticky Partitioning (батч в одну партицию)

Sticky Partitioning (Kafka 2.4+)

Старый подход (Round-Robin):
  msg1 → P0, msg2 → P1, msg3 → P2, msg4 → P0 ...
  Каждый message → новая партиция → маленькие батчи

Sticky Partitioning:
  msg1-50 → P0 (пока batch не заполнится)
  msg51-100 → P2
  msg101-150 → P1
  Больше батчи → выше throughput → меньше latency

Round-Robin — старый подход (до Kafka 2.4), который использовался при key=null. Заменён на Sticky Partitioning.

Custom Partitioner

public class RegionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Math.abs(key.hashCode()) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
props.put("partitioner.class", "com.example.RegionPartitioner");

Hot Partition Problem

Все сообщения с key="all" → одна партиция
Hot partition → bottleneck → 100% CPU на одном брокере
Остальные партиции простаивают

Сравнение стратегий

Стратегия Порядок Throughput Когда использовать
Key-based ✅ Внутри партиции Средний Нужен порядок по ключу
Sticky (no key) Максимальный Порядок не важен
Round-Robin Средний Равномерное распределение
Custom Зависит Зависит Бизнес-логика

Типичные ошибки

Ошибка Последствие Решение
Неравномерные ключи Hot partitions Добавляйте соль к ключу
Без ключа когда нужен порядок Сообщения перемешаны Используйте ключ = business entity ID
Null key для связанных событий Порядок потерян Ключ = aggregate ID
Добавление партиций Распределение ключей меняется Создавайте новый топик

🔴 Senior Level

Internal Implementation — Default Partitioner

// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, 
                     Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {
        // Sticky partitioning
        return stickyPartitionCache.partition(topic, cluster);
    }
    // MurmurHash 2
    int hash = Utils.murmur2(keyBytes);
    return Math.abs(hash) % numPartitions;
}

MurmurHash 2:

  • Некрептографический, быстрый. Оптимизирован под CPU-инструкции (битовые сдвиги, XOR), не требует специальных криптографических операций. В 5-10x быстрее SHA-256.
  • Хорошее равномерное распределение (минимум коллизий)
  • 32-bit output → Math.abs → modulo

Sticky Partition Cache:

class StickyPartitionCache {
    int index;      // текущая партиция
    int epoch;      // epoch для invalidation при изменении метаданных
    
    int partition(String topic, Cluster cluster) {
        // Возвращает ту же партицию пока батч не заполнится
        // Затем выбирает новую (random среди доступных)
    }
}

UniformStickyPartitioner (Kafka 3.3+)

// org.apache.kafka.clients.producer.UniformStickyPartitioner
// Улучшенная версия sticky partitioning
// Гарантирует более равномерное распределение при key == null
// Выбирает следующую партицию через bounded random
// Решает проблему неравномерности при sticky partitioning —
// гарантирует более равномерное распределение через bounded random вместо простого sticking.
props.put("partitioner.class", 
    "org.apache.kafka.clients.producer.UniformStickyPartitioner");

Consistent Hashing для динамических партиций

Проблема: hash(key) % N меняется при изменении N
Решение: Consistent Hashing (ring-based)

Ring: 2^32 позиций
Каждая партиция занимает несколько позиций на ring
Ключ → hash → ближайшая партиция на ring

При добавлении партиции:
  Старый подход: ~50% ключей перемещаются
  Consistent hashing: ~1/N ключей перемещаются

Библиотека: ketama, aws-dynamodb-consistent-hash

Edge Cases (3+)

  1. Integer Overflow при MurmurHash: Utils.murmur2() возвращает signed int. Math.abs(Integer.MIN_VALUE) возвращает Integer.MIN_VALUE (отрицательное). В Kafka это обрабатывается через hash & 0x7fffffff (битовая маска). Без этого — NegativeArraySizeException или неправильная партиция.

  2. Key Serialization Overhead: Большой ключ (например, JSON-объект 10KB) сериализуется → MurmurHash обрабатывает 10KB на каждое сообщение → CPU bottleneck. Решение: хешируйте ключ до отправки (SHA-256 → первые 8 байт) или используйте компактный ключ (UUID, Long).

  3. Partitioner + Transactional Producer: При использовании transactional.id, партиционер вызывается до начала транзакции. Если партиционер зависит от метаданных кластера (Custom Partitioner), а метаданные устарели — сообщение может попасть в нелидерную партицию → NotLeaderForPartitionException → abort транзакции.

  4. Custom Partitioner + Key Changes: Если кастомный партиционер использует外部ние данные (например, конфигурацию из БД) для маршрутизации, и эти данные меняются — распределение становится недетерминированным. Один и тот же ключ в разных батчах попадает в разные партиции.

  5. Sticky Partitioning + Small Batches: При batch.size=16KB и маленьких сообщениях (100 bytes) батч заполняется за ~160 сообщений. Но при linger.ms=0 батч отправляется мгновенно — sticky партиционер переключается на новую партицию каждое сообщение, превращаясь в round-robin с дополнительным overhead.

Performance Numbers

Метрика Значение Условия
MurmurHash throughput ~2 GB/s Single core, современные CPU
Sticky partition overhead ~10-50 ns/message Без сериализации ключа
Hot partition impact 3-10x latency increase 90% трафика в 1 партицию из 10
Optimal batch.size 256KB Баланс latency/throughput
Compression ratio (similar keys) 5-10x lz4, похожие ключи в одной партиции

Production War Story

Ситуация: Платёжная система с 20 партициями на топик transactions. Ключ = merchantId. При тестировании на синтетических данных (1000 мерчантов) распределение было равномерным. В production — 3 крупных мерчанта генерировали 70% трафика.

Проблема: 3 партиции обрабатывали 70% нагрузки. Consumer lag на этих партициях рос до 500K сообщений. Два других консьюмера простаивали. P99 latency выросла с 10ms до 200ms.

Диагностика:

# JMX: BytesInPerSec per partition
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic transactions --time -1
# Показал: P7=50GB, P12=45GB, P15=42GB, остальные < 5GB

Решение:

  1. Добавили «соль» к ключу: key = merchantId + "_" + ThreadLocalRandom.nextInt(10)
  2. Увеличили партиции до 60 (каждый «засоленный» ключ равномернее)
  3. Ввели UniformStickyPartitioner для сообщений без ключа
  4. Настроили мониторинг per-partition throughput в Grafana

Урок: Всегда тестируйте распределение на production-like данных с реальными ключами, а не синтетическими.

Мониторинг (JMX + Burrow)

JMX метрики:

kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics,partition=0
kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics,partition=0
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce

Burrow:

  • Per-partition consumer lag
  • Status per consumer group per partition
  • Alert на partition-level stall

Highload Best Practices

  1. Ключ = business entity ID (userId, orderId) — обеспечивает порядок и traceability
  2. Равномерное распределение критично — проверяйте гистограмму per-partition bytes in
  3. Sticky partitioning по умолчанию — оптимален для throughput при отсутствии ключа
  4. Custom партиционер для бизнес-логики — но делайте его детерминированным
  5. Если вы используете ключи для гарантий порядка – не добавляйте партиции, создавайте новый топик. Если порядок не важен (key=null), добавление партиций безопасно.
  6. Batch tuning: batch.size=256KB, linger.ms=10 для high-throughput
  7. Compression: lz4 — лучший баланс CPU/compression для partitioned данных
  8. Тестируйте на production-like данных — реальные ключи часто имеют skew

🎯 Шпаргалка для интервью

Обязательно знать:

  • С ключом: MurmurHash2(key) % numPartitions — детерминированное распределение
  • Без ключа: Sticky Partitioning (Kafka 2.4+) — батч в одну партицию для throughput
  • Порядок гарантирован только внутри партиции (один ключ = одна партиция)
  • Hot partition — проблема при неравномерных ключах; решается солью к ключу
  • При добавлении партиций hash(key) % N меняется — порядок нарушается
  • UniformStickyPartitioner (Kafka 3.3+) — улучшенная версия sticky partitioning
  • Custom Partitioner — для бизнес-логики маршрутизации
  • Тестируйте распределение на production-like данных, не синтетических

Частые уточняющие вопросы:

  • Что такое Sticky Partitioning? — При key=null сообщения группируются в одну партицию пока батч не заполнится, затем переключение.
  • Как бороться с hot partition? — Добавить соль к ключу: key = entityId + "_" + random(N).
  • Что будет при добавлении партиций? — Те же ключи попадут в другие партиции → порядок нарушен.
  • Какой хеш использует Kafka? — MurmurHash 2, быстрый non-cryptographic.

Красные флаги (НЕ говорить):

  • «Без ключа порядок гарантирован» — без ключа sticky/random, порядка нет
  • «Добавление партиций не влияет на ключи» — влияет, hash(key) % N меняется
  • «Round-Robin используется по умолчанию» — заменён на Sticky Partitioning
  • «Ключ может быть любым размера без последствий» — большой ключ = CPU overhead на хеширование

Связанные темы:

  • [[2. Что такое партиция (partition) и зачем она нужна]]
  • [[4. Что такое ключ сообщения и как он влияет на партиционирование]]
  • [[1. Что такое топик (topic) в Kafka]]
  • [[21. Что такое batch в Kafka producer]]