Вопрос 29 · Раздел 15

Можно ли читать сообщения из определённой партиции в Kafka

Представьте ресторан с несколькими поварами (партиции).

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Простое определение

Да, можно! Kafka Consumer может читать сообщения из конкретной партиции, а не из всех сразу. Это делается через метод assign() вместо subscribe().

Аналогия

Представьте ресторан с несколькими поварами (партиции).

subscribe() — вы садитесь за общий стол и едите то, что принесёт любой повар (официант распределяет).

assign() — вы сами выбираете, от каких поваров хотите: “От повара 1 и 3, но не от 2”.

Пример кода

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// assign() — ручное назначение конкретной партиции
TopicPartition partition = new TopicPartition("orders", 0);  // topic "orders", partition 0
consumer.assign(List.of(partition));

// Читаем ТОЛЬКО из partition 0
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Offset: " + record.offset());
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());
    }
}

subscribe() vs assign()

// subscribe() — автоматическое назначение через consumer group
consumer.subscribe(List.of("orders"));
// Kafka сама решает какие партиции дать этому консьюмеру

// assign() — ручное назначение
TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0, p1));
// Вы сами решаете какие партиции читать

Когда использовать

  • Тестирование — прочитать сообщения из конкретной партиции
  • Replay — перечитать сообщения с определённого offset
  • Debugging — investigation проблемной партиции
  • Специфичные задачи — когда логика требует конкретную партицию

🟡 Middle Level

subscribe() vs assign() — детальное сравнение

Аспект subscribe() assign()
Consumer group Использует consumer group Игнорирует consumer group
Rebalancing Автоматическое Нет rebalancing — если consumer с assign() упадёт, его партиции НЕ будут переназначены другим консьюмерам. Обработка этих партиций полностью остановится до рестарта.
Offset management Автоматическое (group-based) Ручное (вы сами)
Fault tolerance Высокая (rebalancing при failure) Низкая (manual handling)
Scalability Автоматическая Ручная
Use case Production processing Testing, debugging, special tasks

assign() — как это работает

// 1. Создаём TopicPartition объект
TopicPartition partition = new TopicPartition("orders", 0);

// 2. Назначаем консьюмеру
consumer.assign(List.of(partition));

// 3. (Опционально) Устанавливаем offset
consumer.seek(partition, 1000);           // с offset 1000
consumer.seekToBeginning(List.of(partition));  // с начала
consumer.seekToEnd(List.of(partition));        // с конца

// 4. Читаем
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Только из partition 0

Seek — произвольный доступ

TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));

// Читать с конкретного offset
consumer.seek(partition, 5000);

// Читать с начала
consumer.seekToBeginning(List.of(partition));

// Читать с конца (новые сообщения)
consumer.seekToEnd(List.of(partition));

// Читать с timestamp
long timestamp = System.currentTimeMillis() - 3600_000;  // 1 час назад
Map<TopicPartition, Long> timestamps = Map.of(partition, timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
OffsetAndTimestamp offsetAndTs = offsets.get(partition);
if (offsetAndTs != null) {
    consumer.seek(partition, offsetAndTs.offset());
}

Таблица типичных ошибок

Ошибка Симптомы Последствия Решение
assign() + subscribe() вместе Conflicting behavior Непредсказуемое поведение Использовать что-то одно
Без ручного offset management При рестарте — чтение с начала Duplicate processing Сохранять offsets externally
assign() для production processing Нет rebalancing Consumer failure = data stall subscribe() для production
assign() без consumer group id Group.id игнорируется OK, но confusing group.id не нужен для assign()
Seek без assign() IllegalStateException Exception Сначала assign(), потом seek()
Чтение несуществующей партиции Exception или пустые results Error Проверить partition существует

Когда assign() полезен

✅ Тестирование: проверить конкретную партицию
✅ Replay: перечитать сообщения с начала
✅ Debugging: investigation конкретной партиции
✅ Data migration: copy partition to another cluster
✅ Backfill: process historical data from specific partition
✅ Audit: проверить данные в конкретной партиции

Когда assign() НЕ использовать

  • Production processing — нет fault tolerance
  • Auto-scaling — нет rebalancing
  • Consumer groups — assign() игнорирует groups
  • Когда нужна балансировка — subscribe() делает это автоматически

🔴 Senior Level

Глубокие внутренности

assign() vs subscribe() — internal implementation

// KafkaConsumer internals

subscribe():
  consumer.coordinator  GroupCoordinator protocol
  consumer.subscriptions  dynamic (changes on rebalance)
  consumer.assignment  computed by GroupCoordinator
  
  Flow:
    1. consumer.subscribe(topics)  JoinGroup request
    2. GroupCoordinator  leader election
    3. Leader computes assignment (PartitionAssignor)
    4. SyncGroup response  each consumer gets its partitions
    5. consumer.onPartitionsAssigned() callback
    6. consumer.assignment  computed assignment

assign():
  consumer.coordinator  NOT used
  consumer.subscriptions  static (user-defined)
  consumer.assignment  direct user input
  
  Flow:
    1. consumer.assign(partitions)  direct assignment
    2. No JoinGroup, no coordinator, no rebalancing
    3. consumer.assignment  user-provided partitions
    4. consumer.position(partition)  fetch from __consumer_offsets (if group.id set)
       OR  beginning (if auto.offset.reset = earliest)

Offset management с assign()

// assign() DOES NOT automatically manage offsets
// Вы должны сами отслеживать offsets

// Вариант 1: Использовать group.id (offsets в __consumer_offsets)
props.put("group.id", "my-consumer");
consumer.assign(List.of(partition));
// Offsets всё ещё можно коммитить!
consumer.commitSync();  // сохраняет в __consumer_offsets

// Вариант 2: External offset storage
Map<TopicPartition, Long> savedOffsets = loadOffsetsFromDB();
consumer.assign(List.of(partition));
for (Map.Entry<TopicPartition, Long> entry : savedOffsets.entrySet()) {
    consumer.seek(entry.getKey(), entry.getValue());
}
// After processing:
saveOffsetsToDB(partition, currentOffset);

// Вариант 3: No offset tracking
consumer.assign(List.of(partition));
consumer.seekToBeginning(List.of(partition));
// Каждый запуск — чтение с начала

ConsumerRebalanceListener — только для subscribe()

// subscribe() + ConsumerRebalanceListener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Вызывается при rebalancing
        consumer.commitSync();
        cleanupState(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Вызывается при получении партиций
        initializeState(partitions);
    }
});

// assign() — ConsumerRebalanceListener НЕ ВЫЗЫВАЕТСЯ
// Нет rebalancing → нет callbacks
// Вы полностью ответственны за lifecycle

Trade-offs

Аспект subscribe() assign()
Complexity Низкая (автоматика) Высокая (ручное управление)
Fault tolerance Высокая Низкая
Control Низкий Максимальный
Operational overhead Низкий Высокий
Suitable for production Да Редко
Suitable for testing Нет (нельзя выбрать партицию) Да

Edge Cases

1. assign() и consumer group offsets:

// Сценарий: group.id установлен, assign() используется
props.put("group.id", "order-service");
consumer.assign(List.of(new TopicPartition("orders", 0)));

// При первом poll:
// consumer.position(partition) → проверяет __consumer_offsets
// Если committed offset существует → продолжает с него
// Если нет → auto.offset.reset applies (earliest/latest)

// Это значит assign() МОЖЕТ использовать committed offsets!
// НО: не коммитит автоматически, нужно consumer.commitSync()

2. Mixed assign() and subscribe() — undefined behavior:

// НЕ ДЕЛАЙТЕ ТАК:
consumer.subscribe(List.of("orders"));
consumer.assign(List.of(new TopicPartition("orders", 0)));

// Result:
// subscribe() registration может быть overwritten
// assign() может conflict с group assignment
// Behavior: undefined, зависит от версии Kafka

3. Seek на не-assigned partition:

TopicPartition p0 = new TopicPartition("orders", 0);
TopicPartition p1 = new TopicPartition("orders", 1);
consumer.assign(List.of(p0));

// Exception!
consumer.seek(p1, 1000);
// IllegalStateException: You can only check the position for partitions assigned to this consumer.

4. assign() с динамическим изменением партиций:

// assign() — static assignment
// Если добавлены новые партиции в topic — assign() их НЕ видит

Topic: orders (3 partitions: 0, 1, 2)
consumer.assign(List.of(p0, p1, p2));

// Admin adds partition 3 to topic "orders"
// consumer продолжает читать только p0, p1, p2
// partition 3 — не assigned, не читается

// Решение: periodic re-assignment
while (running) {
    Set<TopicPartition> currentAssignment = consumer.assignment();
    Set<TopicPartition> desiredAssignment = getPartitionsForTopic("orders");
    if (!currentAssignment.equals(desiredAssignment)) {
        consumer.assign(desiredAssignment);
    }
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // process...
}

5. assign() с manual offset commit — exactly-once pattern:

// assign() + manual offset = полный контроль
props.put("group.id", "order-service");
props.put("enable.auto.commit", "false");

TopicPartition partition = new TopicPartition("orders", 0);
consumer.assign(List.of(partition));

// Load saved offset
long savedOffset = loadOffsetFromDB(partition);
consumer.seek(partition, savedOffset);

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processWithTransaction(record);  // DB write + offset in same transaction
    }
    // Offset committed in same DB transaction as processing
    // Exactly-once semantics!
}

6. Reading from specific partition for debugging:

// Investigation: partition 2 has higher lag
TopicPartition suspiciousPartition = new TopicPartition("orders", 2);
consumer.assign(List.of(suspiciousPartition));
consumer.seekToBeginning(List.of(suspiciousPartition));

// Read all messages from partition 2
int count = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) break;
    
    for (ConsumerRecord<String, String> record : records) {
        count++;
        System.out.printf("Offset %d: key=%s, value=%s%n", 
                         record.offset(), record.key(), record.value());
        if (count > 10000) break;  // limit for safety
    }
}
System.out.println("Total messages in partition 2: " + count);

Производительность (production numbers)

Scenario Throughput Latency Reliability
subscribe() (10 consumers, 30 partitions) 100K msg/s 10ms High
assign() (1 consumer, 1 partition) 10K msg/s 5ms Low
assign() (1 consumer, 30 partitions) 100K msg/s 5ms Low
subscribe() (1 consumer, 1 partition) 10K msg/s 10ms High

Production War Story

Ситуация: E-commerce, 10 partitions, 10 consumers (subscribe). Один consumer стал отставать — lag рос только на partition 7.

Investigation:

# Check which consumer has partition 7
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-service

GROUP           TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
order-service   orders   7          500000          550000          50000  consumer-7-abc123

Root cause: Consumer-7 обрабатывал messages медленнее — у него был slow DB connection.

Debugging с assign():

// assign() к partition 7 для детального анализа
TopicPartition p7 = new TopicPartition("orders", 7);
consumer.assign(List.of(p7));
consumer.seek(p7, 500000);  // start from lag offset

// Анализ messages
Map<String, Integer> keyDistribution = new HashMap<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) break;
    
    for (ConsumerRecord<String, String> record : records) {
        String key = record.key();
        keyDistribution.merge(key, 1, Integer::sum);
    }
}

// Обнаружено: 80% messages имели key "user-42"
// → Все messages для user-42 в одной партиции (hash of key)
// → Один user = hot key = single consumer bottleneck

Решение: Custom partitioner для better key distribution, не rely on default hash.

Monitoring (JMX, Prometheus, Burrow)

JMX метрики (consumer):

kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1
  - records-lag-max: max lag across assigned partitions
  - records-lag-avg: average lag
  - bytes-consumed-rate
  - records-consumed-rate

Prometheus:

- alert: KafkaConsumerSinglePartitionLag
  expr: kafka_consumergroup_lag > 50000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Single partition lag > 50K  potential hot key"

- alert: KafkaConsumerPartitionImbalance
  expr: stddev(kafka_consumergroup_lag) by (group) > 10000
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "High variance in partition lag  uneven distribution"

Burrow:

GET /v3/kafka/production/consumer/order-service/status

Показывает lag per partition:
  partition 0: lag 100   OK
  partition 1: lag 200   OK
  ...
  partition 7: lag 50000 ERROR  ← hot partition!

Highload Best Practices

✅ subscribe() для production processing
✅ assign() для testing, debugging, replay
✅ ConsumerRebalanceListener для subscribe()
✅ External offset storage для assign()
✅ assign() + seek() для targeted debugging
✅ Monitor lag per partition (hot key detection)
✅ Custom partitioner для even distribution

❌ assign() для production (no fault tolerance)
❌ Mixed subscribe() + assign() (undefined behavior)
❌ assign() без offset management (duplicate processing on restart)
❌ Seek на не-assigned partition (IllegalStateException)
❌ assign() с auto-scaling (no rebalancing)
❌ Игнорирование partition lag imbalance (hot key detection)

Архитектурные решения

  1. subscribe() = production default — auto-scaling, fault tolerance, rebalancing
  2. assign() = specialist tool — testing, debugging, replay, migration
  3. assign() + external offset = exactly-once при transactional processing
  4. Hot key detection — partition lag imbalance → partitioner issue
  5. Seek + assign = powerful debugging и data investigation tool

Резюме для Senior

  • assign() — прямой контроль над partition assignment, без consumer group management
  • subscribe() — автоматическое назначение через GroupCoordinator, с rebalancing
  • assign() может использовать committed offsets если group.id установлен
  • assign() НЕ вызывает ConsumerRebalanceListener — вы полностью responsible
  • Mixed subscribe() + assign() = undefined behavior — не делать
  • assign() + seek() = powerful tool для debugging и investigation
  • Hot key detection через partition lag monitoring
  • Production processing: subscribe() — almost always correct choice
  • assign() — specialist tool for testing, replay, debugging, migration

🎯 Шпаргалка для интервью

Обязательно знать:

  • assign() — ручное назначение конкретных партиции; subscribe() — автоматическое через consumer group
  • assign() игнорирует consumer group: нет rebalancing, нет fault tolerance
  • Если consumer с assign() упадёт — его партиции НЕ переназначаются, обработка остановится
  • Seek: чтение с любого offset, начала, конца, или по timestamp
  • assign() может использовать committed offsets если group.id установлен
  • Mixed subscribe() + assign() = undefined behavior — не делать
  • Production: subscribe() (fault tolerance); assign() — testing, debugging, replay

Частые уточняющие вопросы:

  • Можно ли коммитить offsets при assign()? — Да, если group.id установлен, но вручную (commitSync).
  • Что будет при seek на не-assigned partition? — IllegalStateException.
  • assign() видит новые партиции при добавлении? — Нет, нужно periodic re-assignment.
  • ConsumerRebalanceListener работает с assign()? — Нет, нет rebalancing → нет callbacks.

Красные флаги (НЕ говорить):

  • «assign() для production processing» — нет fault tolerance
  • «assign() автоматически ребалансирует» — нет rebalancing вообще
  • «subscribe() и assign() можно смешивать» — undefined behavior
  • «assign() автоматически управляет offsets» — вы полностью responsible

Связанные темы:

  • [[5. Что такое Consumer Group]]
  • [[6. Как работает балансировка консьюмеров в группе]]
  • [[12. Что такое offset в Kafka]]
  • [[15. Что такое rebalancing и когда он происходит]]