Question 30 · Section 15

How to implement message filtering on the consumer side in Kafka

Kafka does not support broker-side filtering. All messages from a topic are delivered to the consumer, and it decides which ones to process and which to ignore.


Junior Level

Simple Definition

Kafka does not support broker-side filtering. All messages from a topic are delivered to the consumer, and it decides which ones to process and which to ignore.

This is an architectural decision in Kafka — the broker works as a “smart pipe” and doesn’t look into message contents. This simplifies the broker and delivers maximum performance.

Analogy

Imagine a mailbox. The postman (Kafka) delivers ALL letters. You (consumer) open the mailbox and:

  • Advertising → trash can (you ignore it)
  • Electricity bill → you pay it (process it)
  • Letter from a friend → you read it (process it)

The postman doesn’t filter — he brings everything. Filtering is your job.

Basic Example

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Filtering on consumer side
        if (shouldProcess(record)) {
            process(record);
        }
        // We ignore it — but the offset still moves
    }
    consumer.commitSync();
}

private boolean shouldProcess(ConsumerRecord<String, String> record) {
    // Example: we only process orders > $100
    Order order = parseOrder(record.value());
    return order.amount() > 100;
}

When This Matters

  • Multiple event types in one topic — filter by type
  • Multi-tenant — filter by tenant ID
  • Feature flags — filter by conditions
  • Data privacy — filter sensitive data
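Since every delivered message costs a deserialization before the filter can even run, a cheap pre-check can reject obvious non-matches first. A minimal sketch, assuming a hypothetical compact JSON format (no spaces); `mightBeCompleted` is an illustrative helper, not a Kafka API:

```java
// Cheap pre-filter: reject obviously irrelevant messages with a substring
// check before full JSON deserialization. Assumes a (hypothetical) compact
// JSON serialization without spaces. False positives are acceptable: the
// real filter runs again after parsing. A false NEGATIVE would drop a
// wanted message, so the pattern must match how the producer serializes.
static boolean mightBeCompleted(String rawValue) {
    return rawValue.contains("\"status\":\"COMPLETED\"");
}
```

Messages that fail this check can be skipped without ever constructing an `Order` object.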

Middle Level

Filtering Approaches

1. Consumer-side Filtering (simplest)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    Order order = parseOrder(record.value());

    if (order.status().equals("COMPLETED") && order.amount() > 100) {
        process(order);
    }
    // We ignore PENDING, CANCELLED, small orders
}

2. Filtering via Kafka Streams

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");

// Filtering
orders.filter((key, order) -> order.amount() > 100 && order.status().equals("COMPLETED"))
      .to("important-orders");  // new topic

// Or split into branches (KStream#branch is deprecated since Kafka 2.8 in favor of split())
Map<String, KStream<String, Order>> branches = orders
    .split(Named.as("orders-"))
    .branch((key, order) -> order.amount() > 100, Branched.as("important"))
    .defaultBranch(Branched.as("normal"));
branches.get("orders-important").to("important-orders");

3. Filtering via ksqlDB

-- Create a filtered stream
CREATE STREAM important_orders AS
    SELECT *
    FROM orders
    WHERE amount > 100
      AND status = 'COMPLETED'
    EMIT CHANGES;

-- Consumer reads the already-filtered stream
-- consumer.subscribe(List.of("important_orders"))

4. Producer-side Filtering (best option)

// Producer decides which topic to write to
if (order.amount() > 100) {
    producer.send(new ProducerRecord<>("important-orders", order));
} else {
    producer.send(new ProducerRecord<>("normal-orders", order));
}

Common Errors Table

| Error | Symptoms | Consequences | Solution |
|---|---|---|---|
| Filtering without offset commit | Offset doesn't move past filtered messages | Duplicate processing on restart | Commit offsets even for filtered messages |
| Filtering after full processing | Wasted CPU | Messages processed, then discarded | Filter BEFORE processing |
| Complex filter logic inline | Hard to maintain | Bugs in filter logic | Extract the filter into a separate step |
| No monitoring of filtered count | Can't see whether the filter works | Impossible to tune | Metrics: processed vs filtered |
| All messages in one topic | Every consumer filters | Wasted network + CPU | Split into topics on the producer side |
| No alternative routing | Filtered messages are lost | Data loss | DLQ or a separate topic for filtered messages |

Approach Comparison

| Approach | Complexity | Network Overhead | CPU Overhead | When to Use |
|---|---|---|---|---|
| Consumer filter | Low | High (all messages) | Low | Simple filters |
| Kafka Streams | Medium | Medium | Medium | Complex transformations |
| ksqlDB | Medium | Medium | Low | SQL-style filtering |
| Producer routing | Low | Low | Low | Best practice |
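The network-overhead column can be made concrete with a back-of-envelope calculation. A hedged sketch (`wastedBytesPerMinute` is a hypothetical helper; an integer percentage keeps the arithmetic exact):

```java
// Back-of-envelope network cost of consumer-side filtering: the consumer
// receives the full topic regardless of how much of it it keeps.
static long wastedBytesPerMinute(long msgsPerMinute, long avgMsgBytes, long filteredPercent) {
    long deliveredBytes = msgsPerMinute * avgMsgBytes;  // everything crosses the network
    return deliveredBytes * filteredPercent / 100;      // delivered, then discarded
}
```

At 1M msg/min of 1 KiB messages with 99% filtered out, roughly 1 GB/min is delivered only to be thrown away.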

When Consumer-side Filtering is Acceptable

  • Simple conditions — filter by key, status, amount
  • Low-volume topics — network cost is negligible
  • Temporary solution — quick fix before proper routing
  • Multi-purpose consumer — consumer needs to see everything for auditing

When NOT to Use Consumer-side Filtering

  • High-volume topics — wasted network/disk/CPU
  • Data privacy — sensitive data shouldn’t reach the consumer at all
  • Regulatory — GDPR, PCI-DSS require data segregation
  • Cost optimization — network/disk waste at scale

Senior Level

Deep Internals

Why Kafka Doesn’t Support Server-side Filtering

Kafka's architectural decision:
  Broker = dumb storage + replication
  Intelligence on producer and consumer

Reasons:
  1. Broker complexity — filter logic on every broker
  2. Performance — scanning all messages for filter = O(n) per fetch
  3. Flexibility — filter logic is application-specific
  4. Consistency — different consumers, different filters

Result:
  FetchRequest → returns ALL messages in range
  Consumer → receives ALL → filters locally
  Broker doesn't know/doesn't care about filter criteria

Server-side filtering has been proposed in the Kafka community more than once, but rejected each time due to the added broker complexity and performance impact.

Consumer Filtering and Offset Management

// CRITICAL: offset management with filtering

// CORRECT:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
    }
    // Offset advances for ALL messages (processed + filtered)
}
consumer.commitSync();  // commit the latest offset

// INCORRECT:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
        // Commit only the processed record's offset: one slow synchronous
        // commit per record, and trailing filtered records stay uncommitted
        consumer.commitSync(Map.of(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));  // ❌
    }
}
// Filtered messages after the last processed record will be read again on restart!
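The difference between the two commit strategies can be shown with a toy model that tracks only offsets, with no Kafka API involved (`correctCommit` and `incorrectCommit` are hypothetical helpers):

```java
import java.util.List;
import java.util.function.LongPredicate;

// Toy model: correctCommit advances past the whole batch, filtered or not;
// incorrectCommit advances only after processed records, leaving trailing
// filtered offsets uncommitted.
static long correctCommit(List<Long> batchOffsets) {
    return batchOffsets.get(batchOffsets.size() - 1) + 1;
}

static long incorrectCommit(List<Long> batchOffsets, LongPredicate processed) {
    long committed = batchOffsets.get(0);          // nothing committed yet
    for (long offset : batchOffsets) {
        if (processed.test(offset)) {
            committed = offset + 1;                // commit only after processing
        }
    }
    return committed;
}
```

For a batch with offsets 0 through 4 where only offsets 0 to 2 are processed, `correctCommit` returns 5 while `incorrectCommit` returns 3: offsets 3 and 4 would be re-read after a restart.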

Kafka Streams Filtering — Internal Mechanics

Kafka Streams filtering:
  builder.stream("orders")              // reads from topic
         .filter(predicate)              // applies filter
         .to("filtered-topic");          // writes to new topic

Internal flow:
  1. Source node: reads from "orders" topic
  2. Filter node: applies predicate, drops non-matching
  3. Sink node: writes matching to "filtered-topic"

Key point:
  Filtered messages are READ from "orders" but NOT WRITTEN to "filtered-topic"
  Network I/O: orders topic → Streams app (all messages)
  Network I/O: Streams app → filtered-topic (only matching)

  This is better than consumer filtering (write I/O saved)
  But worse than producer routing (read I/O wasted)

Trade-offs

| Aspect | Consumer filtering | Producer routing | Kafka Streams |
|---|---|---|---|
| Network (read) | All messages | Only needed | All messages |
| Network (write) | N/A | Per-topic writes | Per-topic writes |
| Broker disk | All messages | Split by topic | Split by topic |
| Consumer CPU | Filter + process | Process only | N/A (separate app) |
| Complexity | Low | Low-Medium | Medium-High |
| Flexibility | High (dynamic filter) | Low (static routing) | High |
| Data privacy | Low (all data exposed) | High (segregated) | High (segregated) |

Edge Cases

1. Filtered message dependency:

Scenario:
  Topic "orders": order-1 (status=PENDING, $50), order-2 (status=COMPLETED, $200)
  order-2 references order-1 (return order)

  Consumer filter: status=COMPLETED
  order-1 → filtered out (PENDING)
  order-2 → processed
  Problem: order-2 processing fails because order-1 is not in the system

Resolution:
  1. Don't filter dependent messages
  2. Process ALL messages, filter at the action level
  3. Use separate topics per status (producer routing)
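Resolution 2 ("process ALL messages, filter at the action level") can be sketched as consuming every event into local state so that references to filtered orders still resolve, while acting only on matches. `Order` and `handle` are hypothetical minimal stand-ins:

```java
import java.util.HashMap;
import java.util.Map;

// Keep local state for EVERY event so dependencies are never missing,
// but act (return true) only on events matching the filter.
record Order(String id, String status, int amount) {}

static final Map<String, Order> knownOrders = new HashMap<>();

static boolean handle(Order order) {
    knownOrders.put(order.id(), order);            // remember ALL orders
    return "COMPLETED".equals(order.status())      // act only on matches
            && order.amount() > 100;
}
```

A return-order referencing a PENDING original can now look it up in `knownOrders` even though the PENDING event itself triggered no action.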

2. Dynamic filter requirements:

Scenario:
  Filter criteria changes at runtime:
    Monday: amount > 100
    Tuesday: amount > 50
    Wednesday: amount > 200

  Consumer-side: easy, reload filter config
  Producer routing: hard, producers need updating
  ksqlDB: easy, ALTER STREAM

Resolution for consumer:
  volatile FilterConfig currentFilter;

  while (running) {
      FilterConfig filter = currentFilter;  // atomic read
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
          if (filter.matches(record)) {
              process(record);
          }
      }
      consumer.commitSync();
  }

  // Filter update (from config service):
  void updateFilter(FilterConfig newFilter) {
      currentFilter = newFilter;  // atomic write
  }
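A self-contained variant of the hot-swappable filter above can use `AtomicReference` instead of a volatile field (equivalent visibility guarantees, and the predicate is usable from lambdas); `FILTER`, `matches`, and `updateFilter` are hypothetical names:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntPredicate;

// Hot-swappable filter: swap the predicate atomically at runtime.
static final AtomicReference<IntPredicate> FILTER =
        new AtomicReference<>(amount -> amount > 100);

static boolean matches(int amount) {
    return FILTER.get().test(amount);   // one atomic read per record (or per batch)
}

// Config-service callback replaces the predicate atomically:
static void updateFilter(IntPredicate newFilter) {
    FILTER.set(newFilter);
}
```

Reading the predicate once per poll batch (as the loop above does) keeps filtering consistent within a batch even if the config changes mid-iteration.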

3. Filtering with ordering guarantee:

Scenario:
  Partition 0: [order-1(PENDING), order-1(COMPLETE), order-2(NEW)]
  Filter: status=COMPLETE

  order-1(PENDING) → filtered out
  order-1(COMPLETE) → processed
  order-2(NEW) → filtered out

  Problem: the consumer sees order-1 jump straight to COMPLETE without the
  preceding PENDING, and never sees order-2 at all. Logic that relies on
  observing the full per-key event sequence silently breaks: filtering
  discards ordering context, not just individual messages.

Resolution:
  1. Don't filter messages that affect ordering
  2. Filter at the topic level (producer routing), not consumer
  3. Process ALL, filter at action level

4. Filtering and DLQ:

// Filtered messages → DLQ for audit
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
    } else {
        // Don't process, but don't lose
        dlqProducer.send(new ProducerRecord<>(
            "orders-filtered",  // separate topic for audit
            record.key(),
            record.value()
        ));
    }
}
consumer.commitSync();

5. High-volume filtering — performance impact:

Scenario:
  1M messages/min in topic
  Filter: 99% filtered out, 1% processed
  Consumer receives 1M, processes 10K

Network cost: 1M messages × 1KB = 1GB/min (99% wasted!)
CPU cost: deserialize 1M, filter 1M, process 10K
Disk cost: 1M messages stored (99% never needed)

At this scale:
  Consumer filtering = unacceptable
  Producer routing REQUIRED
  Or Kafka Streams pre-filtering

Performance (Production Numbers)

| Approach | Throughput | Network I/O | CPU overhead | Disk overhead |
|---|---|---|---|---|
| No filtering | 100K msg/s | 100% | baseline | 100% |
| Consumer filtering (90% filtered) | 100K msg/s | 100% | +5% (deserialize + filter) | 100% |
| Kafka Streams filter | 100K msg/s | 2× (read + write) | +15% (Streams overhead) | 10% (filtered only) |
| Producer routing | 100K msg/s | 10% (only needed) | baseline | 10% |
| ksqlDB filter | 100K msg/s | 2× (read + write) | +10% (ksqlDB) | 10% |

Production War Story

Situation: Payment gateway, 500K transactions/min. One topic “all-transactions” with all types: authorization, capture, refund, void.

Problem: The refund processor consumer received all 500K msg/min, filtering 95% (only refunds are needed). Network: 500MB/min, CPU: 40% on deserialize + filter. After scaling to 2M msg/min — network saturation, consumer crash.

Investigation:

Current architecture:
  Producer → all-transactions (500K msg/min)
                          ↓
  Auth consumer: filters 90% (processes 50K)
  Capture consumer: filters 85% (processes 75K)
  Refund consumer: filters 95% (processes 25K) ← problem
  Void consumer: filters 98% (processes 10K)

  Total network: 500K × 4 consumers = 2M message deliveries/min
  95% of this traffic is wasted!

Resolution:

New architecture (producer routing):
  Producer → auth-topic (50K msg/min)     → Auth consumer (100% processed)
           → capture-topic (75K msg/min)  → Capture consumer (100% processed)
           → refund-topic (25K msg/min)   → Refund consumer (100% processed)
           → void-topic (10K msg/min)     → Void consumer (100% processed)

  Total network: 160K message deliveries/min (92% reduction!)
  CPU: 0% wasted on filtering
  Disk: 68% reduction (only needed data stored)

Post-mortem lesson: Consumer filtering is OK for low-volume. At scale > 100K msg/min — producer routing is mandatory.

Monitoring (JMX, Prometheus, Burrow)

Application metrics:

// Filter metrics
Counter messagesReceived = meterRegistry.counter("kafka.consumer.messages.received");
Counter messagesProcessed = meterRegistry.counter("kafka.consumer.messages.processed");
Counter messagesFiltered = meterRegistry.counter("kafka.consumer.messages.filtered");

Timer processingTime = Timer.builder("kafka.consumer.processing.time")
    .publishPercentiles(0.5, 0.95, 0.99)
    .register(meterRegistry);

// In consumer loop:
messagesReceived.increment();
if (shouldProcess(record)) {
    processingTime.record(() -> process(record));
    messagesProcessed.increment();
} else {
    messagesFiltered.increment();
}
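The processed-vs-received ratio that the Prometheus alerts rely on is a one-line computation; a sketch with a hypothetical helper mirroring the counters above:

```java
// Filter rate = share of received messages that were dropped.
static double filterRate(long received, long processed) {
    if (received == 0) return 0.0;                 // avoid division by zero
    return (double) (received - processed) / received;
}
```

1M received and 10K processed gives a 0.99 filter rate, which would trip an alert thresholded at 0.9.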

Prometheus alerts:

- alert: KafkaConsumerFilterRateHigh
  expr: rate(kafka_consumer_messages_filtered_total[5m]) / rate(kafka_consumer_messages_received_total[5m]) > 0.9
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Consumer filtering > 90% on a consistent basis: a signal to reconsider the architecture. For temporary or experimental filters this is acceptable."

- alert: KafkaConsumerFilteringWastedBandwidth
  expr: rate(kafka_consumer_bytes_received_total[5m]) > 1e8  # 100MB/s
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer receiving > 100MB/s: check whether filtering is causing wasted bandwidth"

Highload Best Practices

✅ Producer routing for high-volume (>100K msg/min)
✅ Consumer filtering for simple, low-volume cases
✅ Kafka Streams for complex filtering + transformation
✅ ksqlDB for SQL-style filtering
✅ Monitor filter rate (processed vs received ratio)
✅ Alert on high filter rate (>90% = architecture smell)
✅ Separate topics per message type (producer responsibility)
✅ DLQ for filtered messages (audit trail)
✅ Dynamic filter config for runtime changes
✅ Filter BEFORE processing (deserialize only if needed)

❌ Consumer filtering for high-volume topics
❌ Filtering without monitoring (blind waste)
❌ All messages in one topic (anti-pattern at scale)
❌ Filtering messages that affect ordering
❌ Without alternative routing for filtered data
❌ Complex filter logic in consumer (hard to maintain)

Architectural Decisions

  1. Producer routing > Consumer filtering — move intelligence to the producer
  2. Filter ratio as architecture smell — >90% filter = wrong architecture
  3. One topic per message type — natural filtering at source
  4. Kafka Streams as filter layer — intermediate filtering before consumers
  5. Audit trail for filtered data — DLQ or separate topic

Summary for Senior

  • Kafka does NOT support server-side filtering — architectural decision
  • Consumer filtering = simplest but least efficient (wasted network/disk/CPU)
  • Producer routing = most efficient but least flexible
  • Filter ratio > 90% = strong signal for architecture change
  • Offset management: commit for ALL messages (processed + filtered), not just processed
  • Ordering can be broken by filtering — be careful with dependent messages
  • Dynamic filters: use volatile config, atomic updates
  • At highload (>100K msg/min): consumer filtering is unacceptable
  • Monitor filter rate — key metric for architecture health
  • Best practice: one topic per message type, no filtering needed

🎯 Interview Cheat Sheet

Must know:

  • Kafka does NOT support server-side filtering — architectural decision (broker = dumb pipe)
  • Consumer filtering: receives ALL messages, processes only needed ones; wasted network/disk/CPU
  • Producer routing — best practice: different message types → different topics
  • Kafka Streams / ksqlDB — intermediate filtering before consumers
  • Offset management: commit for ALL messages (processed + filtered), not just processed
  • Filter ratio > 90% = strong signal for architecture change (producer routing needed)
  • Filtering can break ordering for dependent messages

Common follow-up questions:

  • Why doesn’t Kafka support server-side filtering? — Broker complexity, O(n) scan, performance impact.
  • Filter ratio 95% — is this a problem? — Yes, 95% network/disk wasted; producer routing is needed.
  • How to ensure filtering doesn’t break offsets? — Commit the last offset of the batch, including filtered messages.
  • When is consumer-side filtering acceptable? — Low-volume, simple filters, temporary solution.

Red flags (DO NOT say):

  • “Kafka filters messages on the broker” — it doesn’t, architectural decision
  • “Consumer filtering for high-volume is normal” — unacceptable at >100K msg/min
  • “Filter ratio 95% is great practice” — architecture smell, producer routing is needed
  • “Offset is committed only for processed messages” — filtered ones will be read again

Related topics:

  • [[1. What is a topic in Kafka]]
  • [[25. What is DLQ (Dead Letter Queue)]]
  • [[3. How is data distributed across partitions]]
  • [[21. What is batch in Kafka producer]]