Питання 29 · Розділ 15

Чи можна читати повідомлення з певної партиції в Kafka

Уявіть ресторан з кількома кухарями (партиції).

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Просте визначення

Так, можна! Kafka Consumer може читати повідомлення з конкретної партиції, а не з усіх одразу. Це робиться через метод assign() замість subscribe().

Аналогія

Уявіть ресторан з кількома кухарями (партиції).

subscribe() — ви сідаєте за загальний стіл і їсте те, що принесе будь-який кухар (офіціант розподіляє).

assign() — ви самі обираєте, від яких кухарів хочете: “Від кухаря 1 і 3, але не від 2”.

Приклад коду

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// assign() — ручне призначення конкретної партиції
TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));

// Читаємо ТІЛЬКИ з partition 0
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Offset: " + record.offset());
        System.out.println("Value: " + record.value());
    }
}

subscribe() vs assign()

// subscribe() — автоматичне призначення через consumer group
consumer.subscribe(List.of("orders"));
// Kafka сама вирішує які партиції дати цьому консьюмеру

// assign() — ручне призначення
TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0, p1));
// Ви самі вирішуєте які партиції читати

Коли використовувати

  • Тестування — прочитати повідомлення з конкретної партиції
  • Replay — перечитати повідомлення з певного offset
  • Debugging — investigation проблемної партиції

🟡 Middle Level

subscribe() vs assign() — детальне порівняння

Аспект subscribe() assign()
Consumer group Використовує consumer group Ігнорує consumer group
Rebalancing Автоматичне Немає rebalancing — якщо consumer з assign() впаде, його партиції НЕ будуть переназначені іншим
Offset management Автоматичне (group-based) Ручне (ви самі)
Fault tolerance Висока Низька
Use case Production processing Testing, debugging, special tasks

Seek — довільний доступ

TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));

// Читати з конкретного offset
consumer.seek(partition, 5000);

// Читати з початку
consumer.seekToBeginning(List.of(partition));

// Читати з кінця
consumer.seekToEnd(List.of(partition));

// Читати з timestamp
long timestamp = System.currentTimeMillis() - 3600_000;
Map<TopicPartition, Long> timestamps = Map.of(partition, timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
OffsetAndTimestamp oat = offsets.get(partition);
if (oat != null) {
    consumer.seek(partition, oat.offset());
}

Коли assign() НЕ використовувати

  • Production processing — немає fault tolerance
  • Auto-scaling — немає rebalancing
  • Consumer groups — assign() ігнорує groups

🔴 Senior Level

assign() vs subscribe() — internal implementation

subscribe():
  consumer.coordinator  GroupCoordinator protocol
  consumer.subscriptions  dynamic (changes on rebalance)
  consumer.assignment  computed by GroupCoordinator

assign():
  consumer.coordinator  NOT used
  consumer.subscriptions  static (user-defined)
  consumer.assignment  direct user input
  No JoinGroup, no coordinator, no rebalancing

Offset management з assign()

// assign() DOES NOT automatically manage offsets
// Ви повинні самі відстежувати offsets

// Варіант 1: Використати group.id (offsets в __consumer_offsets)
props.put("group.id", "my-consumer");
consumer.assign(List.of(partition));
consumer.commitSync();  // зберігає в __consumer_offsets

// Варіант 2: External offset storage
Map<TopicPartition, Long> savedOffsets = loadOffsetsFromDB();
consumer.assign(List.of(partition));
for (var entry : savedOffsets.entrySet()) {
    consumer.seek(entry.getKey(), entry.getValue());
}

Edge Cases

1. Mixed assign() and subscribe() — undefined behavior:

// НЕ РОБІТЬ ТАК:
consumer.subscribe(List.of("orders"));
consumer.assign(List.of(new TopicPartition("orders", 0)));
// Result: undefined, залежить від версії Kafka

2. assign() з динамічною зміною партицій:

// assign() — static assignment
// Якщо додані нові партиції в topic — assign() їх НЕ бачить
// Рішення: periodic re-assignment
while (running) {
    Set<TopicPartition> current = consumer.assignment();
    Set<TopicPartition> desired = getPartitionsForTopic("orders");
    if (!current.equals(desired)) {
        consumer.assign(desired);
    }
    consumer.poll(Duration.ofMillis(100));
}

3. assign() + manual offset = exactly-once pattern:

props.put("enable.auto.commit", "false");
consumer.assign(List.of(partition));
consumer.seek(partition, loadOffsetFromDB(partition));

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (var record : records) {
        processWithTransaction(record);  // DB write + offset in same transaction
    }
    // Exactly-once semantics!
}

Highload Best Practices

✅ subscribe() для production processing
✅ assign() для testing, debugging, replay
✅ External offset storage для assign()
✅ assign() + seek() для targeted debugging
✅ Monitor lag per partition (hot key detection)
✅ Custom partitioner для even distribution

❌ assign() для production (no fault tolerance)
❌ Mixed subscribe() + assign() (undefined behavior)
❌ assign() без offset management (duplicate processing on restart)
❌ Seek на не-assigned partition (IllegalStateException)

Архітектурні рішення

  1. subscribe() = production default — auto-scaling, fault tolerance, rebalancing
  2. assign() = specialist tool — testing, debugging, replay, migration
  3. assign() + external offset = exactly-once при transactional processing
  4. Hot key detection — partition lag imbalance → partitioner issue

Резюме для Senior

  • assign() — прямий контроль над partition assignment, без consumer group management
  • subscribe() — автоматичне призначення через GroupCoordinator, з rebalancing
  • assign() може використовувати committed offsets якщо group.id встановлено
  • assign() НЕ викликає ConsumerRebalanceListener — ви повністю responsible
  • Mixed subscribe() + assign() = undefined behavior — не робити
  • assign() + seek() = powerful tool для debugging і investigation
  • Hot key detection через partition lag monitoring

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • assign() — ручне призначення конкретних партицій; subscribe() — автоматичне через consumer group
  • assign() ігнорує consumer group: немає rebalancing, немає fault tolerance
  • Якщо consumer з assign() впаде — його партиції НЕ переназначаються, обробка зупиниться
  • Seek: читання з будь-якого offset, початку, кінця, або по timestamp
  • assign() може використовувати committed offsets якщо group.id встановлено
  • Mixed subscribe() + assign() = undefined behavior — не робити
  • Production: subscribe() (fault tolerance); assign() — testing, debugging, replay

Часті уточнюючі запитання:

  • Чи можна коммітити offsets при assign()? — Так, якщо group.id встановлено, але вручну (commitSync).
  • Що буде при seek на не-assigned partition? — IllegalStateException.
  • assign() бачить нові партиції при додаванні? — Ні, потрібен periodic re-assignment.
  • ConsumerRebalanceListener працює з assign()? — Ні, немає rebalancing → немає callbacks.

Червоні прапорці (НЕ говорити):

  • «assign() для production processing» — немає fault tolerance
  • «assign() автоматично ребалансує» — немає rebalancing взагалі
  • «subscribe() і assign() можна змішувати» — undefined behavior
  • «assign() автоматично управляє offsets» — ви повністю responsible

Пов’язані теми:

  • [[5. Що таке Consumer Group]]
  • [[6. Як працює балансування консьюмерів у групі]]
  • [[12. Що таке offset в Kafka]]
  • [[15. Що таке ребаланс і коли він відбувається]]