Skip to content

Commit c32884b

Browse files
committed
Add Pulsar config props for startup policy
This commit adds properties to configure the startup behavior for the Pulsar message containers that back the `@PulsarListener`, `@ReactivePulsarListener`, and `@PulsarReader`.
1 parent 6cd6f75 commit c32884b

File tree

6 files changed

+128
-0
lines changed

6 files changed

+128
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,11 @@ public static class Listener {
822822
*/
823823
private boolean observationEnabled;
824824

825+
/**
826+
* Startup settings.
827+
*/
828+
private final Startup startup = new Startup();
829+
825830
public SchemaType getSchemaType() {
826831
return this.schemaType;
827832
}
@@ -846,6 +851,9 @@ public void setObservationEnabled(boolean observationEnabled) {
846851
this.observationEnabled = observationEnabled;
847852
}
848853

854+
public Startup getStartup() {
855+
return this.startup;
856+
}
849857
}
850858

851859
public static class Reader {
@@ -876,6 +884,11 @@ public static class Reader {
876884
*/
877885
private boolean readCompacted;
878886

887+
/**
888+
* Startup settings.
889+
*/
890+
private final Startup startup = new Startup();
891+
879892
public String getName() {
880893
return this.name;
881894
}
@@ -916,6 +929,50 @@ public void setReadCompacted(boolean readCompacted) {
916929
this.readCompacted = readCompacted;
917930
}
918931

932+
public Startup getStartup() {
933+
return this.startup;
934+
}
935+
}
936+
937+
public static class Startup {
938+
939+
/**
940+
* The max time to wait for the container to start.
941+
*/
942+
private Duration timeout;
943+
944+
/**
945+
* The action to take when the container fails to start.
946+
*/
947+
private FailurePolicy onFailure;
948+
949+
public Duration getTimeout() {
950+
return this.timeout;
951+
}
952+
953+
public void setTimeout(Duration timeout) {
954+
this.timeout = timeout;
955+
}
956+
957+
public FailurePolicy getOnFailure() {
958+
return this.onFailure;
959+
}
960+
961+
public void setOnFailure(FailurePolicy onFailure) {
962+
this.onFailure = onFailure;
963+
}
964+
}
965+
966+
public enum FailurePolicy {
967+
968+
/** Stop the container and throw exception. */
969+
STOP,
970+
971+
/** Stop the container but do not throw exception. */
972+
CONTINUE,
973+
974+
/** Retry startup asynchronously. */
975+
RETRY;
919976
}
920977

921978
public static class Template {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
import org.apache.pulsar.client.api.ServiceUrlProvider;
3838
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;
3939

40+
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
4041
import org.springframework.boot.context.properties.PropertyMapper;
4142
import org.springframework.boot.json.JsonWriter;
43+
import org.springframework.pulsar.config.StartupFailurePolicy;
4244
import org.springframework.pulsar.core.PulsarTemplate;
4345
import org.springframework.pulsar.listener.PulsarContainerProperties;
4446
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
@@ -198,6 +200,17 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
198200
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
199201
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
200202
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
203+
customizeListenerStartupProperties(containerProperties);
204+
}
205+
206+
private void customizeListenerStartupProperties(PulsarContainerProperties containerProperties) {
207+
PulsarProperties.Startup properties = this.properties.getListener().getStartup();
208+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
209+
map.from(properties::getTimeout).to(containerProperties::setConsumerStartTimeout);
210+
map.from(properties::getOnFailure)
211+
.as(FailurePolicy::name)
212+
.as(StartupFailurePolicy::valueOf)
213+
.to(containerProperties::setStartupFailurePolicy);
201214
}
202215

203216
<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
@@ -214,6 +227,17 @@ void customizeReaderContainerProperties(PulsarReaderContainerProperties readerCo
214227
PulsarProperties.Reader properties = this.properties.getReader();
215228
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
216229
map.from(properties::getTopics).to(readerContainerProperties::setTopics);
230+
customizeReaderStartupProperties(readerContainerProperties);
231+
}
232+
233+
private void customizeReaderStartupProperties(PulsarReaderContainerProperties readerContainerProperties) {
234+
PulsarProperties.Startup properties = this.properties.getReader().getStartup();
235+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
236+
map.from(properties::getTimeout).to(readerContainerProperties::setReaderStartTimeout);
237+
map.from(properties::getOnFailure)
238+
.as(FailurePolicy::name)
239+
.as(StartupFailurePolicy::valueOf)
240+
.to(readerContainerProperties::setStartupFailurePolicy);
217241
}
218242

219243
private Consumer<Duration> timeoutProperty(BiConsumer<Integer, TimeUnit> setter) {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder;
2323
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
2424

25+
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
2526
import org.springframework.boot.context.properties.PropertyMapper;
27+
import org.springframework.pulsar.config.StartupFailurePolicy;
28+
import org.springframework.pulsar.listener.PulsarContainerProperties;
2629
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;
2730

2831
/**
@@ -96,6 +99,16 @@ private void customizePulsarContainerListenerProperties(ReactivePulsarContainerP
9699
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
97100
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
98101
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
102+
customizeListenerStartupProperties(containerProperties);
103+
}
104+
105+
private void customizeListenerStartupProperties(ReactivePulsarContainerProperties<?> containerProperties) {
106+
PulsarProperties.Startup properties = this.properties.getListener().getStartup();
107+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
108+
map.from(properties::getOnFailure)
109+
.as(FailurePolicy::name)
110+
.as(StartupFailurePolicy::valueOf)
111+
.to(containerProperties::setStartupFailurePolicy);
99112
}
100113

101114
void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder<?> builder) {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@
4141

4242
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
4343
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
44+
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
45+
import org.springframework.pulsar.config.StartupFailurePolicy;
4446
import org.springframework.pulsar.core.PulsarProducerFactory;
4547
import org.springframework.pulsar.core.PulsarTemplate;
4648
import org.springframework.pulsar.listener.PulsarContainerProperties;
49+
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
4750

4851
import static org.assertj.core.api.Assertions.assertThat;
4952
import static org.mockito.ArgumentMatchers.any;
@@ -265,6 +268,8 @@ void customizeContainerProperties() {
265268
properties.getListener().setSchemaType(SchemaType.AVRO);
266269
properties.getListener().setConcurrency(10);
267270
properties.getListener().setObservationEnabled(true);
271+
properties.getListener().getStartup().setOnFailure(FailurePolicy.RETRY);
272+
properties.getListener().getStartup().setTimeout(Duration.ofSeconds(25));
268273
properties.getTransaction().setEnabled(true);
269274
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
270275
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
@@ -274,6 +279,8 @@ void customizeContainerProperties() {
274279
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
275280
assertThat(containerProperties.isObservationEnabled()).isTrue();
276281
assertThat(containerProperties.transactions().isEnabled()).isTrue();
282+
assertThat(containerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.RETRY);
283+
assertThat(containerProperties.getConsumerStartTimeout()).isEqualTo(Duration.ofSeconds(25));
277284
}
278285

279286
@Test
@@ -295,4 +302,18 @@ void customizeReaderBuilder() {
295302
then(builder).should().readCompacted(true);
296303
}
297304

305+
@Test
306+
@SuppressWarnings("unchecked")
307+
void customizeReaderContainerProperties() {
308+
PulsarProperties properties = new PulsarProperties();
309+
List<String> topics = List.of("mytopic");
310+
properties.getReader().setTopics(topics);
311+
properties.getReader().getStartup().setOnFailure(FailurePolicy.CONTINUE);
312+
properties.getReader().getStartup().setTimeout(Duration.ofSeconds(25));
313+
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
314+
new PulsarPropertiesMapper(properties).customizeReaderContainerProperties(readerContainerProperties);
315+
assertThat(readerContainerProperties.getTopics()).isEqualTo(topics);
316+
assertThat(readerContainerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.CONTINUE);
317+
assertThat(readerContainerProperties.getReaderStartTimeout()).isEqualTo(Duration.ofSeconds(25));
318+
}
298319
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Defaults.TypeMapping;
3939
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover;
4040
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
41+
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
4142
import org.springframework.boot.context.properties.bind.BindException;
4243
import org.springframework.boot.context.properties.bind.Binder;
4344
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
@@ -395,10 +396,14 @@ void bind() {
395396
map.put("spring.pulsar.listener.schema-type", "avro");
396397
map.put("spring.pulsar.listener.concurrency", "10");
397398
map.put("spring.pulsar.listener.observation-enabled", "true");
399+
map.put("spring.pulsar.listener.startup.on-failure", "retry");
400+
map.put("spring.pulsar.listener.startup.timeout", "2m");
398401
PulsarProperties.Listener properties = bindProperties(map).getListener();
399402
assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO);
400403
assertThat(properties.getConcurrency()).isEqualTo(10);
401404
assertThat(properties.isObservationEnabled()).isTrue();
405+
assertThat(properties.getStartup().getOnFailure()).isEqualTo(FailurePolicy.RETRY);
406+
assertThat(properties.getStartup().getTimeout()).isEqualTo(Duration.ofSeconds(13));
402407
}
403408

404409
}
@@ -414,12 +419,16 @@ void bind() {
414419
map.put("spring.pulsar.reader.subscription-name", "my-subscription");
415420
map.put("spring.pulsar.reader.subscription-role-prefix", "sub-role");
416421
map.put("spring.pulsar.reader.read-compacted", "true");
422+
map.put("spring.pulsar.reader.startup.on-failure", "continue");
423+
map.put("spring.pulsar.reader.startup.timeout", "2m");
417424
PulsarProperties.Reader properties = bindProperties(map).getReader();
418425
assertThat(properties.getName()).isEqualTo("my-reader");
419426
assertThat(properties.getTopics()).containsExactly("my-topic");
420427
assertThat(properties.getSubscriptionName()).isEqualTo("my-subscription");
421428
assertThat(properties.getSubscriptionRolePrefix()).isEqualTo("sub-role");
422429
assertThat(properties.isReadCompacted()).isTrue();
430+
assertThat(properties.getStartup().getOnFailure()).isEqualTo(FailurePolicy.CONTINUE);
431+
assertThat(properties.getStartup().getTimeout()).isEqualTo(Duration.ofMinutes(2));
423432
}
424433

425434
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737

3838
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
3939
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer.Subscription;
40+
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
41+
import org.springframework.pulsar.config.StartupFailurePolicy;
4042
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;
4143

4244
import static org.assertj.core.api.Assertions.assertThat;
@@ -123,12 +125,14 @@ void customizeContainerProperties() {
123125
properties.getConsumer().getSubscription().setName("my-subscription");
124126
properties.getListener().setSchemaType(SchemaType.AVRO);
125127
properties.getListener().setConcurrency(10);
128+
properties.getListener().getStartup().setOnFailure(FailurePolicy.CONTINUE);
126129
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
127130
new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties);
128131
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
129132
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
130133
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
131134
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
135+
assertThat(containerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.CONTINUE);
132136
}
133137

134138
@Test

0 commit comments

Comments
 (0)