Вопрос 30 · Раздел 15

Как реализовать фильтрацию сообщений на стороне консьюмера в Kafka

Kafka не поддерживает фильтрацию на стороне брокера. Все сообщения из топика доставляются консьюмеру, и он сам решает какие обрабатывать, а какие игнорировать.

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Простое определение

Kafka не поддерживает фильтрацию на стороне брокера. Все сообщения из топика доставляются консьюмеру, и он сам решает какие обрабатывать, а какие игнорировать.

Это архитектурное решение Kafka — брокер работает как «умная труба» и не заглядывает в содержимое сообщений. Это упрощает брокер и даёт максимальную производительность.

Аналогия

Представьте почтовый ящик. Почтальон (Kafka) приносит ВСЕ письма. Вы (consumer) открываете ящик и:

  • Реклама → в корзину (игнорируете)
  • Счёт за электричество → оплачиваете (обрабатываете)
  • Письмо от друга → читаете (обрабатываете)

Почтальон не фильтрует — он приносит всё. Фильтрация — ваша задача.

Базовый пример

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Фильтрация на стороне консьюмера
        if (shouldProcess(record)) {
            process(record);
        }
        // Игнорируем — но offset всё равно двигается
    }
    consumer.commitSync();
}

private boolean shouldProcess(ConsumerRecord<String, String> record) {
    // Пример: обрабатываем только заказы > $100
    Order order = parseOrder(record.value());
    return order.amount() > 100;
}

Когда это важно

  • Несколько типов событий в одном топике — filter по типу
  • Multi-tenant — filter по tenant ID
  • Feature flags — filter по условиям
  • Data privacy — filter sensitive data

🟡 Middle Level

Подходы к фильтрации

1. Фильтрация в консьюмере (самый простой)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    Order order = parseOrder(record.value());
    
    if (order.status().equals("COMPLETED") && order.amount() > 100) {
        process(order);
    }
    // Игнорируем PENDING, CANCELLED, small orders
}

2. Фильтрация через Kafka Streams

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");

// Фильтрация
orders.filter((key, order) -> order.amount() > 100 && order.status().equals("COMPLETED"))
      .to("important-orders");  // новый топик

// Или side-output для отфильтрованных
orders.branch(
    (key, order) -> order.amount() > 100,              // stream 1: important
    (key, order) -> order.amount() <= 100              // stream 2: normal
)[0].to("important-orders");

3. Фильтрация через ksqlDB

-- Создание отфильтрованного stream
CREATE STREAM important_orders AS
    SELECT *
    FROM orders
    WHERE amount > 100
      AND status = 'COMPLETED'
    EMIT CHANGES;

-- Consumer читает уже отфильтрованный stream
-- consumer.subscribe(List.of("important_orders"))

4. Фильтрация на стороне продюсера (лучший вариант)

// Продюсер сам решает что писать в какой топик
if (order.amount() > 100) {
    producer.send(new ProducerRecord("important-orders", order));
} else {
    producer.send(new ProducerRecord("normal-orders", order));
}

Таблица типичных ошибок

Ошибка Симптомы Последствия Решение
Фильтрация без offset commit Offset не двигается для filtered Duplicate processing на restart Commit offset даже для filtered
Фильтрация после полной обработки Wasted CPU Обработали, потом выбросили Фильтровать ДО обработки
Фильтрация в сложной логике Hard to maintain Bugs в filter logic Вынести filter в отдельный step
Без monitoring filtered count Не видно что filter работает Невозможно tune Метрики: processed vs filtered
Все сообщения в один топик Каждый consumer фильтрует Wasted network + CPU Разделить на topics на producer side
Без alternative routing Отфильтрованные сообщения теряются Data loss DLQ или separate topic для filtered

Сравнение подходов

Подход Сложность Network overhead CPU overhead Когда использовать
Consumer filter Низкая Высокий (все messages) Низкий Simple filters
Kafka Streams Средняя Средний Средний Complex transformations
ksqlDB Средняя Средний Низкий SQL-style filtering
Producer routing Низкая Низкий Низкий Best practice

Когда фильтрация на консьюмере приемлема

  • Простые условия — filter by key, status, amount
  • Low-volume topics — network cost negligible
  • Temporary solution — quick fix before proper routing
  • Multi-purpose consumer — consumer должен видеть всё для auditing

Когда НЕ использовать consumer-side filtering

  • High-volume topics — wasted network/disk/CPU
  • Data privacy — sensitive data shouldn’t reach consumer at all
  • Regulatory — GDPR, PCI-DSS требуют data segregation
  • Cost optimization — network/disk waste at scale

🔴 Senior Level

Глубокие внутренности

Почему Kafka не поддерживает server-side filtering

Архитектурное решение Kafka:
  Broker = dumb storage + replication
  Intelligence на producer и consumer
  
Причины:
  1. Broker complexity — filter logic на каждом broker
  2. Performance — scan всех messages для filter = O(n) per fetch
  3. Flexibility — filter logic application-specific
  4. Consistency — разные consumers, разные filters
  
Result:
  FetchRequest → returns ALL messages in range
  Consumer → receives ALL → filters locally
  Broker doesn't know/doesn't care about filter criteria

KIP-290 и KIP-359 обсуждали server-side filtering, но отклонили из-за complexity и performance impact.

Consumer filtering и offset management

// CRITICAL: offset management с filtering

// ПРАВИЛЬНО:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
    }
    // Offset двигается для ВСЕХ messages (processed + filtered)
}
consumer.commitSync();  // коммитим последний offset

// НЕПРАВИЛЬНО:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
        // Commit только для processed — offset не двигается для filtered
        consumer.commitSync();  // ❌
    }
}
// Filtered messages будут прочитаны снова при restart!

Kafka Streams filtering — internal mechanics

Kafka Streams filtering:
  builder.stream("orders")              // reads from topic
         .filter(predicate)              // applies filter
         .to("filtered-topic");          // writes to new topic

Internal flow:
  1. Source node: reads from "orders" topic
  2. Filter node: applies predicate, drops non-matching
  3. Sink node: writes matching to "filtered-topic"

Key point:
  Filtered messages are READ from "orders" but NOT WRITTEN to "filtered-topic"
  Network I/O: orders topic → Streams app (all messages)
  Network I/O: Streams app → filtered-topic (only matching)
  
  Это лучше чем consumer filtering (write I/O saved)
  Но хуже чем producer routing (read I/O wasted)

Trade-offs

Аспект Consumer filtering Producer routing Kafka Streams
Network (read) All messages Only needed All messages
Network (write) N/A Per-topic writes Per-topic writes
Broker disk All messages Split by topic Split by topic
Consumer CPU Filter + process Process only N/A (separate app)
Complexity Low Low-Medium Medium-High
Flexibility High (dynamic filter) Low (static routing) High
Data privacy Low (all data exposed) High (segregated) High (segregated)

Edge Cases

1. Filtered message dependency:

Сценарий:
  Topic "orders": order-1 (status=PENDING, $50), order-2 (status=COMPLETED, $200)
  order-2 references order-1 (return order)
  
  Consumer filter: status=COMPLETED
  order-1 → filtered out (PENDING)
  order-2 → processed
  Problem: order-2 processing fails because order-1 not in system
  
Решение:
  1. Не filter dependent messages
  2. Process ALL messages, filter at action level
  3. Use separate topics per status (producer routing)

2. Dynamic filter requirements:

Сценарий:
  Filter criteria changes at runtime:
    Monday: amount > 100
    Tuesday: amount > 50
    Wednesday: amount > 200
  
  Consumer-side: легко, reload filter config
  Producer routing: сложно, producer нужно update
  ksqlDB: легко, ALTER STREAM
  
Решение для consumer:
  volatile FilterConfig currentFilter;
  
  while (running) {
      FilterConfig filter = currentFilter;  // atomic read
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
          if (filter.matches(record)) {
              process(record);
          }
      }
      consumer.commitSync();
  }
  
  // Filter update (из config service):
  void updateFilter(FilterConfig newFilter) {
      currentFilter = newFilter;  // atomic write
  }

3. Filtering с ordering guarantee:

Сценарий:
  Partition 0: [order-1(PENDING), order-1(COMPLETE), order-2(NEW)]
  Filter: status=COMPLETE
  
  order-1(PENDING) → filtered out
  order-1(COMPLETE) → processed
  order-2(NEW) → filtered out
  
  Проблема: order-2(NEW) должен был processed ДО order-1(COMPLETE)
  Filter нарушил ordering!
  
Решение:
  1. Не filter messages that affect ordering
  2. Filter на уровне topic (producer routing), не consumer
  3. Process ALL, filter at action level

4. Filtering и DLQ:

// Filtered messages → DLQ для audit
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
    } else {
        // Не process, но и не lose
        dlqProducer.send(new ProducerRecord<>(
            "orders-filtered",  // separate topic для audit
            record.key(),
            record.value()
        ));
    }
}
consumer.commitSync();

5. High-volume filtering — performance impact:

Сценарий:
  1M messages/min в topic
  Filter: 99% filtered out, 1% processed
  Consumer получает 1M, обрабатывает 10K
  
Network cost: 1M messages × 1KB = 1GB/min (99% wasted!)
CPU cost: deserialize 1M, filter 1M, process 10K
Disk cost: 1M messages stored (99% never needed)

At this scale:
  Consumer filtering = unacceptable
  Producer routing REQUIRED
  Или Kafka Streams pre-filtering

Производительность (production numbers)

Подход Throughput Network I/O CPU overhead Disk overhead
No filtering 100K msg/s 100% baseline 100%
Consumer filtering (90% filter) 100K msg/s 100% +5% (deserialize + filter) 100%
Kafka Streams filter 100K msg/s 2× (read + write) +15% (Streams overhead) 10% (filtered only)
Producer routing 100K msg/s 10% (only needed) baseline 10%
ksqlDB filter 100K msg/s 2× (read + write) +10% (ksqlDB) 10%

Production War Story

Ситуация: Payment gateway, 500K transactions/min. Один topic “all-transactions” со всеми типами: authorization, capture, refund, void.

Проблема: Refund processor consumer получал все 500K msg/min, фильтровал 95% (только refund’ы нужны). Network: 500MB/min, CPU: 40% на deserialize + filter. После масштабирования до 2M msg/min — network saturation, consumer crash.

Расследование:

Current architecture:
  Producer → all-transactions (500K msg/min)
                          ↓
  Auth consumer: filters 90% (processes 50K)
  Capture consumer: filters 85% (processes 75K)
  Refund consumer: filters 95% (processes 25K) ← problem
  Void consumer: filters 98% (processes 10K)
  
  Total network: 500K × 4 consumers = 2M message deliveries/min
  95% of this traffic is wasted!

Решение:

New architecture (producer routing):
  Producer → auth-topic (50K msg/min)     → Auth consumer (100% processed)
           → capture-topic (75K msg/min)  → Capture consumer (100% processed)
           → refund-topic (25K msg/min)   → Refund consumer (100% processed)
           → void-topic (10K msg/min)     → Void consumer (100% processed)
  
  Total network: 160K message deliveries/min (92% reduction!)
  CPU: 0% wasted on filtering
  Disk: 68% reduction (only needed data stored)

Post-mortem lesson: Consumer filtering OK для low-volume. При scale > 100K msg/min — producer routing mandatory.

Monitoring (JMX, Prometheus, Burrow)

Application metrics:

// Filter metrics
Counter messagesReceived = meterRegistry.counter("kafka.consumer.messages.received");
Counter messagesProcessed = meterRegistry.counter("kafka.consumer.messages.processed");
Counter messagesFiltered = meterRegistry.counter("kafka.consumer.messages.filtered");

Timer processingTime = Timer.builder("kafka.consumer.processing.time")
    .publishPercentiles(0.5, 0.95, 0.99)
    .register(meterRegistry);

// In consumer loop:
messagesReceived.increment();
if (shouldProcess(record)) {
    processingTime.record(() -> process(record));
    messagesProcessed.increment();
} else {
    messagesFiltered.increment();
}

Prometheus alerts:

- alert: KafkaConsumerFilterRateHigh
  expr: rate(kafka_consumer_messages_filtered_total[5m]) / rate(kafka_consumer_messages_received_total[5m]) > 0.9
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Consumer filtering > 90% на постоянной основе  сигнал к пересмотру архитектуры. Для временных или экспериментальных фильтров это допустимо."

- alert: KafkaConsumerFilteringWastedBandwidth
  expr: rate(kafka_consumer_bytes_received_total[5m]) > 100_000_000  # 100MB/s
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer receiving > 100MB/s  check if filtering is causing waste"

Highload Best Practices

✅ Producer routing для high-volume (>100K msg/min)
✅ Consumer filtering для simple, low-volume cases
✅ Kafka Streams для complex filtering + transformation
✅ ksqlDB для SQL-style filtering
✅ Monitor filter rate (processed vs received ratio)
✅ Alert on high filter rate (>90% = architecture smell)
✅ Separate topics per message type (producer responsibility)
✅ DLQ для filtered messages (audit trail)
✅ Dynamic filter config для runtime changes
✅ Filter BEFORE processing (deserialize only if needed)

❌ Consumer filtering для high-volume topics
❌ Filtering without monitoring (blind waste)
❌ All messages in one topic (anti-pattern at scale)
❌ Filtering messages that affect ordering
❌ Without alternative routing for filtered data
❌ Complex filter logic in consumer (hard to maintain)

Архитектурные решения

  1. Producer routing > Consumer filtering — move intelligence to producer
  2. Filter ratio as architecture smell — >90% filter = wrong architecture
  3. One topic per message type — natural filtering at source
  4. Kafka Streams as filter layer — intermediate filtering before consumers
  5. Audit trail for filtered data — DLQ или separate topic

Резюме для Senior

  • Kafka НЕ поддерживает server-side filtering — architectural decision
  • Consumer filtering = simplest but least efficient (wasted network/disk/CPU)
  • Producer routing = most efficient but least flexible
  • Filter ratio > 90% = strong signal for architecture change
  • Offset management: commit for ALL messages (processed + filtered)
  • Ordering can be broken by filtering — be careful with dependent messages
  • Dynamic filters: use volatile config, atomic updates
  • At highload (>100K msg/min): consumer filtering unacceptable
  • Monitor filter rate — key metric for architecture health
  • Best practice: one topic per message type, no filtering needed

🎯 Шпаргалка для интервью

Обязательно знать:

  • Kafka НЕ поддерживает server-side filtering — architectural decision (broker = dumb pipe)
  • Consumer filtering: получает ВСЕ сообщения, обрабатывает только нужные; wasted network/disk/CPU
  • Producer routing — best practice: разные типы сообщений → разные топики
  • Kafka Streams / ksqlDB — промежуточная фильтрация перед консьюмерами
  • Offset management: коммитить для ВСЕХ сообщений (processed + filtered), не только processed
  • Filter ratio > 90% = strong signal для architecture change (producer routing needed)
  • Фильтрация может нарушить ordering для dependent messages

Частые уточняющие вопросы:

  • Почему Kafka не поддерживает server-side filtering? — Broker complexity, O(n) scan, performance impact.
  • Filter ratio 95% — это проблема? — Да, 95% network/disk wasted; нужен producer routing.
  • Как filter не сломает offset? — Коммитить последний offset batch, включая filtered messages.
  • Когда filtering на консьюмере приемлема? — Low-volume, simple filters, temporary solution.

Красные флаги (НЕ говорить):

  • «Kafka фильтрует сообщения на брокере» — не фильтрует, architectural decision
  • «Consumer filtering для high-volume — норма» — при >100K msg/min unacceptable
  • «Filter ratio 95% — отличная практика» — architecture smell, нужен producer routing
  • «Offset коммитится только для processed messages» — filtered будут прочитаны снова

Связанные темы:

  • [[1. Что такое топик (topic) в Kafka]]
  • [[25. Что такое DLQ (Dead Letter Queue)]]
  • [[3. Как данные распределяются по партициям]]
  • [[21. Что такое batch в Kafka producer]]