Как реализовать фильтрацию сообщений на стороне консьюмера в Kafka
Kafka не поддерживает фильтрацию на стороне брокера. Все сообщения из топика доставляются консьюмеру, и он сам решает какие обрабатывать, а какие игнорировать.
🟢 Junior Level
Простое определение
Kafka не поддерживает фильтрацию на стороне брокера. Все сообщения из топика доставляются консьюмеру, и он сам решает какие обрабатывать, а какие игнорировать.
Это архитектурное решение Kafka — брокер работает как «умная труба» и не заглядывает в содержимое сообщений. Это упрощает брокер и даёт максимальную производительность.
Аналогия
Представьте почтовый ящик. Почтальон (Kafka) приносит ВСЕ письма. Вы (consumer) открываете ящик и:
- Реклама → в корзину (игнорируете)
- Счёт за электричество → оплачиваете (обрабатываете)
- Письмо от друга → читаете (обрабатываете)
Почтальон не фильтрует — он приносит всё. Фильтрация — ваша задача.
Базовый пример
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Фильтрация на стороне консьюмера
if (shouldProcess(record)) {
process(record);
}
// Игнорируем — но offset всё равно двигается
}
consumer.commitSync();
}
private boolean shouldProcess(ConsumerRecord<String, String> record) {
// Пример: обрабатываем только заказы > $100
Order order = parseOrder(record.value());
return order.amount() > 100;
}
Когда это важно
- Несколько типов событий в одном топике — filter по типу
- Multi-tenant — filter по tenant ID
- Feature flags — filter по условиям
- Data privacy — filter sensitive data
🟡 Middle Level
Подходы к фильтрации
1. Фильтрация в консьюмере (самый простой)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Order order = parseOrder(record.value());
if (order.status().equals("COMPLETED") && order.amount() > 100) {
process(order);
}
// Игнорируем PENDING, CANCELLED, small orders
}
2. Фильтрация через Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
// Фильтрация
orders.filter((key, order) -> order.amount() > 100 && order.status().equals("COMPLETED"))
.to("important-orders"); // новый топик
// Или side-output для отфильтрованных
orders.branch(
(key, order) -> order.amount() > 100, // stream 1: important
(key, order) -> order.amount() <= 100 // stream 2: normal
)[0].to("important-orders");
3. Фильтрация через ksqlDB
-- Создание отфильтрованного stream
CREATE STREAM important_orders AS
SELECT *
FROM orders
WHERE amount > 100
AND status = 'COMPLETED'
EMIT CHANGES;
-- Consumer читает уже отфильтрованный stream
-- consumer.subscribe(List.of("important_orders"))
4. Фильтрация на стороне продюсера (лучший вариант)
// Продюсер сам решает что писать в какой топик
if (order.amount() > 100) {
producer.send(new ProducerRecord("important-orders", order));
} else {
producer.send(new ProducerRecord("normal-orders", order));
}
Таблица типичных ошибок
| Ошибка | Симптомы | Последствия | Решение |
|---|---|---|---|
| Фильтрация без offset commit | Offset не двигается для filtered | Duplicate processing на restart | Commit offset даже для filtered |
| Фильтрация после полной обработки | Wasted CPU | Обработали, потом выбросили | Фильтровать ДО обработки |
| Фильтрация в сложной логике | Hard to maintain | Bugs в filter logic | Вынести filter в отдельный step |
| Без monitoring filtered count | Не видно что filter работает | Невозможно tune | Метрики: processed vs filtered |
| Все сообщения в один топик | Каждый consumer фильтрует | Wasted network + CPU | Разделить на topics на producer side |
| Без alternative routing | Отфильтрованные сообщения теряются | Data loss | DLQ или separate topic для filtered |
Сравнение подходов
| Подход | Сложность | Network overhead | CPU overhead | Когда использовать |
|---|---|---|---|---|
| Consumer filter | Низкая | Высокий (все messages) | Низкий | Simple filters |
| Kafka Streams | Средняя | Средний | Средний | Complex transformations |
| ksqlDB | Средняя | Средний | Низкий | SQL-style filtering |
| Producer routing | Низкая | Низкий | Низкий | Best practice |
Когда фильтрация на консьюмере приемлема
- Простые условия — filter by key, status, amount
- Low-volume topics — network cost negligible
- Temporary solution — quick fix before proper routing
- Multi-purpose consumer — consumer должен видеть всё для auditing
Когда НЕ использовать consumer-side filtering
- High-volume topics — wasted network/disk/CPU
- Data privacy — sensitive data shouldn’t reach consumer at all
- Regulatory — GDPR, PCI-DSS требуют data segregation
- Cost optimization — network/disk waste at scale
🔴 Senior Level
Глубокие внутренности
Почему Kafka не поддерживает server-side filtering
Архитектурное решение Kafka:
Broker = dumb storage + replication
Intelligence на producer и consumer
Причины:
1. Broker complexity — filter logic на каждом broker
2. Performance — scan всех messages для filter = O(n) per fetch
3. Flexibility — filter logic application-specific
4. Consistency — разные consumers, разные filters
Result:
FetchRequest → returns ALL messages in range
Consumer → receives ALL → filters locally
Broker doesn't know/doesn't care about filter criteria
KIP-290 и KIP-359 обсуждали server-side filtering, но отклонили из-за complexity и performance impact.
Consumer filtering и offset management
// CRITICAL: offset management с filtering
// ПРАВИЛЬНО:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (shouldProcess(record)) {
process(record);
}
// Offset двигается для ВСЕХ messages (processed + filtered)
}
consumer.commitSync(); // коммитим последний offset
// НЕПРАВИЛЬНО:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (shouldProcess(record)) {
process(record);
// Commit только для processed — offset не двигается для filtered
consumer.commitSync(); // ❌
}
}
// Filtered messages будут прочитаны снова при restart!
Kafka Streams filtering — internal mechanics
Kafka Streams filtering:
builder.stream("orders") // reads from topic
.filter(predicate) // applies filter
.to("filtered-topic"); // writes to new topic
Internal flow:
1. Source node: reads from "orders" topic
2. Filter node: applies predicate, drops non-matching
3. Sink node: writes matching to "filtered-topic"
Key point:
Filtered messages are READ from "orders" but NOT WRITTEN to "filtered-topic"
Network I/O: orders topic → Streams app (all messages)
Network I/O: Streams app → filtered-topic (only matching)
Это лучше чем consumer filtering (write I/O saved)
Но хуже чем producer routing (read I/O wasted)
Trade-offs
| Аспект | Consumer filtering | Producer routing | Kafka Streams |
|---|---|---|---|
| Network (read) | All messages | Only needed | All messages |
| Network (write) | N/A | Per-topic writes | Per-topic writes |
| Broker disk | All messages | Split by topic | Split by topic |
| Consumer CPU | Filter + process | Process only | N/A (separate app) |
| Complexity | Low | Low-Medium | Medium-High |
| Flexibility | High (dynamic filter) | Low (static routing) | High |
| Data privacy | Low (all data exposed) | High (segregated) | High (segregated) |
Edge Cases
1. Filtered message dependency:
Сценарий:
Topic "orders": order-1 (status=PENDING, $50), order-2 (status=COMPLETED, $200)
order-2 references order-1 (return order)
Consumer filter: status=COMPLETED
order-1 → filtered out (PENDING)
order-2 → processed
Problem: order-2 processing fails because order-1 not in system
Решение:
1. Не filter dependent messages
2. Process ALL messages, filter at action level
3. Use separate topics per status (producer routing)
2. Dynamic filter requirements:
Сценарий:
Filter criteria changes at runtime:
Monday: amount > 100
Tuesday: amount > 50
Wednesday: amount > 200
Consumer-side: легко, reload filter config
Producer routing: сложно, producer нужно update
ksqlDB: легко, ALTER STREAM
Решение для consumer:
volatile FilterConfig currentFilter;
while (running) {
FilterConfig filter = currentFilter; // atomic read
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (filter.matches(record)) {
process(record);
}
}
consumer.commitSync();
}
// Filter update (из config service):
void updateFilter(FilterConfig newFilter) {
currentFilter = newFilter; // atomic write
}
3. Filtering с ordering guarantee:
Сценарий:
Partition 0: [order-1(PENDING), order-1(COMPLETE), order-2(NEW)]
Filter: status=COMPLETE
order-1(PENDING) → filtered out
order-1(COMPLETE) → processed
order-2(NEW) → filtered out
Проблема: order-2(NEW) должен был processed ДО order-1(COMPLETE)
Filter нарушил ordering!
Решение:
1. Не filter messages that affect ordering
2. Filter на уровне topic (producer routing), не consumer
3. Process ALL, filter at action level
4. Filtering и DLQ:
// Filtered messages → DLQ для audit
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (shouldProcess(record)) {
process(record);
} else {
// Не process, но и не lose
dlqProducer.send(new ProducerRecord<>(
"orders-filtered", // separate topic для audit
record.key(),
record.value()
));
}
}
consumer.commitSync();
5. High-volume filtering — performance impact:
Сценарий:
1M messages/min в topic
Filter: 99% filtered out, 1% processed
Consumer получает 1M, обрабатывает 10K
Network cost: 1M messages × 1KB = 1GB/min (99% wasted!)
CPU cost: deserialize 1M, filter 1M, process 10K
Disk cost: 1M messages stored (99% never needed)
At this scale:
Consumer filtering = unacceptable
Producer routing REQUIRED
Или Kafka Streams pre-filtering
Производительность (production numbers)
| Подход | Throughput | Network I/O | CPU overhead | Disk overhead |
|---|---|---|---|---|
| No filtering | 100K msg/s | 100% | baseline | 100% |
| Consumer filtering (90% filter) | 100K msg/s | 100% | +5% (deserialize + filter) | 100% |
| Kafka Streams filter | 100K msg/s | 2× (read + write) | +15% (Streams overhead) | 10% (filtered only) |
| Producer routing | 100K msg/s | 10% (only needed) | baseline | 10% |
| ksqlDB filter | 100K msg/s | 2× (read + write) | +10% (ksqlDB) | 10% |
Production War Story
Ситуация: Payment gateway, 500K transactions/min. Один topic “all-transactions” со всеми типами: authorization, capture, refund, void.
Проблема: Refund processor consumer получал все 500K msg/min, фильтровал 95% (только refund’ы нужны). Network: 500MB/min, CPU: 40% на deserialize + filter. После масштабирования до 2M msg/min — network saturation, consumer crash.
Расследование:
Current architecture:
Producer → all-transactions (500K msg/min)
↓
Auth consumer: filters 90% (processes 50K)
Capture consumer: filters 85% (processes 75K)
Refund consumer: filters 95% (processes 25K) ← problem
Void consumer: filters 98% (processes 10K)
Total network: 500K × 4 consumers = 2M message deliveries/min
95% of this traffic is wasted!
Решение:
New architecture (producer routing):
Producer → auth-topic (50K msg/min) → Auth consumer (100% processed)
→ capture-topic (75K msg/min) → Capture consumer (100% processed)
→ refund-topic (25K msg/min) → Refund consumer (100% processed)
→ void-topic (10K msg/min) → Void consumer (100% processed)
Total network: 160K message deliveries/min (92% reduction!)
CPU: 0% wasted on filtering
Disk: 68% reduction (only needed data stored)
Post-mortem lesson: Consumer filtering OK для low-volume. При scale > 100K msg/min — producer routing mandatory.
Monitoring (JMX, Prometheus, Burrow)
Application metrics:
// Filter metrics
Counter messagesReceived = meterRegistry.counter("kafka.consumer.messages.received");
Counter messagesProcessed = meterRegistry.counter("kafka.consumer.messages.processed");
Counter messagesFiltered = meterRegistry.counter("kafka.consumer.messages.filtered");
Timer processingTime = Timer.builder("kafka.consumer.processing.time")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
// In consumer loop:
messagesReceived.increment();
if (shouldProcess(record)) {
processingTime.record(() -> process(record));
messagesProcessed.increment();
} else {
messagesFiltered.increment();
}
Prometheus alerts:
- alert: KafkaConsumerFilterRateHigh
expr: rate(kafka_consumer_messages_filtered_total[5m]) / rate(kafka_consumer_messages_received_total[5m]) > 0.9
for: 10m
labels:
severity: warning
annotations:
summary: "Consumer filtering > 90% на постоянной основе — сигнал к пересмотру архитектуры. Для временных или экспериментальных фильтров это допустимо."
- alert: KafkaConsumerFilteringWastedBandwidth
expr: rate(kafka_consumer_bytes_received_total[5m]) > 100_000_000 # 100MB/s
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer receiving > 100MB/s — check if filtering is causing waste"
Highload Best Practices
✅ Producer routing для high-volume (>100K msg/min)
✅ Consumer filtering для simple, low-volume cases
✅ Kafka Streams для complex filtering + transformation
✅ ksqlDB для SQL-style filtering
✅ Monitor filter rate (processed vs received ratio)
✅ Alert on high filter rate (>90% = architecture smell)
✅ Separate topics per message type (producer responsibility)
✅ DLQ для filtered messages (audit trail)
✅ Dynamic filter config для runtime changes
✅ Filter BEFORE processing (deserialize only if needed)
❌ Consumer filtering для high-volume topics
❌ Filtering without monitoring (blind waste)
❌ All messages in one topic (anti-pattern at scale)
❌ Filtering messages that affect ordering
❌ Without alternative routing for filtered data
❌ Complex filter logic in consumer (hard to maintain)
Архитектурные решения
- Producer routing > Consumer filtering — move intelligence to producer
- Filter ratio as architecture smell — >90% filter = wrong architecture
- One topic per message type — natural filtering at source
- Kafka Streams as filter layer — intermediate filtering before consumers
- Audit trail for filtered data — DLQ или separate topic
Резюме для Senior
- Kafka НЕ поддерживает server-side filtering — architectural decision
- Consumer filtering = simplest but least efficient (wasted network/disk/CPU)
- Producer routing = most efficient but least flexible
- Filter ratio > 90% = strong signal for architecture change
- Offset management: commit for ALL messages (processed + filtered)
- Ordering can be broken by filtering — be careful with dependent messages
- Dynamic filters: use volatile config, atomic updates
- At highload (>100K msg/min): consumer filtering unacceptable
- Monitor filter rate — key metric for architecture health
- Best practice: one topic per message type, no filtering needed
🎯 Шпаргалка для интервью
Обязательно знать:
- Kafka НЕ поддерживает server-side filtering — architectural decision (broker = dumb pipe)
- Consumer filtering: получает ВСЕ сообщения, обрабатывает только нужные; wasted network/disk/CPU
- Producer routing — best practice: разные типы сообщений → разные топики
- Kafka Streams / ksqlDB — промежуточная фильтрация перед консьюмерами
- Offset management: коммитить для ВСЕХ сообщений (processed + filtered), не только processed
- Filter ratio > 90% = strong signal для architecture change (producer routing needed)
- Фильтрация может нарушить ordering для dependent messages
Частые уточняющие вопросы:
- Почему Kafka не поддерживает server-side filtering? — Broker complexity, O(n) scan, performance impact.
- Filter ratio 95% — это проблема? — Да, 95% network/disk wasted; нужен producer routing.
- Как filter не сломает offset? — Коммитить последний offset batch, включая filtered messages.
- Когда filtering на консьюмере приемлема? — Low-volume, simple filters, temporary solution.
Красные флаги (НЕ говорить):
- «Kafka фильтрует сообщения на брокере» — не фильтрует, architectural decision
- «Consumer filtering для high-volume — норма» — при >100K msg/min unacceptable
- «Filter ratio 95% — отличная практика» — architecture smell, нужен producer routing
- «Offset коммитится только для processed messages» — filtered будут прочитаны снова
Связанные темы:
- [[1. Что такое топик (topic) в Kafka]]
- [[25. Что такое DLQ (Dead Letter Queue)]]
- [[3. Как данные распределяются по партициям]]
- [[21. Что такое batch в Kafka producer]]