Как данные распределяются по партициям
Данные распределяются по партициям на основе ключа сообщения:
🟢 Junior Level
Основное правило
Данные распределяются по партициям на основе ключа сообщения:
partition = hash(key) % number_of_partitions
Аналогия
Представьте сортировку писем по почтовым ящикам:
- У каждого письма есть индекс (ключ)
- Почтовая машина смотрит на индекс и кладёт письмо в определённый ящик (партицию)
- Письма с одинаковым индексом всегда попадают в один и тот же ящик
key="user-1" → всегда партиция 2
key="user-2" → всегда партиция 0
key="user-3" → всегда партиция 1
С ключом и без ключа
С ключом — порядок гарантирован:
// user-123 всегда попадёт в одну и ту же партицию
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", "user-123", "Order created");
Без ключа — равномерное распределение (sticky partitioning):
// Попадёт в случайную партицию, равномерно
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", "Order created");
🟡 Middle Level
Partitioner — как это работает
Kafka использует Partitioner для определения целевой партиции.
Стандартный алгоритм:
Если key != null → MurmurHash2(key) % numPartitions
Если key == null → Sticky Partitioning (батч в одну партицию)
Sticky Partitioning (Kafka 2.4+)
Старый подход (Round-Robin):
msg1 → P0, msg2 → P1, msg3 → P2, msg4 → P0 ...
Каждый message → новая партиция → маленькие батчи
Sticky Partitioning:
msg1-50 → P0 (пока batch не заполнится)
msg51-100 → P2
msg101-150 → P1
Больше батчи → выше throughput → меньше latency
Round-Robin — старый подход (до Kafka 2.4), который использовался при key=null. Заменён на Sticky Partitioning.
Custom Partitioner
public class RegionPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Math.abs(key.hashCode()) % numPartitions;
}
@Override
public void configure(Map<String, ?> configs) {}
@Override
public void close() {}
}
props.put("partitioner.class", "com.example.RegionPartitioner");
Hot Partition Problem
Все сообщения с key="all" → одна партиция
Hot partition → bottleneck → 100% CPU на одном брокере
Остальные партиции простаивают
Сравнение стратегий
| Стратегия | Порядок | Throughput | Когда использовать |
|---|---|---|---|
| Key-based | ✅ Внутри партиции | Средний | Нужен порядок по ключу |
| Sticky (no key) | ❌ | Максимальный | Порядок не важен |
| Round-Robin | ❌ | Средний | Равномерное распределение |
| Custom | Зависит | Зависит | Бизнес-логика |
Типичные ошибки
| Ошибка | Последствие | Решение |
|---|---|---|
| Неравномерные ключи | Hot partitions | Добавляйте соль к ключу |
| Без ключа когда нужен порядок | Сообщения перемешаны | Используйте ключ = business entity ID |
| Null key для связанных событий | Порядок потерян | Ключ = aggregate ID |
| Добавление партиций | Распределение ключей меняется | Создавайте новый топик |
🔴 Senior Level
Internal Implementation — Default Partitioner
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
// Sticky partitioning
return stickyPartitionCache.partition(topic, cluster);
}
// MurmurHash 2
int hash = Utils.murmur2(keyBytes);
return Math.abs(hash) % numPartitions;
}
MurmurHash 2:
- Некрептографический, быстрый. Оптимизирован под CPU-инструкции (битовые сдвиги, XOR), не требует специальных криптографических операций. В 5-10x быстрее SHA-256.
- Хорошее равномерное распределение (минимум коллизий)
- 32-bit output →
Math.abs→ modulo
Sticky Partition Cache:
class StickyPartitionCache {
int index; // текущая партиция
int epoch; // epoch для invalidation при изменении метаданных
int partition(String topic, Cluster cluster) {
// Возвращает ту же партицию пока батч не заполнится
// Затем выбирает новую (random среди доступных)
}
}
UniformStickyPartitioner (Kafka 3.3+)
// org.apache.kafka.clients.producer.UniformStickyPartitioner
// Улучшенная версия sticky partitioning
// Гарантирует более равномерное распределение при key == null
// Выбирает следующую партицию через bounded random
// Решает проблему неравномерности при sticky partitioning —
// гарантирует более равномерное распределение через bounded random вместо простого sticking.
props.put("partitioner.class",
"org.apache.kafka.clients.producer.UniformStickyPartitioner");
Consistent Hashing для динамических партиций
Проблема: hash(key) % N меняется при изменении N
Решение: Consistent Hashing (ring-based)
Ring: 2^32 позиций
Каждая партиция занимает несколько позиций на ring
Ключ → hash → ближайшая партиция на ring
При добавлении партиции:
Старый подход: ~50% ключей перемещаются
Consistent hashing: ~1/N ключей перемещаются
Библиотека: ketama, aws-dynamodb-consistent-hash
Edge Cases (3+)
-
Integer Overflow при MurmurHash:
Utils.murmur2()возвращает signed int.Math.abs(Integer.MIN_VALUE)возвращаетInteger.MIN_VALUE(отрицательное). В Kafka это обрабатывается черезhash & 0x7fffffff(битовая маска). Без этого —NegativeArraySizeExceptionили неправильная партиция. -
Key Serialization Overhead: Большой ключ (например, JSON-объект 10KB) сериализуется → MurmurHash обрабатывает 10KB на каждое сообщение → CPU bottleneck. Решение: хешируйте ключ до отправки (SHA-256 → первые 8 байт) или используйте компактный ключ (UUID, Long).
-
Partitioner + Transactional Producer: При использовании
transactional.id, партиционер вызывается до начала транзакции. Если партиционер зависит от метаданных кластера (Custom Partitioner), а метаданные устарели — сообщение может попасть в нелидерную партицию →NotLeaderForPartitionException→ abort транзакции. -
Custom Partitioner + Key Changes: Если кастомный партиционер использует外部ние данные (например, конфигурацию из БД) для маршрутизации, и эти данные меняются — распределение становится недетерминированным. Один и тот же ключ в разных батчах попадает в разные партиции.
-
Sticky Partitioning + Small Batches: При
batch.size=16KBи маленьких сообщениях (100 bytes) батч заполняется за ~160 сообщений. Но приlinger.ms=0батч отправляется мгновенно — sticky партиционер переключается на новую партицию каждое сообщение, превращаясь в round-robin с дополнительным overhead.
Performance Numbers
| Метрика | Значение | Условия |
|---|---|---|
| MurmurHash throughput | ~2 GB/s | Single core, современные CPU |
| Sticky partition overhead | ~10-50 ns/message | Без сериализации ключа |
| Hot partition impact | 3-10x latency increase | 90% трафика в 1 партицию из 10 |
| Optimal batch.size | 256KB | Баланс latency/throughput |
| Compression ratio (similar keys) | 5-10x | lz4, похожие ключи в одной партиции |
Production War Story
Ситуация: Платёжная система с 20 партициями на топик
transactions. Ключ =merchantId. При тестировании на синтетических данных (1000 мерчантов) распределение было равномерным. В production — 3 крупных мерчанта генерировали 70% трафика.Проблема: 3 партиции обрабатывали 70% нагрузки. Consumer lag на этих партициях рос до 500K сообщений. Два других консьюмера простаивали. P99 latency выросла с 10ms до 200ms.
Диагностика:
# JMX: BytesInPerSec per partition kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list localhost:9092 --topic transactions --time -1 # Показал: P7=50GB, P12=45GB, P15=42GB, остальные < 5GBРешение:
- Добавили «соль» к ключу:
key = merchantId + "_" + ThreadLocalRandom.nextInt(10)- Увеличили партиции до 60 (каждый «засоленный» ключ равномернее)
- Ввели
UniformStickyPartitionerдля сообщений без ключа- Настроили мониторинг per-partition throughput в Grafana
Урок: Всегда тестируйте распределение на production-like данных с реальными ключами, а не синтетическими.
Мониторинг (JMX + Burrow)
JMX метрики:
kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics,partition=0
kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics,partition=0
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
Burrow:
- Per-partition consumer lag
- Status per consumer group per partition
- Alert на partition-level stall
Highload Best Practices
- Ключ = business entity ID (userId, orderId) — обеспечивает порядок и traceability
- Равномерное распределение критично — проверяйте гистограмму per-partition bytes in
- Sticky partitioning по умолчанию — оптимален для throughput при отсутствии ключа
- Custom партиционер для бизнес-логики — но делайте его детерминированным
- Если вы используете ключи для гарантий порядка – не добавляйте партиции, создавайте новый топик. Если порядок не важен (key=null), добавление партиций безопасно.
- Batch tuning:
batch.size=256KB,linger.ms=10для high-throughput - Compression:
lz4— лучший баланс CPU/compression для partitioned данных - Тестируйте на production-like данных — реальные ключи часто имеют skew
🎯 Шпаргалка для интервью
Обязательно знать:
- С ключом:
MurmurHash2(key) % numPartitions— детерминированное распределение - Без ключа: Sticky Partitioning (Kafka 2.4+) — батч в одну партицию для throughput
- Порядок гарантирован только внутри партиции (один ключ = одна партиция)
- Hot partition — проблема при неравномерных ключах; решается солью к ключу
- При добавлении партиций
hash(key) % Nменяется — порядок нарушается - UniformStickyPartitioner (Kafka 3.3+) — улучшенная версия sticky partitioning
- Custom Partitioner — для бизнес-логики маршрутизации
- Тестируйте распределение на production-like данных, не синтетических
Частые уточняющие вопросы:
- Что такое Sticky Partitioning? — При key=null сообщения группируются в одну партицию пока батч не заполнится, затем переключение.
- Как бороться с hot partition? — Добавить соль к ключу:
key = entityId + "_" + random(N). - Что будет при добавлении партиций? — Те же ключи попадут в другие партиции → порядок нарушен.
- Какой хеш использует Kafka? — MurmurHash 2, быстрый non-cryptographic.
Красные флаги (НЕ говорить):
- «Без ключа порядок гарантирован» — без ключа sticky/random, порядка нет
- «Добавление партиций не влияет на ключи» — влияет,
hash(key) % Nменяется - «Round-Robin используется по умолчанию» — заменён на Sticky Partitioning
- «Ключ может быть любым размера без последствий» — большой ключ = CPU overhead на хеширование
Связанные темы:
- [[2. Что такое партиция (partition) и зачем она нужна]]
- [[4. Что такое ключ сообщения и как он влияет на партиционирование]]
- [[1. Что такое топик (topic) в Kafka]]
- [[21. Что такое batch в Kafka producer]]