Skip to content

Commit 8640e72

Browse files
committed
test: add test for AckMode Record and Record Filtered
Signed-off-by: Chaedong Im <[email protected]>
1 parent c102814 commit 8640e72

File tree

3 files changed

+641
-0
lines changed

3 files changed

+641
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ public enum AckMode {
6969
*/
7070
RECORD,
7171

72+
/**
73+
* Commit the offset after each record is processed by the listener, but only
74+
* for records that are not filtered out by a {@code RecordFilterStrategy}.
75+
* When a record is filtered (not passed to the listener), no offset commit
76+
* occurs for that record. This mode provides better performance when using
77+
* filtering strategies that filter out a significant portion of records.
78+
* @since 4.0
79+
*/
80+
RECORD_FILTERED,
81+
7282
/**
7383
* Commit the offsets of all records returned by the previous poll after they all
7484
* have been processed by the listener.
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
/*
2+
* Copyright 2024-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.apache.kafka.clients.consumer.ConsumerRecords;
22+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
23+
import org.apache.kafka.common.TopicPartition;
24+
import org.junit.jupiter.api.Test;
25+
import org.mockito.ArgumentCaptor;
26+
import org.springframework.kafka.core.ConsumerFactory;
27+
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
28+
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
29+
30+
import java.time.Duration;
31+
import java.util.*;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.mockito.ArgumentMatchers.any;
37+
import static org.mockito.BDDMockito.given;
38+
import static org.mockito.Mockito.*;
39+
40+
/**
41+
* Tests for the RECORD_FILTERED acknowledge mode.
42+
*
43+
* Related to GitHub issue #3562
44+
*
45+
* @author Chaedong Im
46+
* @see AckModeRecordWithFilteringTest
47+
*/
48+
public class AckModeRecordFilteredTest {
49+
50+
@Test
51+
public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException {
52+
// Given: A container with RECORD_FILTERED ack mode
53+
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
54+
Consumer<String, String> consumer = mock(Consumer.class);
55+
given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
56+
57+
ContainerProperties containerProperties = new ContainerProperties("test-topic");
58+
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
59+
containerProperties.setGroupId("test-group");
60+
61+
RecordFilterStrategy<String, String> filterStrategy = record -> record.offset() % 2 == 0;
62+
63+
List<String> processedValues = new ArrayList<>();
64+
CountDownLatch processedLatch = new CountDownLatch(2);
65+
66+
MessageListener<String, String> listener = record -> {
67+
processedValues.add(record.value());
68+
processedLatch.countDown();
69+
};
70+
71+
FilteringMessageListenerAdapter<String, String> filteringAdapter =
72+
new FilteringMessageListenerAdapter<>(listener, filterStrategy);
73+
containerProperties.setMessageListener(filteringAdapter);
74+
75+
KafkaMessageListenerContainer<String, String> container =
76+
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
77+
78+
TopicPartition tp = new TopicPartition("test-topic", 0);
79+
List<ConsumerRecord<String, String>> records = List.of(
80+
new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), // Will be filtered -> NO COMMIT
81+
new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), // Will be processed -> COMMIT offset 2
82+
new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2"), // Will be filtered -> NO COMMIT
83+
new ConsumerRecord<>("test-topic", 0, 3, "key3", "value3") // Will be processed -> COMMIT offset 4
84+
);
85+
86+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
87+
recordsMap.put(tp, records);
88+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
89+
90+
given(consumer.poll(any(Duration.class)))
91+
.willReturn(consumerRecords)
92+
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
93+
94+
// When: Start the container and process records
95+
container.start();
96+
assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
97+
Thread.sleep(500);
98+
container.stop();
99+
100+
// Then: Verify that only odd offset records were processed
101+
assertThat(processedValues).containsExactly("value1", "value3");
102+
103+
verify(consumer, times(2)).commitSync(any(), any(Duration.class));
104+
}
105+
106+
@Test
107+
public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException {
108+
// Given: All records are filtered
109+
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
110+
Consumer<String, String> consumer = mock(Consumer.class);
111+
given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
112+
113+
ContainerProperties containerProperties = new ContainerProperties("test-topic");
114+
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
115+
containerProperties.setGroupId("test-group");
116+
117+
RecordFilterStrategy<String, String> filterStrategy = record -> true;
118+
119+
List<String> processedValues = new ArrayList<>();
120+
MessageListener<String, String> listener = record -> processedValues.add(record.value());
121+
122+
FilteringMessageListenerAdapter<String, String> filteringAdapter =
123+
new FilteringMessageListenerAdapter<>(listener, filterStrategy);
124+
containerProperties.setMessageListener(filteringAdapter);
125+
126+
KafkaMessageListenerContainer<String, String> container =
127+
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
128+
129+
TopicPartition tp = new TopicPartition("test-topic", 0);
130+
List<ConsumerRecord<String, String>> records = List.of(
131+
new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), // Filtered -> NO COMMIT
132+
new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), // Filtered -> NO COMMIT
133+
new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2") // Filtered -> NO COMMIT
134+
);
135+
136+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
137+
recordsMap.put(tp, records);
138+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
139+
140+
given(consumer.poll(any(Duration.class)))
141+
.willReturn(consumerRecords)
142+
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
143+
144+
// When
145+
container.start();
146+
Thread.sleep(1000);
147+
container.stop();
148+
149+
assertThat(processedValues).isEmpty();
150+
verify(consumer, never()).commitSync(any(), any(Duration.class));
151+
}
152+
153+
@Test
154+
public void testRecordFilteredModeWithMixedPartitions() throws InterruptedException {
155+
// Given: Mixed partitions with different filtering scenarios
156+
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
157+
Consumer<String, String> consumer = mock(Consumer.class);
158+
given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
159+
160+
ContainerProperties containerProperties = new ContainerProperties("test-topic");
161+
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
162+
containerProperties.setGroupId("test-group");
163+
164+
RecordFilterStrategy<String, String> filterStrategy = record ->
165+
record.value().contains("skip");
166+
167+
List<String> processedValues = new ArrayList<>();
168+
CountDownLatch processedLatch = new CountDownLatch(3);
169+
170+
MessageListener<String, String> listener = record -> {
171+
processedValues.add(record.value());
172+
processedLatch.countDown();
173+
};
174+
175+
FilteringMessageListenerAdapter<String, String> filteringAdapter =
176+
new FilteringMessageListenerAdapter<>(listener, filterStrategy);
177+
containerProperties.setMessageListener(filteringAdapter);
178+
179+
KafkaMessageListenerContainer<String, String> container =
180+
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
181+
182+
TopicPartition tp0 = new TopicPartition("test-topic", 0);
183+
TopicPartition tp1 = new TopicPartition("test-topic", 1);
184+
185+
List<ConsumerRecord<String, String>> records = List.of(
186+
// Partition 0
187+
new ConsumerRecord<>("test-topic", 0, 0, "key0", "process1"), // Processed -> COMMIT offset 1
188+
new ConsumerRecord<>("test-topic", 0, 1, "key1", "skip1"), // Filtered -> NO COMMIT
189+
new ConsumerRecord<>("test-topic", 0, 2, "key2", "process2"), // Processed -> COMMIT offset 3
190+
// Partition 1
191+
new ConsumerRecord<>("test-topic", 1, 0, "key3", "skip2"), // Filtered -> NO COMMIT
192+
new ConsumerRecord<>("test-topic", 1, 1, "key4", "process3"), // Processed -> COMMIT offset 2
193+
new ConsumerRecord<>("test-topic", 1, 2, "key5", "skip3") // Filtered -> NO COMMIT
194+
);
195+
196+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
197+
recordsMap.put(tp0, records.subList(0, 3));
198+
recordsMap.put(tp1, records.subList(3, 6));
199+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
200+
201+
given(consumer.poll(any(Duration.class)))
202+
.willReturn(consumerRecords)
203+
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
204+
205+
// When
206+
container.start();
207+
assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
208+
Thread.sleep(500);
209+
container.stop();
210+
211+
assertThat(processedValues).containsExactly("process1", "process2", "process3");
212+
verify(consumer, times(3)).commitSync(any(), any(Duration.class));
213+
}
214+
215+
@Test
216+
public void testRecordFilteredModeEfficiencyGains() throws InterruptedException {
217+
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
218+
Consumer<String, String> consumer = mock(Consumer.class);
219+
given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
220+
221+
ContainerProperties containerProperties = new ContainerProperties("test-topic");
222+
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
223+
containerProperties.setGroupId("test-group");
224+
225+
RecordFilterStrategy<String, String> filterStrategy = record -> record.offset() % 10 != 0;
226+
227+
List<String> processedValues = new ArrayList<>();
228+
CountDownLatch processedLatch = new CountDownLatch(1);
229+
230+
MessageListener<String, String> listener = record -> {
231+
processedValues.add(record.value());
232+
processedLatch.countDown();
233+
};
234+
235+
FilteringMessageListenerAdapter<String, String> filteringAdapter =
236+
new FilteringMessageListenerAdapter<>(listener, filterStrategy);
237+
containerProperties.setMessageListener(filteringAdapter);
238+
239+
KafkaMessageListenerContainer<String, String> container =
240+
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
241+
242+
TopicPartition tp = new TopicPartition("test-topic", 0);
243+
List<ConsumerRecord<String, String>> records = new ArrayList<>();
244+
for (int i = 0; i < 10; i++) {
245+
records.add(new ConsumerRecord<>("test-topic", 0, i, "key" + i, "value" + i));
246+
}
247+
248+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
249+
recordsMap.put(tp, records);
250+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
251+
252+
given(consumer.poll(any(Duration.class)))
253+
.willReturn(consumerRecords)
254+
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
255+
256+
// When
257+
container.start();
258+
assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
259+
Thread.sleep(500);
260+
container.stop();
261+
262+
assertThat(processedValues).hasSize(1);
263+
assertThat(processedValues.get(0)).isEqualTo("value0");
264+
verify(consumer, times(1)).commitSync(any(), any(Duration.class));
265+
}
266+
267+
@Test
268+
public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException {
269+
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
270+
Consumer<String, String> consumer = mock(Consumer.class);
271+
given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
272+
273+
ContainerProperties containerProperties = new ContainerProperties("test-topic");
274+
containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
275+
containerProperties.setGroupId("test-group");
276+
277+
RecordFilterStrategy<String, String> filterStrategy = record -> false;
278+
279+
List<String> processedValues = new ArrayList<>();
280+
CountDownLatch processedLatch = new CountDownLatch(3);
281+
282+
MessageListener<String, String> listener = record -> {
283+
processedValues.add(record.value());
284+
processedLatch.countDown();
285+
};
286+
287+
FilteringMessageListenerAdapter<String, String> filteringAdapter =
288+
new FilteringMessageListenerAdapter<>(listener, filterStrategy);
289+
containerProperties.setMessageListener(filteringAdapter);
290+
291+
KafkaMessageListenerContainer<String, String> container =
292+
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
293+
294+
TopicPartition tp = new TopicPartition("test-topic", 0);
295+
List<ConsumerRecord<String, String>> records = List.of(
296+
new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"),
297+
new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"),
298+
new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2")
299+
);
300+
301+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
302+
recordsMap.put(tp, records);
303+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
304+
305+
given(consumer.poll(any(Duration.class)))
306+
.willReturn(consumerRecords)
307+
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
308+
309+
// When
310+
container.start();
311+
assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
312+
Thread.sleep(500);
313+
container.stop();
314+
315+
// Then: All records processed
316+
assertThat(processedValues).containsExactly("value0", "value1", "value2");
317+
verify(consumer, times(3)).commitSync(any(), any(Duration.class));
318+
}
319+
}

0 commit comments

Comments
 (0)