How to implement message filtering on consumer side in Kafka
Junior Level
Simple Definition
Kafka does not support broker-side filtering. All messages from a topic are delivered to the consumer, and it decides which ones to process and which to ignore.
This is an architectural decision in Kafka: the broker works as a "dumb pipe" and never inspects message contents. This keeps the broker simple and maximizes performance.
Analogy
Imagine a mailbox. The postman (Kafka) delivers ALL letters. You (consumer) open the mailbox and:
- Advertising → trash can (you ignore it)
- Electricity bill → you pay it (process it)
- Letter from a friend → you read it (process it)
The postman doesn’t filter — he brings everything. Filtering is your job.
Basic Example
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Filtering on the consumer side
        if (shouldProcess(record)) {
            process(record);
        }
        // Filtered records are simply skipped, but the offset still advances
    }
    consumer.commitSync();
}
private boolean shouldProcess(ConsumerRecord<String, String> record) {
    // Example: only process orders over $100
    Order order = parseOrder(record.value());
    return order.amount() > 100;
}
When This Matters
- Multiple event types in one topic — filter by type
- Multi-tenant — filter by tenant ID
- Feature flags — filter by conditions
- Data privacy — filter sensitive data
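The first bullet, filtering by event type, can be shown without a broker. Below is a minimal self-contained sketch that models records as a hypothetical `Event` value class (the class and its fields are illustrative stand-ins, not a Kafka API):

```java
import java.util.ArrayList;
import java.util.List;

public class TypeFilterSketch {
    // Hypothetical stand-in for a Kafka record: (eventType, payload)
    record Event(String type, String payload) {}

    // Keep only the event type this consumer cares about
    static List<Event> filterByType(List<Event> batch, String wantedType) {
        List<Event> kept = new ArrayList<>();
        for (Event e : batch) {
            if (e.type().equals(wantedType)) {
                kept.add(e); // process(e) would go here in a real consumer loop
            }
            // non-matching events are skipped; in Kafka their offsets still advance
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(
            new Event("REFUND", "r-1"),
            new Event("AUTH", "a-1"),
            new Event("REFUND", "r-2"));
        System.out.println(filterByType(batch, "REFUND").size()); // 2 of 3 survive the filter
    }
}
```

The same predicate would sit inside the poll loop of the basic example above; the point is that the filter is ordinary application code, invisible to the broker.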
Middle Level
Filtering Approaches
1. Consumer-side Filtering (simplest)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    Order order = parseOrder(record.value());
    if (order.status().equals("COMPLETED") && order.amount() > 100) {
        process(order);
    }
    // PENDING, CANCELLED, and small orders are ignored
}
2. Filtering via Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders"); // assumes an Order serde is configured

// Filtering
orders.filter((key, order) -> order.amount() > 100 && order.status().equals("COMPLETED"))
      .to("important-orders"); // new topic

// Or split into side outputs (KStream#branch is deprecated since Kafka 2.8 in favor of split()):
orders.split()
      .branch((key, order) -> order.amount() > 100,
              Branched.withConsumer(important -> important.to("important-orders"))) // stream 1: important
      .defaultBranch(Branched.withConsumer(normal -> normal.to("normal-orders")));  // stream 2: normal
3. Filtering via ksqlDB
-- Create a filtered stream
CREATE STREAM important_orders AS
    SELECT *
    FROM orders
    WHERE amount > 100
      AND status = 'COMPLETED'
    EMIT CHANGES;

-- The consumer reads the already-filtered stream
-- consumer.subscribe(List.of("important_orders"))
4. Producer-side Filtering (best option)
// The producer decides which topic to write to
if (order.amount() > 100) {
    producer.send(new ProducerRecord<>("important-orders", order));
} else {
    producer.send(new ProducerRecord<>("normal-orders", order));
}
Common Errors Table
| Error | Symptoms | Consequences | Solution |
|---|---|---|---|
| Filtering without offset commit | Offset doesn’t move for filtered | Duplicate processing on restart | Commit offset even for filtered |
| Filtering after full processing | Wasted CPU | Processed, then discarded | Filter BEFORE processing |
| Filtering in complex logic | Hard to maintain | Bugs in filter logic | Extract filter into a separate step |
| Without monitoring filtered count | Can’t see if filter works | Impossible to tune | Metrics: processed vs filtered |
| All messages in one topic | Every consumer filters | Wasted network + CPU | Split into topics on producer side |
| Without alternative routing | Filtered messages are lost | Data loss | DLQ or separate topic for filtered |
Approach Comparison
| Approach | Complexity | Network Overhead | CPU Overhead | When to Use |
|---|---|---|---|---|
| Consumer filter | Low | High (all messages) | Low | Simple filters |
| Kafka Streams | Medium | Medium | Medium | Complex transformations |
| ksqlDB | Medium | Medium | Low | SQL-style filtering |
| Producer routing | Low | Low | Low | Best practice |
When Consumer-side Filtering is Acceptable
- Simple conditions — filter by key, status, amount
- Low-volume topics — network cost is negligible
- Temporary solution — quick fix before proper routing
- Multi-purpose consumer — consumer needs to see everything for auditing
When NOT to Use Consumer-side Filtering
- High-volume topics — wasted network/disk/CPU
- Data privacy — sensitive data shouldn’t reach the consumer at all
- Regulatory — GDPR, PCI-DSS require data segregation
- Cost optimization — network/disk waste at scale
Senior Level
Deep Internals
Why Kafka Doesn’t Support Server-side Filtering
Kafka's architectural decision:
Broker = dumb storage + replication
Intelligence on producer and consumer
Reasons:
1. Broker complexity — filter logic on every broker
2. Performance — scanning all messages for filter = O(n) per fetch
3. Flexibility — filter logic is application-specific
4. Consistency — different consumers, different filters
Result:
FetchRequest → returns ALL messages in range
Consumer → receives ALL → filters locally
Broker doesn't know/doesn't care about filter criteria
Server-side filtering has been discussed in the Kafka community, but the project has kept the broker payload-agnostic because of the added complexity and performance impact.
Consumer Filtering and Offset Management
// CRITICAL: offset management with filtering

// CORRECT:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
    }
    // The offset advances past ALL messages (processed + filtered)
}
consumer.commitSync(); // commits the position after the whole batch

// INCORRECT:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
        consumer.commitSync(); // ❌ commits only when something was processed
    }
}
// If the tail of the batch is entirely filtered, no commit covers it,
// and those messages are read again after a restart!
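The "commit past filtered records too" rule can be modeled without a broker: per partition, remember the offset of the last record you saw plus one, whether or not it was processed. A self-contained sketch (plain ints stand in for `TopicPartition`; in a real consumer you would build a `Map<TopicPartition, OffsetAndMetadata>` the same way and pass it to `commitSync`):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // partition -> offset to commit: the offset of the last seen record + 1,
    // which is the position Kafka expects in a commit
    private final Map<Integer, Long> toCommit = new HashMap<>();

    // Called for EVERY record, processed or filtered
    void seen(int partition, long offset) {
        toCommit.put(partition, offset + 1);
    }

    Map<Integer, Long> offsetsToCommit() {
        return Map.copyOf(toCommit);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        // partition 0: offset 5 (processed), 6 (filtered), 7 (filtered)
        tracker.seen(0, 5);
        tracker.seen(0, 6);
        tracker.seen(0, 7);
        // The commit position covers the filtered tail too
        System.out.println(tracker.offsetsToCommit().get(0)); // 8
    }
}
```

Because `seen` runs for every record, a restart resumes at offset 8 even though offsets 6 and 7 were never processed, which is exactly the behavior the CORRECT snippet above produces.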
Kafka Streams Filtering — Internal Mechanics
Kafka Streams filtering:
builder.stream("orders")       // reads from the topic
       .filter(predicate)      // applies the filter
       .to("filtered-topic");  // writes to a new topic
Internal flow:
1. Source node: reads from "orders" topic
2. Filter node: applies predicate, drops non-matching
3. Sink node: writes matching to "filtered-topic"
Key point:
Filtered messages are READ from "orders" but NOT WRITTEN to "filtered-topic"
Network I/O: orders topic → Streams app (all messages)
Network I/O: Streams app → filtered-topic (only matching)
This is better than consumer filtering (downstream consumers read only the matching messages)
but worse than producer routing (the Streams app still reads everything)
Trade-offs
| Aspect | Consumer filtering | Producer routing | Kafka Streams |
|---|---|---|---|
| Network (read) | All messages | Only needed | All messages |
| Network (write) | N/A | Per-topic writes | Per-topic writes |
| Broker disk | All messages | Split by topic | Split by topic |
| Consumer CPU | Filter + process | Process only | N/A (separate app) |
| Complexity | Low | Low-Medium | Medium-High |
| Flexibility | High (dynamic filter) | Low (static routing) | High |
| Data privacy | Low (all data exposed) | High (segregated) | High (segregated) |
Edge Cases
1. Filtered message dependency:
Scenario:
Topic "orders": order-1 (status=PENDING, $50), order-2 (status=COMPLETED, $200)
order-2 references order-1 (return order)
Consumer filter: status=COMPLETED
order-1 → filtered out (PENDING)
order-2 → processed
Problem: order-2 processing fails because order-1 is not in the system
Resolution:
1. Don't filter dependent messages
2. Process ALL messages, filter at the action level
3. Use separate topics per status (producer routing)
2. Dynamic filter requirements:
Scenario:
Filter criteria changes at runtime:
Monday: amount > 100
Tuesday: amount > 50
Wednesday: amount > 200
Consumer-side: easy, reload filter config
Producer routing: hard, producers need updating
ksqlDB: easy, ALTER STREAM
Resolution for consumer:
volatile FilterConfig currentFilter;

while (running) {
    FilterConfig filter = currentFilter; // atomic read
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        if (filter.matches(record)) {
            process(record);
        }
    }
    consumer.commitSync();
}

// Filter update (from a config service):
void updateFilter(FilterConfig newFilter) {
    currentFilter = newFilter; // atomic write
}
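The same hot-swap can be written with `AtomicReference`, which makes the atomicity explicit and is easy to test without Kafka. A sketch where the `FilterConfig` from the snippet above is replaced, purely for illustration, by a plain predicate over the order amount:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class DynamicFilter {
    // The current filter is swapped atomically; readers never see a torn update
    private final AtomicReference<Predicate<Double>> current =
        new AtomicReference<>(amount -> amount > 100); // Monday's rule

    boolean matches(double amount) {
        return current.get().test(amount); // atomic read
    }

    void update(Predicate<Double> newFilter) {
        current.set(newFilter); // atomic write, e.g. pushed from a config service
    }

    public static void main(String[] args) {
        DynamicFilter f = new DynamicFilter();
        System.out.println(f.matches(75));  // false: 75 <= 100
        f.update(amount -> amount > 50);    // Tuesday's rule
        System.out.println(f.matches(75));  // true: 75 > 50
    }
}
```

Reading the reference once per batch (as the consumer loop above does with `currentFilter`) also guarantees one consistent filter for the whole batch, even if an update lands mid-iteration.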
3. Filtering with ordering guarantee:
Scenario:
Partition 0: [order-1(PENDING), order-1(COMPLETE), order-2(NEW)]
Filter: status=COMPLETE
order-1(PENDING) → filtered out
order-1(COMPLETE) → processed
order-2(NEW) → filtered out
Problem: order-2(NEW) is dropped entirely, so the consumer's view of the partition is missing
an event that preceded order-1(COMPLETE). For state machines that must see every
transition in order, filtering silently broke the event sequence!
Resolution:
1. Don't filter messages that affect ordering
2. Filter at the topic level (producer routing), not consumer
3. Process ALL, filter at action level
4. Filtering and DLQ:
// Filtered messages → DLQ for audit
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    if (shouldProcess(record)) {
        process(record);
    } else {
        // Don't process it, but don't lose it either
        dlqProducer.send(new ProducerRecord<>(
            "orders-filtered", // separate topic for audit
            record.key(),
            record.value()
        ));
    }
}
consumer.commitSync();
5. High-volume filtering — performance impact:
Scenario:
1M messages/min in topic
Filter: 99% filtered out, 1% processed
Consumer receives 1M, processes 10K
Network cost: 1M messages × 1KB = 1GB/min (99% wasted!)
CPU cost: deserialize 1M, filter 1M, process 10K
Disk cost: 1M messages stored (99% never needed)
At this scale:
Consumer filtering = unacceptable
Producer routing REQUIRED
Or Kafka Streams pre-filtering
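The waste in this scenario is straightforward arithmetic. A sketch that computes it with integer math, using the 1 KB message size and 99% filter rate from the scenario above (the function name is illustrative):

```java
public class FilterWaste {
    // Wasted bytes per minute: messages that cross the network only to be dropped.
    // Integer math (filter rate as a percentage) avoids floating-point rounding.
    static long wastedBytesPerMinute(long messagesPerMinute, int bytesPerMessage,
                                     long filteredPerHundred) {
        return messagesPerMinute * bytesPerMessage * filteredPerHundred / 100;
    }

    public static void main(String[] args) {
        // 1M msg/min, 1 KB (1024 bytes) each, 99% filtered out
        long wasted = wastedBytesPerMinute(1_000_000L, 1024, 99);
        System.out.println(wasted / 1_000_000); // 1013 MB/min wasted, ≈ 1 GB/min as in the scenario
    }
}
```

Running the same formula at the 2M msg/min scale from the war story below the threshold doubles the waste, which is why producer routing becomes mandatory.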
Performance (Production Numbers)
| Approach | Throughput | Network I/O | CPU overhead | Disk overhead |
|---|---|---|---|---|
| No filtering | 100K msg/s | 100% | baseline | 100% |
| Consumer filtering (90% filter) | 100K msg/s | 100% | +5% (deserialize + filter) | 100% |
| Kafka Streams filter | 100K msg/s | 2× (read + write) | +15% (Streams overhead) | 10% (filtered only) |
| Producer routing | 100K msg/s | 10% (only needed) | baseline | 10% |
| ksqlDB filter | 100K msg/s | 2× (read + write) | +10% (ksqlDB) | 10% |
Production War Story
Situation: Payment gateway, 500K transactions/min. One topic “all-transactions” with all types: authorization, capture, refund, void.
Problem: The refund-processor consumer received all 500K msg/min and filtered out 95% of them (only refunds are needed). Network: 500 MB/min; CPU: 40% spent on deserialize + filter. After scaling to 2M msg/min: network saturation and consumer crashes.
Investigation:
Current architecture:
Producer → all-transactions (500K msg/min)
↓
Auth consumer: filters 90% (processes 50K)
Capture consumer: filters 85% (processes 75K)
Refund consumer: filters 95% (processes 25K) ← problem
Void consumer: filters 98% (processes 10K)
Total network: 500K × 4 consumers = 2M message deliveries/min
95% of this traffic is wasted!
Resolution:
New architecture (producer routing):
Producer → auth-topic (50K msg/min) → Auth consumer (100% processed)
→ capture-topic (75K msg/min) → Capture consumer (100% processed)
→ refund-topic (25K msg/min) → Refund consumer (100% processed)
→ void-topic (10K msg/min) → Void consumer (100% processed)
Total network: 160K message deliveries/min (92% reduction!)
CPU: 0% wasted on filtering
Disk: 68% reduction (only needed data stored)
Post-mortem lesson: Consumer filtering is OK for low-volume. At scale > 100K msg/min — producer routing is mandatory.
Monitoring (JMX, Prometheus, Burrow)
Application metrics:
// Filter metrics
Counter messagesReceived = meterRegistry.counter("kafka.consumer.messages.received");
Counter messagesProcessed = meterRegistry.counter("kafka.consumer.messages.processed");
Counter messagesFiltered = meterRegistry.counter("kafka.consumer.messages.filtered");
Timer processingTime = Timer.builder("kafka.consumer.processing.time")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
// In consumer loop:
messagesReceived.increment();
if (shouldProcess(record)) {
processingTime.record(() -> process(record));
messagesProcessed.increment();
} else {
messagesFiltered.increment();
}
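The ratio those counters feed is worth computing in one well-tested place, since it is what the 90% architecture-smell threshold gets applied to. A minimal sketch (function names are illustrative):

```java
public class FilterRate {
    // filtered / received: the ratio behind the "filter rate > 90%" alert
    static double filterRate(long received, long filtered) {
        if (received == 0) return 0.0; // avoid division by zero before traffic arrives
        return (double) filtered / received;
    }

    static boolean architectureSmell(long received, long filtered) {
        return filterRate(received, filtered) > 0.9;
    }

    public static void main(String[] args) {
        System.out.println(architectureSmell(500_000, 475_000)); // true: 95% filtered
        System.out.println(architectureSmell(500_000, 50_000));  // false: 10% filtered
    }
}
```

In production the same division is done by the Prometheus rule below over rates, not raw counters, so that the alert reflects the last five minutes rather than all-time totals.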
Prometheus alerts:
- alert: KafkaConsumerFilterRateHigh
  expr: rate(kafka_consumer_messages_filtered_total[5m]) / rate(kafka_consumer_messages_received_total[5m]) > 0.9
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Consumer consistently filtering > 90% of messages; a signal to reconsider the architecture. Acceptable for temporary or experimental filters."

- alert: KafkaConsumerFilteringWastedBandwidth
  expr: rate(kafka_consumer_bytes_received_total[5m]) > 1e8  # 100 MB/s
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer receiving > 100 MB/s; check whether filtering is wasting bandwidth"
Highload Best Practices
✅ Producer routing for high-volume (>100K msg/min)
✅ Consumer filtering for simple, low-volume cases
✅ Kafka Streams for complex filtering + transformation
✅ ksqlDB for SQL-style filtering
✅ Monitor filter rate (processed vs received ratio)
✅ Alert on high filter rate (>90% = architecture smell)
✅ Separate topics per message type (producer responsibility)
✅ DLQ for filtered messages (audit trail)
✅ Dynamic filter config for runtime changes
✅ Filter BEFORE processing (deserialize only if needed)
❌ Consumer filtering for high-volume topics
❌ Filtering without monitoring (blind waste)
❌ All messages in one topic (anti-pattern at scale)
❌ Filtering messages that affect ordering
❌ Dropping filtered data with no alternative routing (silent loss)
❌ Complex filter logic in consumer (hard to maintain)
Architectural Decisions
- Producer routing > Consumer filtering — move intelligence to the producer
- Filter ratio as architecture smell — >90% filter = wrong architecture
- One topic per message type — natural filtering at source
- Kafka Streams as filter layer — intermediate filtering before consumers
- Audit trail for filtered data — DLQ or separate topic
Summary for Senior
- Kafka does NOT support server-side filtering — architectural decision
- Consumer filtering = simplest but least efficient (wasted network/disk/CPU)
- Producer routing = most efficient but least flexible
- Filter ratio > 90% = strong signal for architecture change
- Offset management: commit for ALL messages (processed + filtered), not just processed
- Ordering can be broken by filtering — be careful with dependent messages
- Dynamic filters: use volatile config, atomic updates
- At highload (>100K msg/min): consumer filtering is unacceptable
- Monitor filter rate — key metric for architecture health
- Best practice: one topic per message type, no filtering needed
🎯 Interview Cheat Sheet
Must know:
- Kafka does NOT support server-side filtering — architectural decision (broker = dumb pipe)
- Consumer filtering: receives ALL messages, processes only needed ones; wasted network/disk/CPU
- Producer routing — best practice: different message types → different topics
- Kafka Streams / ksqlDB — intermediate filtering before consumers
- Offset management: commit for ALL messages (processed + filtered), not just processed
- Filter ratio > 90% = strong signal for architecture change (producer routing needed)
- Filtering can break ordering for dependent messages
Common follow-up questions:
- Why doesn’t Kafka support server-side filtering? — Broker complexity, O(n) scan, performance impact.
- Filter ratio 95% — is this a problem? — Yes, 95% network/disk wasted; producer routing is needed.
- How to ensure filtering doesn’t break offsets? — Commit the last offset of the batch, including filtered messages.
- When is consumer-side filtering acceptable? — Low-volume, simple filters, temporary solution.
Red flags (DO NOT say):
- “Kafka filters messages on the broker” — it doesn’t, architectural decision
- “Consumer filtering for high-volume is normal” — unacceptable at >100K msg/min
- “Filter ratio 95% is great practice” — architecture smell, producer routing is needed
- “Offset is committed only for processed messages” — filtered ones will be read again
Related topics:
- [[1. What is a topic in Kafka]]
- [[25. What is DLQ (Dead Letter Queue)]]
- [[3. How is data distributed across partitions]]
- [[21. What is batch in Kafka producer]]