Чи можна читати повідомлення з певної партиції в Kafka
Уявіть ресторан з кількома кухарями (партиції).
🟢 Junior Level
Просте визначення
Так, можна! Kafka Consumer може читати повідомлення з конкретної партиції, а не з усіх одразу. Це робиться через метод assign() замість subscribe().
Аналогія
Уявіть ресторан з кількома кухарями (партиції).
subscribe() — ви сідаєте за загальний стіл і їсте те, що принесе будь-який кухар (офіціант розподіляє).
assign() — ви самі обираєте, від яких кухарів хочете: “Від кухаря 1 і 3, але не від 2”.
Приклад коду
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// assign() — ручне призначення конкретної партиції
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
// Читаємо ТІЛЬКИ з partition 0
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Offset: " + record.offset());
System.out.println("Value: " + record.value());
}
}
subscribe() vs assign()
// subscribe() — автоматичне призначення через consumer group
consumer.subscribe(List.of("orders"));
// Kafka сама вирішує які партиції дати цьому консьюмеру
// assign() — ручне призначення
TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0, p1));
// Ви самі вирішуєте які партиції читати
Коли використовувати
- Тестування — прочитати повідомлення з конкретної партиції
- Replay — перечитати повідомлення з певного offset
- Debugging — investigation проблемної партиції
🟡 Middle Level
subscribe() vs assign() — детальне порівняння
| Аспект | subscribe() | assign() |
|---|---|---|
| Consumer group | Використовує consumer group | Ігнорує consumer group |
| Rebalancing | Автоматичне | Немає rebalancing — якщо consumer з assign() впаде, його партиції НЕ будуть переназначені іншим |
| Offset management | Автоматичне (group-based) | Ручне (ви самі) |
| Fault tolerance | Висока | Низька |
| Use case | Production processing | Testing, debugging, special tasks |
Seek — довільний доступ
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));
// Читати з конкретного offset
consumer.seek(partition, 5000);
// Читати з початку
consumer.seekToBeginning(List.of(partition));
// Читати з кінця
consumer.seekToEnd(List.of(partition));
// Читати з timestamp
long timestamp = System.currentTimeMillis() - 3600_000;
Map<TopicPartition, Long> timestamps = Map.of(partition, timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
OffsetAndTimestamp oat = offsets.get(partition);
if (oat != null) {
consumer.seek(partition, oat.offset());
}
Коли assign() НЕ використовувати
- Production processing — немає fault tolerance
- Auto-scaling — немає rebalancing
- Consumer groups — assign() ігнорує groups
🔴 Senior Level
assign() vs subscribe() — internal implementation
subscribe():
consumer.coordinator — GroupCoordinator protocol
consumer.subscriptions — dynamic (changes on rebalance)
consumer.assignment — computed by GroupCoordinator
assign():
consumer.coordinator — NOT used
consumer.subscriptions — static (user-defined)
consumer.assignment ← direct user input
No JoinGroup, no coordinator, no rebalancing
Offset management з assign()
// assign() DOES NOT automatically manage offsets
// Ви повинні самі відстежувати offsets
// Варіант 1: Використати group.id (offsets в __consumer_offsets)
props.put("group.id", "my-consumer");
consumer.assign(List.of(partition));
consumer.commitSync(); // зберігає в __consumer_offsets
// Варіант 2: External offset storage
Map<TopicPartition, Long> savedOffsets = loadOffsetsFromDB();
consumer.assign(List.of(partition));
for (var entry : savedOffsets.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue());
}
Edge Cases
1. Mixed assign() and subscribe() — undefined behavior:
// НЕ РОБІТЬ ТАК:
consumer.subscribe(List.of("orders"));
consumer.assign(List.of(new TopicPartition("orders", 0)));
// Result: undefined, залежить від версії Kafka
2. assign() з динамічною зміною партицій:
// assign() — static assignment
// Якщо додані нові партиції в topic — assign() їх НЕ бачить
// Рішення: periodic re-assignment
while (running) {
Set<TopicPartition> current = consumer.assignment();
Set<TopicPartition> desired = getPartitionsForTopic("orders");
if (!current.equals(desired)) {
consumer.assign(desired);
}
consumer.poll(Duration.ofMillis(100));
}
3. assign() + manual offset = exactly-once pattern:
props.put("enable.auto.commit", "false");
consumer.assign(List.of(partition));
consumer.seek(partition, loadOffsetFromDB(partition));
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
processWithTransaction(record); // DB write + offset in same transaction
}
// Exactly-once semantics!
}
Highload Best Practices
✅ subscribe() для production processing
✅ assign() для testing, debugging, replay
✅ External offset storage для assign()
✅ assign() + seek() для targeted debugging
✅ Monitor lag per partition (hot key detection)
✅ Custom partitioner для even distribution
❌ assign() для production (no fault tolerance)
❌ Mixed subscribe() + assign() (undefined behavior)
❌ assign() без offset management (duplicate processing on restart)
❌ Seek на не-assigned partition (IllegalStateException)
Архітектурні рішення
- subscribe() = production default — auto-scaling, fault tolerance, rebalancing
- assign() = specialist tool — testing, debugging, replay, migration
- assign() + external offset = exactly-once при transactional processing
- Hot key detection — partition lag imbalance → partitioner issue
Резюме для Senior
- assign() — прямий контроль над partition assignment, без consumer group management
- subscribe() — автоматичне призначення через GroupCoordinator, з rebalancing
- assign() може використовувати committed offsets якщо group.id встановлено
- assign() НЕ викликає ConsumerRebalanceListener — ви повністю responsible
- Mixed subscribe() + assign() = undefined behavior — не робити
- assign() + seek() = powerful tool для debugging і investigation
- Hot key detection через partition lag monitoring
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
assign()— ручне призначення конкретних партицій;subscribe()— автоматичне через consumer groupassign()ігнорує consumer group: немає rebalancing, немає fault tolerance- Якщо consumer з assign() впаде — його партиції НЕ переназначаються, обробка зупиниться
- Seek: читання з будь-якого offset, початку, кінця, або по timestamp
assign()може використовувати committed offsets якщоgroup.idвстановлено- Mixed
subscribe()+assign()= undefined behavior — не робити - Production: subscribe() (fault tolerance); assign() — testing, debugging, replay
Часті уточнюючі запитання:
- Чи можна коммітити offsets при assign()? — Так, якщо group.id встановлено, але вручну (commitSync).
- Що буде при seek на не-assigned partition? — IllegalStateException.
- assign() бачить нові партиції при додаванні? — Ні, потрібен periodic re-assignment.
- ConsumerRebalanceListener працює з assign()? — Ні, немає rebalancing → немає callbacks.
Червоні прапорці (НЕ говорити):
- «assign() для production processing» — немає fault tolerance
- «assign() автоматично ребалансує» — немає rebalancing взагалі
- «subscribe() і assign() можна змішувати» — undefined behavior
- «assign() автоматично управляє offsets» — ви повністю responsible
Пов’язані теми:
- [[5. Що таке Consumer Group]]
- [[6. Як працює балансування консьюмерів у групі]]
- [[12. Що таке offset в Kafka]]
- [[15. Що таке ребаланс і коли він відбувається]]