Як реалізувати фільтрацію повідомлень на стороні консьюмера в Kafka
Kafka не підтримує фільтрацію на стороні брокера. Усі повідомлення з топика доставляються консьюмеру, і він сам вирішує які обробляти, а які ігнорувати.
🟢 Junior Level
Просте визначення
Kafka не підтримує фільтрацію на стороні брокера. Усі повідомлення з топика доставляються консьюмеру, і він сам вирішує які обробляти, а які ігнорувати.
Це архітектурне рішення Kafka — брокер працює як «розумна труба» і не заглядає в зміст повідомлень. Це спрощує брокер і дає максимальну продуктивність.
Аналогія
Уявіть поштову скриньку. Листоноша (Kafka) приносить УСІ листи. Ви (consumer) відкриваєте скриньку і:
- Реклама — у кошик (ігноруєте)
- Рахунок за електрику — оплачуєте (обробляєте)
- Лист від друга — читаєте (обробляєте)
Листоноша не фільтрує — він приносить все. Фільтрація — ваше завдання.
Базовий приклад
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Фільтрація на стороні консьюмера
if (shouldProcess(record)) {
process(record);
}
// Ігноруємо — але offset все одно рухається
}
consumer.commitSync();
}
private boolean shouldProcess(ConsumerRecord<String, String> record) {
// Приклад: обробляємо тільки замовлення > $100
Order order = parseOrder(record.value());
return order.amount() > 100;
}
Коли це важливо
- Кілька типів подій в одному топику — filter по типу
- Multi-tenant — filter по tenant ID
- Feature flags — filter по умовам
- Data privacy — filter sensitive data
🟡 Middle Level
Підходи до фільтрації
1. Фільтрація в консьюмері (найпростіший)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Order order = parseOrder(record.value());
if (order.status().equals("COMPLETED") && order.amount() > 100) {
process(order);
}
// Ігноруємо PENDING, CANCELLED, small orders
}
2. Фільтрація через Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
// Фільтрація
orders.filter((key, order) -> order.amount() > 100 && order.status().equals("COMPLETED"))
.to("important-orders"); // новий топик
3. Фільтрація через ksqlDB
CREATE STREAM important_orders AS
SELECT * FROM orders
WHERE amount > 100 AND status = 'COMPLETED'
EMIT CHANGES;
4. Фільтрація на стороні продюсера (найкращий варіант)
// Продюсер сам вирішує що писати в який топик
if (order.amount() > 100) {
producer.send(new ProducerRecord("important-orders", order));
} else {
producer.send(new ProducerRecord("normal-orders", order));
}
Порівняння підходів
| Підход | Складність | Network overhead | CPU overhead | Коли використовувати |
|---|---|---|---|---|
| Consumer filter | Низька | Високий (всі messages) | Низький | Simple filters |
| Kafka Streams | Середня | Середній | Середній | Complex transformations |
| ksqlDB | Середня | Середній | Низький | SQL-style filtering |
| Producer routing | Низька | Низький | Низький | Best practice |
Коли фільтрація на консьюмері приемлема
- Прості умови — filter by key, status, amount
- Low-volume topics — network cost negligible
- Temporary solution — quick fix before proper routing
- Multi-purpose consumer — consumer повинен бачити все для auditing
Коли НЕ використовувати consumer-side filtering
- High-volume topics — wasted network/disk/CPU
- Data privacy — sensitive data shouldn’t reach consumer at all
- Regulatory — GDPR, PCI-DSS вимагають data segregation
- Cost optimization — network/disk waste at scale
🔴 Senior Level
Чому Kafka не підтримує server-side filtering
Архітектурне рішення Kafka:
Broker = dumb storage + replication
Intelligence на producer і consumer
Причини:
1. Broker complexity — filter logic на кожному broker
2. Performance — scan всіх messages для filter = O(n) per fetch
3. Flexibility — filter logic application-specific
4. Consistency — різні consumers, різні filters
Result:
FetchRequest → returns ALL messages in range
Consumer → receives ALL → filters locally
KIP-290 і KIP-359 обговорювали server-side filtering, але відхилили через complexity і performance impact.
Consumer filtering і offset management
// КРИТИЧНО: offset management з filtering
// ПРАВИЛЬНО:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (shouldProcess(record)) {
process(record);
}
// Offset рухається для ВСІХ messages (processed + filtered)
}
consumer.commitSync(); // коммітимо останній offset
// НЕПРАВИЛЬНО:
for (ConsumerRecord<String, String> record : records) {
if (shouldProcess(record)) {
process(record);
consumer.commitSync(); // ❌
}
}
// Filtered messages будуть прочитані знову при restart!
Trade-offs
| Аспект | Consumer filtering | Producer routing | Kafka Streams |
|---|---|---|---|
| Network (read) | All messages | Only needed | All messages |
| Network (write) | N/A | Per-topic writes | Per-topic writes |
| Consumer CPU | Filter + process | Process only | N/A |
| Complexity | Low | Low-Medium | Medium-High |
| Data privacy | Low (all data exposed) | High (segregated) | High |
Edge Cases
1. Filtered message dependency:
order-1 (PENDING) → filtered out
order-2 (COMPLETED) → processed, але залежить від order-1
→ Processing fails!
Рішення: Не filter dependent messages
2. Filtering з ordering guarantee:
Partition 0: [order-1(PENDING), order-1(COMPLETE), order-2(NEW)]
Filter: status=COMPLETE
→ order-2(NEW) filtered out, але мав бути оброблений ДО order-1(COMPLETE)
→ Filter порушив ordering!
3. High-volume filtering — performance impact:
1M messages/min в topic
Filter: 99% filtered out, 1% processed
Network cost: 1M × 1KB = 1GB/min (99% wasted!)
At this scale:
Consumer filtering = unacceptable
Producer routing REQUIRED
Highload Best Practices
✅ Producer routing для high-volume (>100K msg/min)
✅ Consumer filtering для simple, low-volume cases
✅ Kafka Streams для complex filtering + transformation
✅ ksqlDB для SQL-style filtering
✅ Monitor filter rate (processed vs received ratio)
✅ Alert on high filter rate (>90% = architecture smell)
✅ Separate topics per message type (producer responsibility)
✅ Filter BEFORE processing (deserialize only if needed)
❌ Consumer filtering для high-volume topics
❌ Filtering without monitoring (blind waste)
❌ All messages in one topic (anti-pattern at scale)
❌ Filtering messages that affect ordering
❌ Complex filter logic in consumer (hard to maintain)
Архітектурні рішення
- Producer routing > Consumer filtering — move intelligence to producer
- Filter ratio as architecture smell — >90% filter = wrong architecture
- One topic per message type — natural filtering at source
- Kafka Streams as filter layer — intermediate filtering before consumers
- Offset management: коммітити для ВСІХ повідомлень (processed + filtered)
Резюме для Senior
- Kafka НЕ підтримує server-side filtering — architectural decision
- Consumer filtering = simplest but least efficient (wasted network/disk/CPU)
- Producer routing = most efficient but least flexible
- Filter ratio > 90% = strong signal for architecture change
- Offset management: коммітити для ВСІХ повідомлень (processed + filtered)
- Ordering can be broken by filtering — be careful with dependent messages
- At highload (>100K msg/min): consumer filtering unacceptable
- Monitor filter rate — key metric for architecture health
- Best practice: one topic per message type, no filtering needed
🎯 Шпаргалка для інтерв’ю
Обов’язково знати:
- Kafka НЕ підтримує server-side filtering — architectural decision (broker = dumb pipe)
- Consumer filtering: отримує ВСІ повідомлення, обробляє тільки потрібні; wasted network/disk/CPU
- Producer routing — best practice: різні типи повідомлень → різні топики
- Kafka Streams / ksqlDB — проміжна фільтрація перед консьюмерами
- Offset management: коммітити для ВСІХ повідомлень (processed + filtered), не тільки processed
- Filter ratio > 90% = strong signal для architecture change (producer routing needed)
- Фільтрація може порушити ordering для dependent messages
Часті уточнюючі запитання:
- Чому Kafka не підтримує server-side filtering? — Broker complexity, O(n) scan, performance impact.
- Filter ratio 95% — це проблема? — Так, 95% network/disk wasted; потрібен producer routing.
- Як filter не зламає offset? — Коммітити останній offset batch, включаючи filtered messages.
- Коли filtering на консьюмері приемлема? — Low-volume, simple filters, temporary solution.
Червоні прапорці (НЕ говорити):
- «Kafka фільтрує повідомлення на брокері» — не фільтрує, architectural decision
- «Consumer filtering для high-volume — норма» — при >100K msg/min unacceptable
- «Filter ratio 95% — відмінна практика» — architecture smell, потрібен producer routing
- «Offset коммітиться тільки для processed messages» — filtered будуть прочитані знову
Пов’язані теми:
- [[1. Що таке топiк (topic) в Kafka]]
- [[25. Що таке DLQ (Dead Letter Queue)]]
- [[3. Як дані розподіляються по партиціях]]
- [[21. Що таке batch в Kafka producer]]