Питання 30 · Розділ 15

Як реалізувати фільтрацію повідомлень на стороні консьюмера в Kafka

Kafka не підтримує фільтрацію на стороні брокера. Усі повідомлення з топика доставляються консьюмеру, і він сам вирішує які обробляти, а які ігнорувати.

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Просте визначення

Kafka не підтримує фільтрацію на стороні брокера. Усі повідомлення з топика доставляються консьюмеру, і він сам вирішує які обробляти, а які ігнорувати.

Це архітектурне рішення Kafka — брокер працює як «розумна труба» і не заглядає в зміст повідомлень. Це спрощує брокер і дає максимальну продуктивність.

Аналогія

Уявіть поштову скриньку. Листоноша (Kafka) приносить УСІ листи. Ви (consumer) відкриваєте скриньку і:

  • Реклама — у кошик (ігноруєте)
  • Рахунок за електрику — оплачуєте (обробляєте)
  • Лист від друга — читаєте (обробляєте)

Листоноша не фільтрує — він приносить все. Фільтрація — ваше завдання.

Базовий приклад

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Фільтрація на стороні консьюмера
        if (shouldProcess(record)) {
            process(record);
        }
        // Ігноруємо — але offset все одно рухається
    }
    consumer.commitSync();
}

private boolean shouldProcess(ConsumerRecord<String, String> record) {
    // Приклад: обробляємо тільки замовлення > $100
    Order order = parseOrder(record.value());
    return order.amount() > 100;
}

Коли це важливо

  • Кілька типів подій в одному топику — filter по типу
  • Multi-tenant — filter по tenant ID
  • Feature flags — filter по умовам
  • Data privacy — filter sensitive data

🟡 Middle Level

Підходи до фільтрації

1. Фільтрація в консьюмері (найпростіший)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    Order order = parseOrder(record.value());

    if (order.status().equals("COMPLETED") && order.amount() > 100) {
        process(order);
    }
    // Ігноруємо PENDING, CANCELLED, small orders
}

2. Фільтрація через Kafka Streams

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");

// Фільтрація
orders.filter((key, order) -> order.amount() > 100 && order.status().equals("COMPLETED"))
      .to("important-orders");  // новий топик

3. Фільтрація через ksqlDB

CREATE STREAM important_orders AS
    SELECT * FROM orders
    WHERE amount > 100 AND status = 'COMPLETED'
    EMIT CHANGES;

4. Фільтрація на стороні продюсера (найкращий варіант)

// Продюсер сам вирішує що писати в який топик
if (order.amount() > 100) {
    producer.send(new ProducerRecord("important-orders", order));
} else {
    producer.send(new ProducerRecord("normal-orders", order));
}

Порівняння підходів

Підход Складність Network overhead CPU overhead Коли використовувати
Consumer filter Низька Високий (всі messages) Низький Simple filters
Kafka Streams Середня Середній Середній Complex transformations
ksqlDB Середня Середній Низький SQL-style filtering
Producer routing Низька Низький Низький Best practice

Коли фільтрація на консьюмері приемлема

  • Прості умови — filter by key, status, amount
  • Low-volume topics — network cost negligible
  • Temporary solution — quick fix before proper routing
  • Multi-purpose consumer — consumer повинен бачити все для auditing

Коли НЕ використовувати consumer-side filtering

  • High-volume topics — wasted network/disk/CPU
  • Data privacy — sensitive data shouldn’t reach consumer at all
  • Regulatory — GDPR, PCI-DSS вимагають data segregation
  • Cost optimization — network/disk waste at scale

🔴 Senior Level

Чому Kafka не підтримує server-side filtering

Архітектурне рішення Kafka:
  Broker = dumb storage + replication
  Intelligence на producer і consumer

Причини:
  1. Broker complexity — filter logic на кожному broker
  2. Performance — scan всіх messages для filter = O(n) per fetch
  3. Flexibility — filter logic application-specific
  4. Consistency — різні consumers, різні filters

Result:
  FetchRequest → returns ALL messages in range
  Consumer → receives ALL → filters locally

KIP-290 і KIP-359 обговорювали server-side filtering, але відхилили через complexity і performance impact.

Consumer filtering і offset management

// КРИТИЧНО: offset management з filtering

// ПРАВИЛЬНО:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
    }
    // Offset рухається для ВСІХ messages (processed + filtered)
}
consumer.commitSync();  // коммітимо останній offset

// НЕПРАВИЛЬНО:
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
        consumer.commitSync();  // ❌
    }
}
// Filtered messages будуть прочитані знову при restart!

Trade-offs

Аспект Consumer filtering Producer routing Kafka Streams
Network (read) All messages Only needed All messages
Network (write) N/A Per-topic writes Per-topic writes
Consumer CPU Filter + process Process only N/A
Complexity Low Low-Medium Medium-High
Data privacy Low (all data exposed) High (segregated) High

Edge Cases

1. Filtered message dependency:

order-1 (PENDING) → filtered out
order-2 (COMPLETED) → processed, але залежить від order-1
→ Processing fails!
Рішення: Не filter dependent messages

2. Filtering з ordering guarantee:

Partition 0: [order-1(PENDING), order-1(COMPLETE), order-2(NEW)]
Filter: status=COMPLETE
→ order-2(NEW) filtered out, але мав бути оброблений ДО order-1(COMPLETE)
→ Filter порушив ordering!

3. High-volume filtering — performance impact:

1M messages/min в topic
Filter: 99% filtered out, 1% processed
Network cost: 1M × 1KB = 1GB/min (99% wasted!)

At this scale:
  Consumer filtering = unacceptable
  Producer routing REQUIRED

Highload Best Practices

✅ Producer routing для high-volume (>100K msg/min)
✅ Consumer filtering для simple, low-volume cases
✅ Kafka Streams для complex filtering + transformation
✅ ksqlDB для SQL-style filtering
✅ Monitor filter rate (processed vs received ratio)
✅ Alert on high filter rate (>90% = architecture smell)
✅ Separate topics per message type (producer responsibility)
✅ Filter BEFORE processing (deserialize only if needed)

❌ Consumer filtering для high-volume topics
❌ Filtering without monitoring (blind waste)
❌ All messages in one topic (anti-pattern at scale)
❌ Filtering messages that affect ordering
❌ Complex filter logic in consumer (hard to maintain)

Архітектурні рішення

  1. Producer routing > Consumer filtering — move intelligence to producer
  2. Filter ratio as architecture smell — >90% filter = wrong architecture
  3. One topic per message type — natural filtering at source
  4. Kafka Streams as filter layer — intermediate filtering before consumers
  5. Offset management: коммітити для ВСІХ повідомлень (processed + filtered)

Резюме для Senior

  • Kafka НЕ підтримує server-side filtering — architectural decision
  • Consumer filtering = simplest but least efficient (wasted network/disk/CPU)
  • Producer routing = most efficient but least flexible
  • Filter ratio > 90% = strong signal for architecture change
  • Offset management: коммітити для ВСІХ повідомлень (processed + filtered)
  • Ordering can be broken by filtering — be careful with dependent messages
  • At highload (>100K msg/min): consumer filtering unacceptable
  • Monitor filter rate — key metric for architecture health
  • Best practice: one topic per message type, no filtering needed

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • Kafka НЕ підтримує server-side filtering — architectural decision (broker = dumb pipe)
  • Consumer filtering: отримує ВСІ повідомлення, обробляє тільки потрібні; wasted network/disk/CPU
  • Producer routing — best practice: різні типи повідомлень → різні топики
  • Kafka Streams / ksqlDB — проміжна фільтрація перед консьюмерами
  • Offset management: коммітити для ВСІХ повідомлень (processed + filtered), не тільки processed
  • Filter ratio > 90% = strong signal для architecture change (producer routing needed)
  • Фільтрація може порушити ordering для dependent messages

Часті уточнюючі запитання:

  • Чому Kafka не підтримує server-side filtering? — Broker complexity, O(n) scan, performance impact.
  • Filter ratio 95% — це проблема? — Так, 95% network/disk wasted; потрібен producer routing.
  • Як filter не зламає offset? — Коммітити останній offset batch, включаючи filtered messages.
  • Коли filtering на консьюмері приемлема? — Low-volume, simple filters, temporary solution.

Червоні прапорці (НЕ говорити):

  • «Kafka фільтрує повідомлення на брокері» — не фільтрує, architectural decision
  • «Consumer filtering для high-volume — норма» — при >100K msg/min unacceptable
  • «Filter ratio 95% — відмінна практика» — architecture smell, потрібен producer routing
  • «Offset коммітиться тільки для processed messages» — filtered будуть прочитані знову

Пов’язані теми:

  • [[1. Що таке топiк (topic) в Kafka]]
  • [[25. Що таке DLQ (Dead Letter Queue)]]
  • [[3. Як дані розподіляються по партиціях]]
  • [[21. Що таке batch в Kafka producer]]