Skip to content

Commit d0b8ead

Browse files
committed
Add startup failure policy to listeners
Previously, when a listener container failed to start, it would only log the exception. This commit introduces `StartupFailurePolicy` that allows listener containers to CONTINUE, STOP, RETRY when an error is encountered on startup. See #445 See #816
1 parent 136d465 commit d0b8ead

File tree

12 files changed

+956
-111
lines changed

12 files changed

+956
-111
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.Objects;
22+
import java.util.Optional;
23+
import java.util.concurrent.CompletableFuture;
2224
import java.util.concurrent.atomic.AtomicBoolean;
2325
import java.util.concurrent.locks.ReentrantLock;
2426

@@ -29,6 +31,8 @@
2931
import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
3032

3133
import org.springframework.core.log.LogAccessor;
34+
import org.springframework.pulsar.PulsarException;
35+
import org.springframework.pulsar.config.StartupFailurePolicy;
3236
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
3337
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
3438
import org.springframework.util.CollectionUtils;
@@ -38,6 +42,7 @@
3842
*
3943
* @param <T> message type.
4044
* @author Christophe Bornet
45+
* @author Chris Bono
4146
*/
4247
public non-sealed class DefaultReactivePulsarMessageListenerContainer<T>
4348
implements ReactivePulsarMessageListenerContainer<T> {
@@ -135,13 +140,56 @@ public void stop() {
135140

136141
private void doStart() {
137142
setRunning(true);
138-
this.pipeline = startPipeline(this.pulsarContainerProperties);
143+
var containerProps = this.getContainerProperties();
144+
try {
145+
this.pipeline = startPipeline(this.pulsarContainerProperties);
146+
}
147+
catch (Exception e) {
148+
this.logger.error(e, () -> "Error starting Reactive pipeline");
149+
this.doStop();
150+
if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.STOP) {
151+
this.logger.info(() -> "Configured to stop on startup failures - exiting");
152+
throw new IllegalStateException("Error starting Reactive pipeline", e);
153+
}
154+
}
155+
// Pipeline started w/o errors - short circuit
156+
if (this.pipeline != null && this.pipeline.isRunning()) {
157+
return;
158+
}
159+
160+
if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
161+
this.logger.info(() -> "Configured to retry on startup failures - retrying");
162+
CompletableFuture.supplyAsync(() -> {
163+
var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate())
164+
.orElseGet(containerProps::getDefaultStartupFailureRetryTemplate);
165+
ReactiveMessagePipeline pipeline = null;
166+
try {
167+
pipeline = retryTemplate
168+
.<ReactiveMessagePipeline, PulsarException>execute((__) -> startPipeline(containerProps));
169+
}
170+
catch (PulsarException ex) {
171+
this.logger.error(ex, () -> "Unable to start Reactive pipeline - retries exhausted");
172+
this.doStop();
173+
}
174+
return pipeline;
175+
}).whenComplete((p, e) -> {
176+
this.pipeline = p;
177+
setRunning(this.pipeline != null ? this.pipeline.isRunning() : false);
178+
if (e != null && !(e instanceof PulsarException)) {
179+
// PulsarException handled in supplyAsync handler above
180+
this.logger.error(e, () -> "Unable to start Reactive pipeline");
181+
this.doStop();
182+
}
183+
});
184+
}
139185
}
140186

141187
public void doStop() {
142188
try {
143189
this.logger.info("Closing Pulsar Reactive pipeline.");
144-
this.pipeline.close();
190+
if (this.pipeline != null) {
191+
this.pipeline.close();
192+
}
145193
}
146194
catch (Exception e) {
147195
this.logger.error(e, () -> "Error closing Pulsar Reactive pipeline.");
@@ -174,6 +222,9 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
174222
customizers.add(this.consumerCustomizer);
175223
}
176224

225+
// NOTE: The following various pipeline builders always set 'pipelineRetrySpec'
226+
// to null as the container controls the retry of the pipeline start. Otherwise
227+
// they do not work well together.
177228
ReactiveMessageConsumer<T> consumer = getReactivePulsarConsumerFactory()
178229
.createConsumer(containerProperties.getSchema(), customizers);
179230
ReactiveMessagePipelineBuilder<T> pipelineBuilder = ApiImplementationFactory
@@ -183,6 +234,7 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
183234
if (messageHandler instanceof ReactivePulsarStreamingHandler<?>) {
184235
pipeline = pipelineBuilder
185236
.streamingMessageHandler(((ReactivePulsarStreamingHandler<T>) messageHandler)::received)
237+
.pipelineRetrySpec(null)
186238
.build();
187239
}
188240
else {
@@ -195,10 +247,10 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
195247
if (containerProperties.isUseKeyOrderedProcessing()) {
196248
concurrentPipelineBuilder.useKeyOrderedProcessing();
197249
}
198-
pipeline = concurrentPipelineBuilder.build();
250+
pipeline = concurrentPipelineBuilder.pipelineRetrySpec(null).build();
199251
}
200252
else {
201-
pipeline = pipelineBuilder.build();
253+
pipeline = pipelineBuilder.pipelineRetrySpec(null).build();
202254
}
203255
}
204256
pipeline.start();

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,16 +18,20 @@
1818

1919
import java.time.Duration;
2020
import java.util.Collection;
21+
import java.util.Objects;
2122
import java.util.regex.Pattern;
2223

2324
import org.apache.pulsar.client.api.Schema;
2425
import org.apache.pulsar.client.api.SubscriptionType;
2526
import org.apache.pulsar.common.schema.SchemaType;
2627

28+
import org.springframework.lang.Nullable;
29+
import org.springframework.pulsar.config.StartupFailurePolicy;
2730
import org.springframework.pulsar.core.DefaultSchemaResolver;
2831
import org.springframework.pulsar.core.DefaultTopicResolver;
2932
import org.springframework.pulsar.core.SchemaResolver;
3033
import org.springframework.pulsar.core.TopicResolver;
34+
import org.springframework.retry.support.RetryTemplate;
3135

3236
/**
3337
* Contains runtime properties for a reactive listener container.
@@ -61,6 +65,16 @@ public class ReactivePulsarContainerProperties<T> {
6165

6266
private boolean useKeyOrderedProcessing = false;
6367

68+
@Nullable
69+
private RetryTemplate startupFailureRetryTemplate;
70+
71+
private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
72+
.maxAttempts(3)
73+
.fixedBackoff(Duration.ofSeconds(10))
74+
.build();
75+
76+
private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;
77+
6478
public ReactivePulsarMessageHandler getMessageHandler() {
6579
return this.messageHandler;
6680
}
@@ -161,4 +175,46 @@ public void setUseKeyOrderedProcessing(boolean useKeyOrderedProcessing) {
161175
this.useKeyOrderedProcessing = useKeyOrderedProcessing;
162176
}
163177

178+
@Nullable
179+
public RetryTemplate getStartupFailureRetryTemplate() {
180+
return this.startupFailureRetryTemplate;
181+
}
182+
183+
/**
184+
* Get the default template to use to retry startup when no custom retry template has
185+
* been specified.
186+
* @return the default retry template that will retry 3 times with a fixed delay of 10
187+
* seconds between each attempt.
188+
* @since 1.2.0
189+
*/
190+
public RetryTemplate getDefaultStartupFailureRetryTemplate() {
191+
return this.defaultStartupFailureRetryTemplate;
192+
}
193+
194+
/**
195+
* Set the template to use to retry startup when an exception occurs during startup.
196+
* @param startupFailureRetryTemplate the retry template to use
197+
* @since 1.2.0
198+
*/
199+
public void setStartupFailureRetryTemplate(RetryTemplate startupFailureRetryTemplate) {
200+
this.startupFailureRetryTemplate = startupFailureRetryTemplate;
201+
if (this.startupFailureRetryTemplate != null) {
202+
setStartupFailurePolicy(StartupFailurePolicy.RETRY);
203+
}
204+
}
205+
206+
public StartupFailurePolicy getStartupFailurePolicy() {
207+
return this.startupFailurePolicy;
208+
}
209+
210+
/**
211+
* The action to take on the container when a failure occurs during startup.
212+
* @param startupFailurePolicy action to take when a failure occurs during startup
213+
* @since 1.2.0
214+
*/
215+
public void setStartupFailurePolicy(StartupFailurePolicy startupFailurePolicy) {
216+
this.startupFailurePolicy = Objects.requireNonNull(startupFailurePolicy,
217+
"startupFailurePolicy must not be null");
218+
}
219+
164220
}

0 commit comments

Comments
 (0)