Skip to content

Commit 1bead94

Browse files
authored
GH-1938: Support Whole Batch Filtering
Resolves #1938 Previously, when adding a filter strategy to a batch listener, the batch members were filtered one-at-a-time. When database lookups are involved it is more efficient to act on the entire batch. Add another method to the filter strategy to allow filtering the entire batch in one call. * Add unsaved doc changes. * Fix what's new anchor.
1 parent 4404fa7 commit 1bead94

File tree

6 files changed

+51
-13
lines changed

6 files changed

+51
-13
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1624,6 +1624,7 @@ public void pollResults(ConsumerRecords<?, ?> records) {
16241624

16251625
IMPORTANT: If the container factory has a `RecordFilterStrategy` configured, it is ignored for `ConsumerRecords<?, ?>` listeners, with a `WARN` log message emitted.
16261626
Records can only be filtered with a batch listener if the `<List<?>>` form of listener is used.
1627+
By default, records are filtered one-at-a-time; starting with version 2.8, you can override `filterBatch` to filter the entire batch in one call.
16271628

16281629
[[annotation-properties]]
16291630
====== Annotation Properties

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ Batch listeners can now handle conversion exceptions.
2727

2828
See <<batch-listener-conv-errors>> for more information.
2929

30+
`RecordFilterStrategy`, when used with batch listeners, can now filter the entire batch in one call.
31+
See the note at the end of <<batch-listeners>> for more information.
32+
3033
[[x28-template]]
3134
==== `KafkaTemplate` Changes
3235

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractFilteringMessageListener.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,6 +42,10 @@ protected AbstractFilteringMessageListener(T delegate, RecordFilterStrategy<K, V
4242
this.recordFilterStrategy = recordFilterStrategy;
4343
}
4444

45+
protected RecordFilterStrategy<K, V> getRecordFilterStrategy() {
46+
return this.recordFilterStrategy;
47+
}
48+
4549
protected boolean filter(ConsumerRecord<K, V> consumerRecord) {
4650
return this.recordFilterStrategy.filter(consumerRecord);
4751
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19-
import java.util.Iterator;
2019
import java.util.List;
2120

2221
import org.apache.kafka.clients.consumer.Consumer;
@@ -27,6 +26,7 @@
2726
import org.springframework.kafka.listener.ListenerType;
2827
import org.springframework.kafka.support.Acknowledgment;
2928
import org.springframework.lang.Nullable;
29+
import org.springframework.util.Assert;
3030

3131
/**
3232
* A {@link BatchMessageListener} adapter that implements filter logic
@@ -51,6 +51,7 @@ public class FilteringBatchMessageListenerAdapter<K, V>
5151
*/
5252
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
5353
RecordFilterStrategy<K, V> recordFilterStrategy) {
54+
5455
super(delegate, recordFilterStrategy);
5556
this.ackDiscarded = false;
5657
}
@@ -67,19 +68,17 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
6768
*/
6869
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
6970
RecordFilterStrategy<K, V> recordFilterStrategy, boolean ackDiscarded) {
71+
7072
super(delegate, recordFilterStrategy);
7173
this.ackDiscarded = ackDiscarded;
7274
}
7375

7476
@Override
75-
public void onMessage(List<ConsumerRecord<K, V>> consumerRecords, @Nullable Acknowledgment acknowledgment,
77+
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
7678
Consumer<?, ?> consumer) {
77-
Iterator<ConsumerRecord<K, V>> iterator = consumerRecords.iterator();
78-
while (iterator.hasNext()) {
79-
if (filter(iterator.next())) {
80-
iterator.remove();
81-
}
82-
}
79+
80+
List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
81+
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
8382
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
8483
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
8584
/*

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import java.util.Iterator;
20+
import java.util.List;
21+
1922
import org.apache.kafka.clients.consumer.ConsumerRecord;
2023

2124
/**
@@ -38,4 +41,21 @@ public interface RecordFilterStrategy<K, V> {
3841
*/
3942
boolean filter(ConsumerRecord<K, V> consumerRecord);
4043

44+
/**
45+
* Filter an entire batch of records; to filter all records, return an empty list, not
46+
* null.
47+
* @param records the records.
48+
* @return the filtered records.
49+
* @since 2.8
50+
*/
51+
default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> records) {
52+
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
53+
while (iterator.hasNext()) {
54+
if (filter(iterator.next())) {
55+
iterator.remove();
56+
}
57+
}
58+
return records;
59+
}
60+
4161
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ public void testBatch() throws Exception {
552552
List<?> list = (List<?>) this.listener.payload;
553553
assertThat(list.size()).isGreaterThan(0);
554554
assertThat(list.get(0)).isInstanceOf(String.class);
555-
assertThat(this.recordFilter.called).isTrue();
555+
assertThat(this.recordFilter.batchCalled).isTrue();
556556
assertThat(this.config.listen10Exception).isNotNull();
557557
assertThat(this.listener.receivedGroupId).isEqualTo("list1");
558558

@@ -2474,14 +2474,25 @@ public void setBar(String bar) {
24742474

24752475
public static class RecordPassAllFilter implements RecordFilterStrategy<Integer, CharSequence> {
24762476

2477-
private boolean called;
2477+
boolean called;
2478+
2479+
boolean batchCalled;
24782480

24792481
@Override
24802482
public boolean filter(ConsumerRecord<Integer, CharSequence> consumerRecord) {
2481-
called = true;
2483+
this.called = true;
24822484
return false;
24832485
}
24842486

2487+
@Override
2488+
public List<ConsumerRecord<Integer, CharSequence>> filterBatch(
2489+
List<ConsumerRecord<Integer, CharSequence>> records) {
2490+
2491+
this.batchCalled = true;
2492+
return records;
2493+
}
2494+
2495+
24852496
}
24862497

24872498
public static class FooConverter implements Converter<String, Foo> {

0 commit comments

Comments
 (0)