Skip to content

Commit 8280936

Browse files
garyrussellartembilan
authored andcommitted
GH-323: Fix ConsumerSeekAware and BatchListener
Fixes #323 The check for `ConsumerSeekAware` was only performed on the record listener. Conflicts: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
1 parent a4f4480 commit 8280936

File tree

2 files changed

+119
-66
lines changed

2 files changed

+119
-66
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
258258

259259
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
260260

261+
private final GenericMessageListener<?> genericListener;
262+
261263
private final MessageListener<K, V> listener;
262264

263265
private final AcknowledgingMessageListener<K, V> acknowledgingMessageListener;
@@ -349,7 +351,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
349351
}
350352
this.consumer = consumer;
351353
GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
352-
if (this.theListener instanceof BatchAcknowledgingMessageListener) {
354+
this.genericListener = listener; if (this.theListener instanceof BatchAcknowledgingMessageListener) {
353355
this.listener = null;
354356
this.batchListener = null;
355357
this.acknowledgingMessageListener = null;
@@ -455,7 +457,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
455457
KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
456458
}
457459
}
458-
if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
460+
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
459461
seekPartitions(partitions, false);
460462
}
461463
// We will not start the invoker thread if we are in autocommit mode,
@@ -487,10 +489,10 @@ public void seek(String topic, int partition, long offset) {
487489

488490
};
489491
if (idle) {
490-
((ConsumerSeekAware) ListenerConsumer.this.theListener).onIdleContainer(current, callback);
492+
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onIdleContainer(current, callback);
491493
}
492494
else {
493-
((ConsumerSeekAware) ListenerConsumer.this.theListener).onPartitionsAssigned(current, callback);
495+
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsAssigned(current, callback);
494496
}
495497
}
496498

@@ -528,8 +530,8 @@ public boolean isLongLived() {
528530

529531
@Override
530532
public void run() {
531-
if (this.autoCommit && this.theListener instanceof ConsumerSeekAware) {
532-
((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
533+
if (this.genericListener instanceof ConsumerSeekAware) {
534+
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
533535
}
534536
this.count = 0;
535537
this.last = System.currentTimeMillis();
@@ -586,7 +588,7 @@ public void run() {
586588
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
587589
publishIdleContainerEvent(now - lastReceive);
588590
lastAlertAt = now;
589-
if (this.theListener instanceof ConsumerSeekAware) {
591+
if (this.genericListener instanceof ConsumerSeekAware) {
590592
seekPartitions(getAssignedPartitions(), true);
591593
}
592594
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 110 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,13 @@ public class KafkaMessageListenerContainerTests {
115115

116116
private static String topic14 = "testTopic14";
117117

118+
private static String topic15 = "testTopic15";
119+
120+
private static String topic16 = "testTopic16";
121+
118122
@ClassRule
119123
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
120-
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14);
124+
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16);
121125

122126
@Rule
123127
public TestName testName = new TestName();
@@ -378,26 +382,26 @@ public void testSlowConsumerWithException() throws Exception {
378382
final BitSet bitSet = new BitSet(6);
379383
final Map<String, AtomicInteger> faults = new HashMap<>();
380384
RetryingMessageListenerAdapter<Integer, String> adapter = new RetryingMessageListenerAdapter<>(
381-
new MessageListener<Integer, String>() {
382-
383-
@Override
384-
public void onMessage(ConsumerRecord<Integer, String> message) {
385-
logger.info("slow3: " + message);
386-
bitSet.set((int) (message.partition() * 3 + message.offset()));
387-
String key = message.topic() + message.partition() + message.offset();
388-
if (faults.get(key) == null) {
389-
faults.put(key, new AtomicInteger(1));
390-
}
391-
else {
392-
faults.get(key).incrementAndGet();
393-
}
394-
latch.countDown(); // 3 per = 18
395-
if (faults.get(key).get() < 3) { // succeed on the third attempt
396-
throw new FooEx();
385+
new MessageListener<Integer, String>() {
386+
387+
@Override
388+
public void onMessage(ConsumerRecord<Integer, String> message) {
389+
logger.info("slow3: " + message);
390+
bitSet.set((int) (message.partition() * 3 + message.offset()));
391+
String key = message.topic() + message.partition() + message.offset();
392+
if (faults.get(key) == null) {
393+
faults.put(key, new AtomicInteger(1));
394+
}
395+
else {
396+
faults.get(key).incrementAndGet();
397+
}
398+
latch.countDown(); // 3 per = 18
399+
if (faults.get(key).get() < 3) { // succeed on the third attempt
400+
throw new FooEx();
401+
}
397402
}
398-
}
399403

400-
}, buildRetry(), null);
404+
}, buildRetry(), null);
401405
containerProps.setMessageListener(adapter);
402406
containerProps.setPauseAfter(100);
403407

@@ -440,34 +444,34 @@ public void testSlowConsumerWithSlowThenExceptionThenGood() throws Exception {
440444
final BitSet bitSet = new BitSet(6);
441445
final Map<String, AtomicInteger> faults = new HashMap<>();
442446
RetryingMessageListenerAdapter<Integer, String> adapter = new RetryingMessageListenerAdapter<>(
443-
new MessageListener<Integer, String>() {
444-
445-
@Override
446-
public void onMessage(ConsumerRecord<Integer, String> message) {
447-
logger.info("slow4: " + message);
448-
bitSet.set((int) (message.partition() * 4 + message.offset()));
449-
String key = message.topic() + message.partition() + message.offset();
450-
if (faults.get(key) == null) {
451-
faults.put(key, new AtomicInteger(1));
452-
}
453-
else {
454-
faults.get(key).incrementAndGet();
455-
}
456-
latch.countDown(); // 3 per = 18
457-
if (faults.get(key).get() == 1) {
458-
try {
459-
Thread.sleep(1000);
447+
new MessageListener<Integer, String>() {
448+
449+
@Override
450+
public void onMessage(ConsumerRecord<Integer, String> message) {
451+
logger.info("slow4: " + message);
452+
bitSet.set((int) (message.partition() * 4 + message.offset()));
453+
String key = message.topic() + message.partition() + message.offset();
454+
if (faults.get(key) == null) {
455+
faults.put(key, new AtomicInteger(1));
460456
}
461-
catch (InterruptedException e) {
462-
Thread.currentThread().interrupt();
457+
else {
458+
faults.get(key).incrementAndGet();
459+
}
460+
latch.countDown(); // 3 per = 18
461+
if (faults.get(key).get() == 1) {
462+
try {
463+
Thread.sleep(1000);
464+
}
465+
catch (InterruptedException e) {
466+
Thread.currentThread().interrupt();
467+
}
468+
}
469+
if (faults.get(key).get() < 3) { // succeed on the third attempt
470+
throw new FooEx();
463471
}
464472
}
465-
if (faults.get(key).get() < 3) { // succeed on the third attempt
466-
throw new FooEx();
467-
}
468-
}
469473

470-
}, buildRetry(), null);
474+
}, buildRetry(), null);
471475
containerProps.setMessageListener(adapter);
472476
containerProps.setPauseAfter(100);
473477

@@ -856,15 +860,62 @@ public void testSeekAutoCommit() throws Exception {
856860

857861
@Test
858862
public void testSeekAutoCommitDefault() throws Exception {
859-
Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "true", embeddedKafka);
863+
Map<String, Object> props = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka);
860864
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test true by default
861-
testSeekGuts(props, topic12, true);
865+
testSeekGuts(props, topic15, true);
866+
}
867+
868+
@Test
869+
public void testSeekBatch() throws Exception {
870+
logger.info("Start seek batch seek");
871+
Map<String, Object> props = KafkaTestUtils.consumerProps("test16", "true", embeddedKafka);
872+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
873+
ContainerProperties containerProps = new ContainerProperties(topic16);
874+
final CountDownLatch registerLatch = new CountDownLatch(1);
875+
final CountDownLatch assignedLatch = new CountDownLatch(1);
876+
final CountDownLatch idleLatch = new CountDownLatch(1);
877+
class Listener implements BatchMessageListener<Integer, String>, ConsumerSeekAware {
878+
879+
@Override
880+
public void onMessage(List<ConsumerRecord<Integer, String>> data) {
881+
// empty
882+
}
883+
884+
@Override
885+
public void registerSeekCallback(ConsumerSeekCallback callback) {
886+
registerLatch.countDown();
887+
}
888+
889+
@Override
890+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
891+
assignedLatch.countDown();
892+
}
893+
894+
@Override
895+
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
896+
idleLatch.countDown();
897+
}
898+
899+
}
900+
Listener messageListener = new Listener();
901+
containerProps.setMessageListener(messageListener);
902+
containerProps.setSyncCommits(true);
903+
containerProps.setAckOnError(false);
904+
containerProps.setIdleEventInterval(10L);
905+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
906+
containerProps);
907+
container.setBeanName("testBatchSeek");
908+
container.start();
909+
assertThat(registerLatch.await(10, TimeUnit.SECONDS)).isTrue();
910+
assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
911+
assertThat(idleLatch.await(10, TimeUnit.SECONDS)).isTrue();
912+
container.stop();
862913
}
863914

864915
private void testSeekGuts(Map<String, Object> props, String topic, boolean autoCommit) throws Exception {
865916
logger.info("Start seek " + topic);
866917
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
867-
ContainerProperties containerProps = new ContainerProperties(topic11);
918+
ContainerProperties containerProps = new ContainerProperties(topic);
868919
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(6));
869920
final AtomicBoolean seekInitial = new AtomicBoolean();
870921
final CountDownLatch idleLatch = new CountDownLatch(1);
@@ -881,8 +932,8 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
881932
messageThread = Thread.currentThread();
882933
latch.get().countDown();
883934
if (latch.get().getCount() == 2 && !seekInitial.get()) {
884-
callback.seek(topic11, 0, 1);
885-
callback.seek(topic11, 1, 1);
935+
callback.seek(topic, 0, 1);
936+
callback.seek(topic, 1, 1);
886937
}
887938
}
888939

@@ -922,16 +973,16 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
922973

923974
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
924975
containerProps);
925-
container.setBeanName("testRecordAcks");
976+
container.setBeanName("testSeek" + topic);
926977
container.start();
927978
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.autoCommit", Boolean.class))
928-
.isEqualTo(autoCommit);
979+
.isEqualTo(autoCommit);
929980
Consumer<?, ?> consumer = spyOnConsumer(container);
930981
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
931982
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
932983
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
933984
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
934-
template.setDefaultTopic(topic11);
985+
template.setDefaultTopic(topic);
935986
template.sendDefault(0, 0, "foo");
936987
template.sendDefault(1, 0, "bar");
937988
template.sendDefault(0, 0, "baz");
@@ -1266,14 +1317,14 @@ private void stubSetRunning(final CountDownLatch listenerConsumerAvailableLatch,
12661317
KafkaMessageListenerContainer<Integer, String> resettingContainer) {
12671318
willAnswer(invocation -> {
12681319
listenerConsumerAvailableLatch.countDown();
1269-
try {
1270-
assertThat(listenerConsumerStartLatch.await(10, TimeUnit.SECONDS)).isTrue();
1271-
}
1272-
catch (InterruptedException e) {
1273-
Thread.currentThread().interrupt();
1274-
throw new IllegalStateException(e);
1275-
}
1276-
return invocation.callRealMethod();
1320+
try {
1321+
assertThat(listenerConsumerStartLatch.await(10, TimeUnit.SECONDS)).isTrue();
1322+
}
1323+
catch (InterruptedException e) {
1324+
Thread.currentThread().interrupt();
1325+
throw new IllegalStateException(e);
1326+
}
1327+
return invocation.callRealMethod();
12771328
}).given(resettingContainer).setRunning(true);
12781329
}
12791330

0 commit comments

Comments
 (0)