Skip to content

Commit ebbd167

Browse files
committed
Merge pull request #29884 from garyrussell
* pr/29884: Add support for Kafka immediateStop property Closes gh-29884
2 parents c5307a8 + d56403b commit ebbd167

File tree

3 files changed

+30
-15
lines changed

3 files changed

+30
-15
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ private void configureContainer(ContainerProperties container) {
207207
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
208208
map.from(properties::isOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata);
209209
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
210+
map.from(properties::isImmediateStop).to(container::setStopImmediate);
210211
map.from(this.transactionManager).to(container::setTransactionManager);
211212
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
212213
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,12 @@ public enum Type {
946946
*/
947947
private boolean missingTopicsFatal = false;
948948

949+
/**
950+
* Whether the container stops after the current record is processed or after all
951+
* the records from the previous poll are processed.
952+
*/
953+
private boolean immediateStop = false;
954+
949955
public Type getType() {
950956
return this.type;
951957
}
@@ -1066,6 +1072,14 @@ public void setMissingTopicsFatal(boolean missingTopicsFatal) {
10661072
this.missingTopicsFatal = missingTopicsFatal;
10671073
}
10681074

1075+
public boolean isImmediateStop() {
1076+
return this.immediateStop;
1077+
}
1078+
1079+
public void setImmediateStop(boolean immediateStop) {
1080+
this.immediateStop = immediateStop;
1081+
}
1082+
10691083
}
10701084

10711085
public static class Ssl {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -382,21 +382,20 @@ void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
382382
@SuppressWarnings("unchecked")
383383
@Test
384384
void listenerProperties() {
385-
this.contextRunner
386-
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
387-
"spring.kafka.template.transaction-id-prefix=txOverride",
388-
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
389-
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
390-
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
391-
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
392-
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
393-
"spring.kafka.listener.idle-partition-event-interval=1s",
394-
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
395-
"spring.kafka.listener.only-log-record-metadata=true",
396-
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
397-
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
398-
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
399-
.run((context) -> {
385+
this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic",
386+
"spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL",
387+
"spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123",
388+
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3",
389+
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5",
390+
"spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s",
391+
"spring.kafka.listener.idle-event-interval=1s",
392+
"spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
393+
"spring.kafka.listener.log-container-config=true",
394+
"spring.kafka.listener.only-log-record-metadata=true",
395+
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
396+
"spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo",
397+
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE",
398+
"spring.kafka.jaas.options.useKeyTab=true").run((context) -> {
400399
DefaultKafkaProducerFactory<?, ?> producerFactory = context
401400
.getBean(DefaultKafkaProducerFactory.class);
402401
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
@@ -423,6 +422,7 @@ void listenerProperties() {
423422
assertThat(containerProperties.isLogContainerConfig()).isTrue();
424423
assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue();
425424
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
425+
assertThat(containerProperties.isStopImmediate()).isTrue();
426426
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
427427
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
428428
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);

0 commit comments

Comments
 (0)