diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
index 8135517f66..6900d3fac4 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
@@ -56,6 +56,7 @@
  * @author Kyuhyeok Park
  * @author Wang Zhiyang
  * @author Choi Wang Gyu
+ * @author Chaedong Im
  */
 public class ContainerProperties extends ConsumerProperties {
 
@@ -69,6 +70,16 @@ public enum AckMode {
 		 */
 		RECORD,
 
+		/**
+		 * Commit the offset after each record is processed by the listener, but only
+		 * for records that were not discarded by a {@code RecordFilterStrategy}. When a
+		 * record is filtered out (never passed to the listener), no per-record offset
+		 * commit occurs for it. This reduces commit traffic when a filtering strategy
+		 * discards a significant portion of the records.
+		 * @since 4.0
+		 */
+		RECORD_FILTERED,
+
 		/**
 		 * Commit the offsets of all records returned by the previous poll after they all
 		 * have been processed by the listener.
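Note for reviewers: a minimal wiring sketch for the new mode, for context only (not part of the patch). The topic, group id, and the lambda listener below are illustrative assumptions; any listener wrapped in a FilteringMessageListenerAdapter works, since that adapter now implements FilteringAware (see further down).

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;

public final class RecordFilteredWiringSketch {

    // Build a container that commits per record, skipping commits for filtered records.
    static KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) {
        ContainerProperties props = new ContainerProperties("orders");   // illustrative topic
        props.setGroupId("orders-group");                                // illustrative group id
        props.setAckMode(AckMode.RECORD_FILTERED);
        // The filter drops tombstones (null values); their offsets are not committed individually.
        props.setMessageListener(new FilteringMessageListenerAdapter<>(
                (MessageListener<String, String>) rec -> System.out.println(rec.value()),
                rec -> rec.value() == null));
        return new KafkaMessageListenerContainer<>(cf, props);
    }
}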
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index f611178907..afe3354f15 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -111,6 +111,7 @@
 import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
 import org.springframework.kafka.listener.ContainerProperties.EOSMode;
 import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
+import org.springframework.kafka.listener.adapter.FilteringAware;
 import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
 import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
 import org.springframework.kafka.support.Acknowledgment;
@@ -172,6 +173,7 @@
  * @author Christian Fredriksson
 * @author Timofey Barabanov
 * @author Janek Lasocki-Biczysko
+ * @author Chaedong Im
 */
 public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
 		extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -677,6 +679,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
 
 		private final boolean isRecordAck;
 
+		private final boolean isRecordFilteredAck;
+
 		private final BlockingQueue<ConsumerRecords<K, V>> acks = new LinkedBlockingQueue<>();
 
 		private final BlockingQueue<TopicPartitionOffset> seeks = new LinkedBlockingQueue<>();
@@ -871,6 +875,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
 			this.isManualImmediateAck = AckMode.MANUAL_IMMEDIATE.equals(this.ackMode);
 			this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
 			this.isRecordAck = this.ackMode.equals(AckMode.RECORD);
+			this.isRecordFilteredAck = this.ackMode.equals(AckMode.RECORD_FILTERED);
 			boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies;
 			this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap<>() : null;
 			this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap<>() : null;
@@ -933,8 +938,8 @@ else if (listener instanceof MessageListener) {
 			this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
 					|| listenerType.equals(ListenerType.CONSUMER_AWARE);
 			this.commonErrorHandler = determineCommonErrorHandler();
-			Assert.state(!this.isBatchListener || !this.isRecordAck,
-					"Cannot use AckMode.RECORD with a batch listener");
+			Assert.state(!this.isBatchListener || (!this.isRecordAck && !this.isRecordFilteredAck),
+					"Cannot use AckMode.RECORD or AckMode.RECORD_FILTERED with a batch listener");
 			if (this.containerProperties.getScheduler() != null) {
 				this.taskScheduler = this.containerProperties.getScheduler();
 				this.taskSchedulerExplicitlySet = true;
@@ -1510,7 +1515,7 @@ protected void handleAsyncFailure() {
 		}
 
 		private void doProcessCommits() {
-			if (!this.autoCommit && !this.isRecordAck) {
+			if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
 				try {
 					processCommits();
 				}
@@ -2260,7 +2265,7 @@ private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V>
 				}
 				getAfterRollbackProcessor().clearThreadState();
 			}
-			if (!this.autoCommit && !this.isRecordAck) {
+			if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
 				processCommits();
 			}
 		}
@@ -2710,7 +2715,7 @@ private void listenerInfo(final ConsumerRecord<K, V> cRecord) {
 		}
 
 		private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> cRecord) {
-			if (!this.autoCommit && !this.isRecordAck) {
+			if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
 				processCommits();
 			}
 			List<ConsumerRecord<K, V>> list = new ArrayList<>();
@@ -3060,12 +3065,26 @@ public void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) {
 			}
 		}
 
+		private boolean isRecordFiltered(ConsumerRecord<K, V> cRecord) {
+			Object listener = KafkaMessageListenerContainer.this.getContainerProperties().getMessageListener();
+			if (listener instanceof FilteringAware) {
+				@SuppressWarnings("unchecked")
+				FilteringAware<K, V> filteringAware = (FilteringAware<K, V>) listener;
+				return filteringAware.wasFiltered(cRecord);
+			}
+			return false;
+		}
+
 		public void ackCurrent(final ConsumerRecord<K, V> cRecord) {
 			ackCurrent(cRecord, false);
 		}
 
 		public void ackCurrent(final ConsumerRecord<K, V> cRecord, boolean commitRecovered) {
-			if (this.isRecordAck && this.producer == null) {
+			if (this.isRecordFilteredAck && isRecordFiltered(cRecord)) {
+				return;
+			}
+
+			if ((this.isRecordAck || this.isRecordFilteredAck) && this.producer == null) {
 				Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = buildSingleCommits(cRecord);
 				this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
 				commitOffsets(offsetsToCommit);
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java
new file mode 100644
index 0000000000..74f869e877
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringAware.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener.adapter;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * An interface to indicate that a message listener adapter can report
+ * whether a record was filtered during processing.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ *
+ * @author Chaedong Im
+ * @since 4.0
+ */
+public interface FilteringAware<K, V> {
+
+	/**
+	 * Check whether the most recently processed record was filtered out.
+	 * This method should be called after a record has been processed to
+	 * determine whether the record was filtered and therefore should not
+	 * trigger an offset commit in {@code RECORD_FILTERED} ack mode.
+	 * @param record the record to check
+	 * @return true if the record was filtered, false if it was passed to the listener
+	 */
+	boolean wasFiltered(ConsumerRecord<K, V> record);
+
+}
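The FilteringMessageListenerAdapter change below is the in-tree implementation of this contract, but the interface is open to custom adapters. A hypothetical sketch (this class is illustrative, not part of the patch): since the container invokes wasFiltered(record) on the consumer thread immediately after onMessage(record), remembering only the last per-thread decision is sufficient.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.FilteringAware;

// Hypothetical adapter: skips tombstone records and reports the decision per thread.
public final class TombstoneSkippingListener implements MessageListener<String, String>,
        FilteringAware<String, String> {

    private final MessageListener<String, String> delegate;

    // The container calls wasFiltered(rec) right after onMessage(rec) on the same
    // consumer thread, so one remembered record per thread is enough.
    private final ThreadLocal<ConsumerRecord<String, String>> lastFiltered = new ThreadLocal<>();

    public TombstoneSkippingListener(MessageListener<String, String> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        if (record.value() == null) {
            this.lastFiltered.set(record);  // filtered: remember it and skip the delegate
            return;
        }
        this.lastFiltered.remove();
        this.delegate.onMessage(record);
    }

    @Override
    public boolean wasFiltered(ConsumerRecord<String, String> record) {
        ConsumerRecord<String, String> last = this.lastFiltered.get();
        return last != null
                && last.topic().equals(record.topic())
                && last.partition() == record.partition()
                && last.offset() == record.offset();
    }
}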
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java
index b17c24d0e9..e9e86228a1 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java
@@ -32,14 +32,18 @@
  * @param <V> the value type.
  *
  * @author Gary Russell
+ * @author Chaedong Im
  *
  */
 public class FilteringMessageListenerAdapter<K, V>
 		extends AbstractFilteringMessageListener<K, V, MessageListener<K, V>>
-		implements AcknowledgingConsumerAwareMessageListener<K, V> {
+		implements AcknowledgingConsumerAwareMessageListener<K, V>, FilteringAware<K, V> {
 
 	private final boolean ackDiscarded;
 
+	@SuppressWarnings("rawtypes")
+	private static final ThreadLocal<FilterResult> LAST = new ThreadLocal<>();
+
 	/**
 	 * Create an instance with the supplied strategy and delegate listener.
 	 * @param delegate the delegate.
@@ -68,7 +72,10 @@ public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,
 	public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment,
 			@Nullable Consumer<?, ?> consumer) {
 
-		if (!filter(consumerRecord)) {
+		boolean filtered = filter(consumerRecord);
+		LAST.set(new FilterResult<>(consumerRecord, filtered));
+
+		if (!filtered) {
 			switch (this.delegateType) {
 				case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
 				case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
@@ -93,6 +100,17 @@ private void ackFilteredIfNecessary(@Nullable Acknowledgment acknowledgment) {
 		}
 	}
 
+	@Override
+	public boolean wasFiltered(ConsumerRecord<K, V> record) {
+		@SuppressWarnings("unchecked")
+		FilterResult<K, V> result = (FilterResult<K, V>) LAST.get();
+		return result != null
+				&& result.record.topic().equals(record.topic())
+				&& result.record.partition() == record.partition()
+				&& result.record.offset() == record.offset()
+				&& result.wasFiltered;
+	}
+
 	/*
 	 * Since the container uses the delegate's type to determine which method to call, we
 	 * must implement them all.
@@ -113,4 +131,16 @@ public void onMessage(ConsumerRecord<K, V> data, @Nullable Consumer<?, ?> consumer) {
 		onMessage(data, null, consumer);
 	}
 
+	private static class FilterResult<K, V> {
+
+		final ConsumerRecord<K, V> record;
+
+		final boolean wasFiltered;
+
+		FilterResult(ConsumerRecord<K, V> record, boolean wasFiltered) {
+			this.record = record;
+			this.wasFiltered = wasFiltered;
+		}
+
+	}
 }
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java
new file mode 100644
index 0000000000..f8998d4e7c
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java
@@ -0,0 +1,401 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
+import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
+import org.springframework.kafka.support.Acknowledgment;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the {@code RECORD_FILTERED} acknowledge mode.
+ * <p>
+ * Related to GitHub issue #3562.
+ *
+ * @author Chaedong Im
+ * @see AckModeRecordWithFilteringTest
+ */
+class AckModeRecordFilteredTest {
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException {
+		// Given: a container with RECORD_FILTERED ack mode and a filter that discards even offsets
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> record.offset() % 2 == 0;
+
+		List<String> processedValues = new ArrayList<>();
+		CountDownLatch processedLatch = new CountDownLatch(2);
+
+		MessageListener<String, String> listener = record -> {
+			processedValues.add(record.value());
+			processedLatch.countDown();
+		};
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = List.of(
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), // filtered -> NO COMMIT
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), // processed -> COMMIT position 2
+				new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2"), // filtered -> NO COMMIT
+				new ConsumerRecord<>("test-topic", 0, 3, "key3", "value3")  // processed -> COMMIT position 4
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When: start the container and process the records
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		// Then: only the odd-offset records were processed, and only those were committed
+		assertThat(processedValues).containsExactly("value1", "value3");
+		verify(consumer, times(2)).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException {
+		// Given: all records are filtered
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> true;
+
+		List<String> processedValues = new ArrayList<>();
+		MessageListener<String, String> listener = record -> processedValues.add(record.value());
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = List.of(
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"), // filtered -> NO COMMIT
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"), // filtered -> NO COMMIT
+				new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2")  // filtered -> NO COMMIT
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When
+		container.start();
+		Thread.sleep(1000);
+		container.stop();
+
+		// Then: nothing was processed and nothing was committed
+		assertThat(processedValues).isEmpty();
+		verify(consumer, never()).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testRecordFilteredModeWithMixedPartitions() throws InterruptedException {
+		// Given: mixed partitions with different filtering outcomes
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record ->
+				record.value().contains("skip");
+
+		List<String> processedValues = new ArrayList<>();
+		CountDownLatch processedLatch = new CountDownLatch(3);
+
+		MessageListener<String, String> listener = record -> {
+			processedValues.add(record.value());
+			processedLatch.countDown();
+		};
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp0 = new TopicPartition("test-topic", 0);
+		TopicPartition tp1 = new TopicPartition("test-topic", 1);
+
+		List<ConsumerRecord<String, String>> records = List.of(
+				// Partition 0
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "process1"), // processed -> COMMIT position 1
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "skip1"),    // filtered -> NO COMMIT
+				new ConsumerRecord<>("test-topic", 0, 2, "key2", "process2"), // processed -> COMMIT position 3
+				// Partition 1
+				new ConsumerRecord<>("test-topic", 1, 0, "key3", "skip2"),    // filtered -> NO COMMIT
+				new ConsumerRecord<>("test-topic", 1, 1, "key4", "process3"), // processed -> COMMIT position 2
+				new ConsumerRecord<>("test-topic", 1, 2, "key5", "skip3")     // filtered -> NO COMMIT
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp0, records.subList(0, 3));
+		recordsMap.put(tp1, records.subList(3, 6));
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		// Then: three records reached the listener and exactly three commits occurred
+		assertThat(processedValues).containsExactly("process1", "process2", "process3");
+		verify(consumer, times(3)).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testRecordFilteredModeEfficiencyGains() throws InterruptedException {
+		// Given: only every tenth offset passes the filter
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> record.offset() % 10 != 0;
+
+		List<String> processedValues = new ArrayList<>();
+		CountDownLatch processedLatch = new CountDownLatch(1);
+
+		MessageListener<String, String> listener = record -> {
+			processedValues.add(record.value());
+			processedLatch.countDown();
+		};
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = new ArrayList<>();
+		for (int i = 0; i < 10; i++) {
+			records.add(new ConsumerRecord<>("test-topic", 0, i, "key" + i, "value" + i));
+		}
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		// Then: one record out of ten was processed, so only one commit was issued
+		assertThat(processedValues).hasSize(1);
+		assertThat(processedValues.get(0)).isEqualTo("value0");
+		verify(consumer, times(1)).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException {
+		// Given: a filter that discards nothing
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> false;
+
+		List<String> processedValues = new ArrayList<>();
+		CountDownLatch processedLatch = new CountDownLatch(3);
+
+		MessageListener<String, String> listener = record -> {
+			processedValues.add(record.value());
+			processedLatch.countDown();
+		};
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = List.of(
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"),
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"),
+				new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2")
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		// Then: all records were processed and all were committed, as in RECORD mode
+		assertThat(processedValues).containsExactly("value0", "value1", "value2");
+		verify(consumer, times(3)).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void recordFilteredModeShouldBeThreadIsolated() throws Exception {
+		ConsumerFactory<String, String> cf = mock(ConsumerFactory.class);
+		Consumer<String, String> c0 = mock(Consumer.class);
+		Consumer<String, String> c1 = mock(Consumer.class);
+		given(cf.createConsumer(any(), any(), any(), any())).willReturn(c0, c1);
+
+		ContainerProperties props = new ContainerProperties("iso-topic");
+		props.setGroupId("iso-group");
+		props.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
+
+		CountDownLatch aHasSetState = new CountDownLatch(1);
+		CountDownLatch bHasProcessed = new CountDownLatch(1);
+		RecordFilterStrategy<String, String> filter = rec -> rec.offset() == 0;
+
+		FilteringMessageListenerAdapter<String, String> adapter =
+				new FilteringMessageListenerAdapter<>((MessageListener<String, String>) r -> {
+				}, filter) {
+
+					@Override
+					public void onMessage(ConsumerRecord<String, String> rec, Acknowledgment ack,
+							Consumer<?, ?> consumer) {
+
+						super.onMessage(rec, ack, consumer);
+						if (rec.offset() == 0) {
+							aHasSetState.countDown();
+							try {
+								bHasProcessed.await(500, TimeUnit.MILLISECONDS);
+							}
+							catch (InterruptedException e) {
+								Thread.currentThread().interrupt();
+							}
+						}
+						else if (rec.offset() == 1) {
+							try {
+								aHasSetState.await(200, TimeUnit.MILLISECONDS);
+							}
+							catch (InterruptedException e) {
+								Thread.currentThread().interrupt();
+							}
+							bHasProcessed.countDown();
+						}
+					}
+
+				};
+
+		ConcurrentMessageListenerContainer<String, String> container =
+				new ConcurrentMessageListenerContainer<>(cf, props);
+		container.setConcurrency(2);
+		container.setupMessageListener(adapter);
+
+		TopicPartition tp0 = new TopicPartition("iso-topic", 0);
+		TopicPartition tp1 = new TopicPartition("iso-topic", 1);
+
+		ConsumerRecords<String, String> poll0 = new ConsumerRecords<>(Map.of(
+				tp0, List.of(new ConsumerRecord<>("iso-topic", 0, 0, "k0", "v0"))
+		));
+		ConsumerRecords<String, String> poll1 = new ConsumerRecords<>(Map.of(
+				tp1, List.of(new ConsumerRecord<>("iso-topic", 1, 1, "k1", "v1"))
+		));
+
+		given(c0.poll(any(Duration.class))).willReturn(poll0).willReturn(ConsumerRecords.empty());
+		given(c1.poll(any(Duration.class))).willReturn(poll1).willReturn(ConsumerRecords.empty());
+
+		// When: the two consumer threads process records concurrently
+		// (the adapter's thread-local state must keep their filtering decisions isolated)
+		container.start();
+		Thread.sleep(400);
+		container.stop();
+
+		// Then: c1 commits only its processed record, while c0 (whose record was filtered) commits nothing
+		verify(c1, times(1)).commitSync(any(), any(Duration.class));
+		verify(c0, never()).commitSync(any(), any(Duration.class));
+	}
+
+}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java
new file mode 100644
index 0000000000..a47698f408
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java
@@ -0,0 +1,321 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
+import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests verifying the behavior of the {@code RECORD} acknowledge mode when used with
+ * filtering strategies: every record's offset is committed, whether or not it was filtered.
+ * <p>
+ * Related to GitHub issue #3562.
+ *
+ * @author Chaedong Im
+ * @see AckModeRecordFilteredTest
+ */
+class AckModeRecordWithFilteringTest {
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testCurrentRecordModeCommitsAllRecords() throws InterruptedException {
+		// Given: a container with RECORD ack mode and a filter that discards even offsets
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> record.offset() % 2 == 0;
+
+		List<String> processedValues = new ArrayList<>();
+		CountDownLatch processedLatch = new CountDownLatch(2);
+
+		MessageListener<String, String> listener = record -> {
+			processedValues.add(record.value());
+			processedLatch.countDown();
+		};
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = List.of(
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"),
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1"),
+				new ConsumerRecord<>("test-topic", 0, 2, "key2", "value2"),
+				new ConsumerRecord<>("test-topic", 0, 3, "key3", "value3")
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When: start the container and process the records
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		// Then: only the odd-offset records reached the listener, but RECORD mode
+		// committed every record, including the two that were filtered out
+		assertThat(processedValues).containsExactly("value1", "value3");
+		verify(consumer, times(4)).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testAllRecordsFilteredStillCommits() throws InterruptedException {
+		// Given: a container where all records are filtered
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> true;
+
+		List<String> processedValues = new ArrayList<>();
+		MessageListener<String, String> listener = record -> processedValues.add(record.value());
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = List.of(
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "value0"),
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "value1")
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When: start the container
+		container.start();
+		Thread.sleep(1000);
+		container.stop();
+
+		// Then: no records were processed, yet both offsets were still committed
+		assertThat(processedValues).isEmpty();
+		verify(consumer, times(2)).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testMixedPartitionsWithFiltering() throws InterruptedException {
+		// Given: multiple partitions with different records
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy =
+				record -> record.value().contains("skip");
+
+		List<String> processedValues = new ArrayList<>();
+		CountDownLatch processedLatch = new CountDownLatch(3);
+
+		MessageListener<String, String> listener = record -> {
+			processedValues.add(record.value());
+			processedLatch.countDown();
+		};
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp0 = new TopicPartition("test-topic", 0);
+		TopicPartition tp1 = new TopicPartition("test-topic", 1);
+
+		List<ConsumerRecord<String, String>> records = List.of(
+				// Partition 0
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "process_me"), // processed
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "skip_me"),    // filtered
+				// Partition 1
+				new ConsumerRecord<>("test-topic", 1, 0, "key2", "process_me"), // processed
+				new ConsumerRecord<>("test-topic", 1, 1, "key3", "skip_me"),    // filtered
+				new ConsumerRecord<>("test-topic", 1, 2, "key4", "process_me")  // processed
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp0, records.subList(0, 2));
+		recordsMap.put(tp1, records.subList(2, 5));
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When: start the container
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		// Then: the unfiltered records were processed, but all five offsets were committed
+		assertThat(processedValues).containsExactly("process_me", "process_me", "process_me");
+		verify(consumer, times(5)).commitSync(any(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testCommitLogging() throws InterruptedException {
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
+		containerProperties.setGroupId("test-group");
+		containerProperties.setLogContainerConfig(true);
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> record.offset() == 0;
+
+		CountDownLatch processedLatch = new CountDownLatch(1);
+		MessageListener<String, String> listener = record -> processedLatch.countDown();
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = List.of(
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "filtered"),  // filtered
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "processed")  // processed
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		// When
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		// Then: RECORD mode committed both records, filtered or not
+		verify(consumer, times(2)).commitSync(anyMap(), any(Duration.class));
+	}
+
+	@SuppressWarnings({"unchecked", "deprecation"})
+	@Test
+	void testAckDiscardedParameterBehavior() throws InterruptedException {
+		ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
+		Consumer<String, String> consumer = mock(Consumer.class);
+		given(consumerFactory.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+
+		ContainerProperties containerProperties = new ContainerProperties("test-topic");
+		containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
+		containerProperties.setGroupId("test-group");
+
+		RecordFilterStrategy<String, String> filterStrategy = record -> record.offset() % 2 == 0;
+
+		List<String> processedValues = new ArrayList<>();
+		CountDownLatch processedLatch = new CountDownLatch(1);
+
+		AcknowledgingMessageListener<String, String> listener = (record, ack) -> {
+			processedValues.add(record.value());
+			ack.acknowledge();
+			processedLatch.countDown();
+		};
+
+		FilteringMessageListenerAdapter<String, String> filteringAdapter =
+				new FilteringMessageListenerAdapter<>(listener, filterStrategy, true);
+		containerProperties.setMessageListener(filteringAdapter);
+
+		KafkaMessageListenerContainer<String, String> container =
+				new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+
+		TopicPartition tp = new TopicPartition("test-topic", 0);
+		List<ConsumerRecord<String, String>> records = List.of(
+				new ConsumerRecord<>("test-topic", 0, 0, "key0", "filtered"),  // filtered, but acked (ackDiscarded = true)
+				new ConsumerRecord<>("test-topic", 0, 1, "key1", "processed")  // processed and acked by the listener
+		);
+
+		Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+		recordsMap.put(tp, records);
+		ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
+
+		given(consumer.poll(any(Duration.class)))
+				.willReturn(consumerRecords)
+				.willReturn(ConsumerRecords.empty());
+
+		container.start();
+		assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+		Thread.sleep(500);
+		container.stop();
+
+		assertThat(processedValues).containsExactly("processed");
+	}
+
+}
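Taken together, the two test classes pin down the behavioral contrast. For the shared four-record fixture (offsets 0-3 on one partition, even offsets filtered), the expected per-record commit sequences are as follows (a committed position is the record offset + 1, so committing position 2 also covers the filtered record at offset 0):

// AckMode.RECORD:          commit pos 1, commit pos 2, commit pos 3, commit pos 4   (every record)
// AckMode.RECORD_FILTERED: commit pos 2,               commit pos 4                 (listener-seen records only)

Trailing filtered records, with no processed record after them on the partition, are simply left uncommitted until a later processed record arrives; that is the trade-off RECORD_FILTERED makes in exchange for fewer commits.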