Skip to content

Commit d80ade5

Browse files
Bastien Boucletgaryrussell
authored andcommitted
GH-1111 Propagate the CMLC paused state on start
The paused state from ConcurrentMessageListenerContainer was not propagated to the newly instanciated KafkaMessageListenerContainers when calling start().
1 parent 9d8adf2 commit d80ade5

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ protected void doStart() {
167167
});
168168
publishContainerStoppedEvent();
169169
});
170+
if (isPaused()) {
171+
container.pause();
172+
}
170173
container.start();
171174
this.containers.add(container);
172175
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,11 @@ public class ConcurrentMessageListenerContainerTests {
9595

9696
private static String topic11 = "testTopic11";
9797

98+
private static String topic12 = "testTopic12";
99+
98100
@ClassRule
99101
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic1, topic2, topic4, topic5,
100-
topic6, topic7, topic8, topic9, topic10, topic11);
102+
topic6, topic7, topic8, topic9, topic10, topic11, topic12);
101103

102104
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
103105

@@ -410,6 +412,44 @@ public void testManualCommitSyncExisting() throws Exception {
410412
this.logger.info("Stop MANUAL_IMMEDIATE with Existing");
411413
}
412414

415+
@Test
416+
public void testPausedStart() throws Exception {
417+
this.logger.info("Start paused start");
418+
Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "false", embeddedKafka);
419+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
420+
ContainerProperties containerProps = new ContainerProperties(topic12);
421+
422+
final CountDownLatch latch = new CountDownLatch(2);
423+
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
424+
ConcurrentMessageListenerContainerTests.this.logger.info("paused start: " + message);
425+
latch.countDown();
426+
});
427+
428+
ConcurrentMessageListenerContainer<Integer, String> container =
429+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
430+
container.setConcurrency(2);
431+
container.setBeanName("testBatch");
432+
container.pause();
433+
container.start();
434+
435+
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
436+
437+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
438+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
439+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
440+
template.setDefaultTopic(topic12);
441+
template.sendDefault(0, "foo");
442+
template.sendDefault(2, "bar");
443+
template.flush();
444+
assertThat(latch.await(100, TimeUnit.MILLISECONDS)).isFalse();
445+
446+
container.resume();
447+
448+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
449+
container.stop();
450+
this.logger.info("Stop paused start");
451+
}
452+
413453
@Test
414454
@SuppressWarnings("unchecked")
415455
public void testConcurrencyWithPartitions() {

0 commit comments

Comments
 (0)