Skip to content

Commit 6856b21

Browse files
garyrussellartembilan
authored andcommitted
GH-335: Fix Class Cast with Batch Filter
Fixes #335 Failed to check `batchListener` field when casting listener to apply filter. Conflicts: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java Resolved.
1 parent 2f23900 commit 6856b21

File tree

2 files changed

+24
-7
lines changed

2 files changed

+24
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,13 @@
3030
import org.springframework.beans.factory.config.BeanExpressionResolver;
3131
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
3232
import org.springframework.kafka.listener.AcknowledgingMessageListener;
33+
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
34+
import org.springframework.kafka.listener.BatchMessageListener;
3335
import org.springframework.kafka.listener.MessageListener;
3436
import org.springframework.kafka.listener.MessageListenerContainer;
3537
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
38+
import org.springframework.kafka.listener.adapter.FilteringBatchAcknowledgingMessageListenerAdapter;
39+
import org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter;
3640
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
3741
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
3842
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -313,10 +317,19 @@ private void setupMessageListener(MessageListenerContainer container, MessageCon
313317
(AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
314318
this.ackDiscarded);
315319
}
316-
else {
320+
else if (messageListener instanceof MessageListener) {
317321
messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
318322
this.recordFilterStrategy);
319323
}
324+
else if (messageListener instanceof BatchAcknowledgingMessageListener) {
325+
messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
326+
(BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
327+
this.ackDiscarded);
328+
}
329+
else if (messageListener instanceof BatchMessageListener) {
330+
messageListener = new FilteringBatchMessageListenerAdapter<>(
331+
(BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
332+
}
320333
}
321334
container.setupMessageListener(messageListener);
322335
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,11 @@ public class EnableKafkaIntegrationTests {
120120
public KafkaListenerEndpointRegistry registry;
121121

122122
@Autowired
123-
private RecordFilterImpl recordFilter;
123+
private RecordPassAllFilter recordFilter;
124124

125125
@Test
126126
public void testSimple() throws Exception {
127+
this.recordFilter.called = false;
127128
template.send("annotated1", 0, "foo");
128129
template.flush();
129130
assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
@@ -251,6 +252,7 @@ public void testEmpty() throws Exception {
251252
@Test
252253
@SuppressWarnings("unchecked")
253254
public void testBatch() throws Exception {
255+
this.recordFilter.called = false;
254256
ConcurrentMessageListenerContainer<?, ?> container =
255257
(ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer("list1");
256258
Consumer<?, ?> consumer =
@@ -277,6 +279,7 @@ public void testBatch() throws Exception {
277279
List<?> list = (List<?>) this.listener.payload;
278280
assertThat(list.size()).isGreaterThan(0);
279281
assertThat(list.get(0)).isInstanceOf(String.class);
282+
assertThat(this.recordFilter.called).isTrue();
280283

281284
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
282285
}
@@ -388,13 +391,13 @@ public PlatformTransactionManager transactionManager() {
388391
}
389392

390393
@Bean
391-
public RecordFilterImpl recordFilter() {
392-
return new RecordFilterImpl();
394+
public RecordPassAllFilter recordFilter() {
395+
return new RecordPassAllFilter();
393396
}
394397

395398
@Bean
396-
public RecordFilterImpl manualFilter() {
397-
return new RecordFilterImpl();
399+
public RecordPassAllFilter manualFilter() {
400+
return new RecordPassAllFilter();
398401
}
399402

400403
@Bean
@@ -412,6 +415,7 @@ public KafkaListenerContainerFactory<?> batchFactory() {
412415
new ConcurrentKafkaListenerContainerFactory<>();
413416
factory.setConsumerFactory(consumerFactory());
414417
factory.setBatchListener(true);
418+
factory.setRecordFilterStrategy(recordFilter());
415419
return factory;
416420
}
417421

@@ -821,7 +825,7 @@ public void setBar(String bar) {
821825

822826
}
823827

824-
public static class RecordFilterImpl implements RecordFilterStrategy<Integer, String> {
828+
public static class RecordPassAllFilter implements RecordFilterStrategy<Integer, String> {
825829

826830
private boolean called;
827831

0 commit comments

Comments
 (0)