Skip to content

Commit 4e8b8dd

Browse files
committed
fix: use ThreadLocal to isolate filter result per thread in FilteringMessageListenerAdapter
Signed-off-by: Chaedong Im <[email protected]>
1 parent 507ddbc commit 4e8b8dd

File tree

2 files changed

+84
-7
lines changed

2 files changed

+84
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java

Lines changed: 10 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19-
import java.util.concurrent.atomic.AtomicReference;
20-
2119
import org.apache.kafka.clients.consumer.Consumer;
2220
import org.apache.kafka.clients.consumer.ConsumerRecord;
2321
import org.jspecify.annotations.Nullable;
@@ -43,7 +41,8 @@ public class FilteringMessageListenerAdapter<K, V>
4341

4442
private final boolean ackDiscarded;
4543

46-
private final AtomicReference<@Nullable FilterResult<K, V>> lastResult = new AtomicReference<>();
44+
@SuppressWarnings("rawtypes")
45+
private static final ThreadLocal<FilterResult> LAST = new ThreadLocal<>();
4746

4847
/**
4948
* Create an instance with the supplied strategy and delegate listener.
@@ -74,9 +73,8 @@ public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgm
7473
@Nullable Consumer<?, ?> consumer) {
7574

7675
boolean filtered = filter(consumerRecord);
76+
LAST.set(new FilterResult<>(consumerRecord, filtered));
7777

78-
// Atomically update both the record and its filtered state together
79-
this.lastResult.set(new FilterResult<>(consumerRecord, filtered));
8078

8179
if (!filtered) {
8280
switch (this.delegateType) {
@@ -105,8 +103,13 @@ private void ackFilteredIfNecessary(@Nullable Acknowledgment acknowledgment) {
105103

106104
@Override
107105
public boolean wasFiltered(ConsumerRecord<K, V> record) {
108-
FilterResult<K, V> result = this.lastResult.get();
109-
return result != null && result.record == record && result.wasFiltered;
106+
@SuppressWarnings("unchecked")
107+
FilterResult<K, V> result = (FilterResult<K, V>) LAST.get();
108+
return result != null
109+
&& result.record.topic().equals(record.topic())
110+
&& result.record.partition() == record.partition()
111+
&& result.record.offset() == record.offset()
112+
&& result.wasFiltered;
110113
}
111114

112115
/*

spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java

Lines changed: 74 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.kafka.core.ConsumerFactory;
3434
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
3535
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
36+
import org.springframework.kafka.support.Acknowledgment;
3637

3738
import static org.assertj.core.api.Assertions.assertThat;
3839
import static org.mockito.ArgumentMatchers.any;
@@ -326,4 +327,77 @@ void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedExce
326327
assertThat(processedValues).containsExactly("value0", "value1", "value2");
327328
verify(consumer, times(3)).commitSync(any(), any(Duration.class));
328329
}
330+
331+
@SuppressWarnings({"unchecked", "deprecation"})
332+
@Test
333+
void recordFilteredModeShouldBeThreadIsolated() throws Exception {
334+
ConsumerFactory<String, String> cf = mock(ConsumerFactory.class);
335+
Consumer<String, String> c0 = mock(Consumer.class);
336+
Consumer<String, String> c1 = mock(Consumer.class);
337+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(c0, c1);
338+
339+
ContainerProperties props = new ContainerProperties("iso-topic");
340+
props.setGroupId("iso-group");
341+
props.setAckMode(ContainerProperties.AckMode.RECORD_FILTERED);
342+
343+
CountDownLatch aHasSetState = new CountDownLatch(1);
344+
CountDownLatch bHasProcessed = new CountDownLatch(1);
345+
RecordFilterStrategy<String, String> filter = rec -> rec.offset() == 0;
346+
347+
FilteringMessageListenerAdapter<String, String> adapter =
348+
new FilteringMessageListenerAdapter<>(
349+
(MessageListener<String, String>) r -> {
350+
},
351+
filter
352+
) {
353+
@Override
354+
public void onMessage(ConsumerRecord<String, String> rec,
355+
Acknowledgment ack,
356+
Consumer<?, ?> consumer) {
357+
super.onMessage(rec, ack, consumer);
358+
if (rec.offset() == 0) {
359+
aHasSetState.countDown();
360+
try {
361+
bHasProcessed.await(500, TimeUnit.MILLISECONDS);
362+
} catch (InterruptedException e) {
363+
Thread.currentThread().interrupt();
364+
}
365+
} else if (rec.offset() == 1) {
366+
try {
367+
aHasSetState.await(200, TimeUnit.MILLISECONDS);
368+
} catch (InterruptedException e) {
369+
Thread.currentThread().interrupt();
370+
}
371+
bHasProcessed.countDown();
372+
}
373+
}
374+
};
375+
376+
ConcurrentMessageListenerContainer<String, String> container =
377+
new ConcurrentMessageListenerContainer<>(cf, props);
378+
container.setConcurrency(2);
379+
container.setupMessageListener(adapter);
380+
381+
TopicPartition tp0 = new TopicPartition("iso-topic", 0);
382+
TopicPartition tp1 = new TopicPartition("iso-topic", 1);
383+
384+
ConsumerRecords<String, String> poll0 = new ConsumerRecords<>(Map.of(
385+
tp0, List.of(new ConsumerRecord<>("iso-topic", 0, 0, "k0", "v0"))
386+
));
387+
ConsumerRecords<String, String> poll1 = new ConsumerRecords<>(Map.of(
388+
tp1, List.of(new ConsumerRecord<>("iso-topic", 1, 1, "k1", "v1"))
389+
));
390+
391+
given(c0.poll(any(Duration.class))).willReturn(poll0).willReturn(ConsumerRecords.empty());
392+
given(c1.poll(any(Duration.class))).willReturn(poll1).willReturn(ConsumerRecords.empty());
393+
394+
// when: containers process records concurrently (thread-local isolation should apply)
395+
container.start();
396+
Thread.sleep(400);
397+
container.stop();
398+
399+
// then: consumer c1 commits only its record, while c0 (filtered) does not
400+
verify(c1, times(1)).commitSync(any(), any(Duration.class));
401+
verify(c0, never()).commitSync(any(), any(Duration.class));
402+
}
329403
}

0 commit comments

Comments (0)