Skip to content

Commit b02ecd6

Browse files
tbiniasartembilan
authored andcommitted
GH-403 fix ConsumerSeekAware with acknowledgingML
Resolves #403 remove test duplicates and use dedicated topic for testSeekAck * Remove in the `ListenerConsumer` redundant `kafkaDataListener` property in favor of `theListener` * Some simple polishing
1 parent e9aa11a commit b02ecd6

File tree

2 files changed

+37
-20
lines changed

2 files changed

+37
-20
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
* @author Martin Dam
7676
* @author Artem Bilan
7777
* @author Loic Talhouarne
78+
* @author Thorsten Binias
7879
*/
7980
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
8081

@@ -258,8 +259,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
258259

259260
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
260261

261-
private final GenericMessageListener<?> genericListener;
262-
263262
private final MessageListener<K, V> listener;
264263

265264
private final AcknowledgingMessageListener<K, V> acknowledgingMessageListener;
@@ -319,13 +318,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
319318
ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
320319
Assert.state(!this.isAnyManualAck || !this.autoCommit,
321320
"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
322-
@SuppressWarnings("deprecation")
323-
final Consumer<K, V> consumer =
321+
@SuppressWarnings("deprecation") final Consumer<K, V> consumer =
324322
KafkaMessageListenerContainer.this.consumerFactory instanceof
325-
org.springframework.kafka.core.ClientIdSuffixAware
323+
org.springframework.kafka.core.ClientIdSuffixAware
326324
? ((org.springframework.kafka.core.ClientIdSuffixAware<K, V>) KafkaMessageListenerContainer
327-
.this.consumerFactory)
328-
.createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
325+
.this.consumerFactory)
326+
.createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
329327
: KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
330328

331329
this.theListener = listener == null ? ackListener : listener;
@@ -351,7 +349,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
351349
}
352350
this.consumer = consumer;
353351
GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
354-
this.genericListener = listener; if (this.theListener instanceof BatchAcknowledgingMessageListener) {
352+
if (this.theListener instanceof BatchAcknowledgingMessageListener) {
355353
this.listener = null;
356354
this.batchListener = null;
357355
this.acknowledgingMessageListener = null;
@@ -457,7 +455,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
457455
KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
458456
}
459457
}
460-
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
458+
if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
461459
seekPartitions(partitions, false);
462460
}
463461
// We will not start the invoker thread if we are in autocommit mode,
@@ -489,10 +487,10 @@ public void seek(String topic, int partition, long offset) {
489487

490488
};
491489
if (idle) {
492-
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onIdleContainer(current, callback);
490+
((ConsumerSeekAware) ListenerConsumer.this.theListener).onIdleContainer(current, callback);
493491
}
494492
else {
495-
((ConsumerSeekAware) ListenerConsumer.this.genericListener).onPartitionsAssigned(current, callback);
493+
((ConsumerSeekAware) ListenerConsumer.this.theListener).onPartitionsAssigned(current, callback);
496494
}
497495
}
498496

@@ -530,8 +528,8 @@ public boolean isLongLived() {
530528

531529
@Override
532530
public void run() {
533-
if (this.genericListener instanceof ConsumerSeekAware) {
534-
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
531+
if (this.theListener instanceof ConsumerSeekAware) {
532+
((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
535533
}
536534
this.count = 0;
537535
this.last = System.currentTimeMillis();
@@ -588,7 +586,7 @@ public void run() {
588586
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
589587
publishIdleContainerEvent(now - lastReceive);
590588
lastAlertAt = now;
591-
if (this.genericListener instanceof ConsumerSeekAware) {
589+
if (this.theListener instanceof ConsumerSeekAware) {
592590
seekPartitions(getAssignedPartitions(), true);
593591
}
594592
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
* @author Martin Dam
8383
* @author Artem Bilan
8484
* @author Loic Talhouarne
85+
* @author Thorsten Binias
8586
*/
8687
public class KafkaMessageListenerContainerTests {
8788

@@ -119,9 +120,11 @@ public class KafkaMessageListenerContainerTests {
119120

120121
private static String topic16 = "testTopic16";
121122

123+
private static String topic17 = "testTopic17";
124+
122125
@ClassRule
123126
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
124-
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16);
127+
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16, topic17);
125128

126129
@Rule
127130
public TestName testName = new TestName();
@@ -849,20 +852,20 @@ public void testBatchListenerErrors() throws Exception {
849852
@Test
850853
public void testSeek() throws Exception {
851854
Map<String, Object> props = KafkaTestUtils.consumerProps("test11", "false", embeddedKafka);
852-
testSeekGuts(props, topic11, false);
855+
testSeekGuts(props, topic11, false, false);
853856
}
854857

855858
@Test
856859
public void testSeekAutoCommit() throws Exception {
857860
Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "true", embeddedKafka);
858-
testSeekGuts(props, topic12, true);
861+
testSeekGuts(props, topic12, true, false);
859862
}
860863

861864
@Test
862865
public void testSeekAutoCommitDefault() throws Exception {
863866
Map<String, Object> props = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka);
864867
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test true by default
865-
testSeekGuts(props, topic15, true);
868+
testSeekGuts(props, topic15, true, false);
866869
}
867870

868871
@Test
@@ -912,7 +915,13 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
912915
container.stop();
913916
}
914917

915-
private void testSeekGuts(Map<String, Object> props, String topic, boolean autoCommit) throws Exception {
918+
@Test
919+
public void testSeekAck() throws Exception {
920+
Map<String, Object> props = KafkaTestUtils.consumerProps("test17", "false", embeddedKafka);
921+
testSeekGuts(props, topic17, false, true);
922+
}
923+
924+
private void testSeekGuts(Map<String, Object> props, String topic, boolean autoCommit, boolean acknowledging) throws Exception {
916925
logger.info("Start seek " + topic);
917926
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
918927
ContainerProperties containerProps = new ContainerProperties(topic);
@@ -964,7 +973,17 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
964973
}
965974

966975
}
967-
Listener messageListener = new Listener();
976+
977+
class AcknowledgingListener extends Listener implements AcknowledgingMessageListener<Integer, String> {
978+
979+
@Override
980+
public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment acknowledgment) {
981+
super.onMessage(data);
982+
}
983+
984+
}
985+
986+
Listener messageListener = acknowledging ? new AcknowledgingListener() : new Listener();
968987
containerProps.setMessageListener(messageListener);
969988
containerProps.setSyncCommits(true);
970989
containerProps.setAckMode(AckMode.RECORD);

0 commit comments

Comments
 (0)