Commit a518e11

Add startup failure policy to listeners (#824)
Previously, when a listener container failed to start, it only logged the exception. This commit introduces a `StartupFailurePolicy` that lets listener containers `CONTINUE`, `STOP`, or `RETRY` when an error is encountered on startup. See #445. See #816.
1 parent c254d6b commit a518e11

17 files changed: +1242 -106 lines changed
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+= Handling Startup Failures
+include::../attributes/attributes-variables.adoc[]
+
+Message listener containers are started when the application context is refreshed.
+By default, any failures encountered during startup are re-thrown and the application will fail to start.
+You can adjust this behavior with the `StartupFailurePolicy` on the corresponding container properties.
+
+The available options are:
+
+- `Stop` (default) - log and re-throw the exception, effectively stopping the application
+- `Continue` - log the exception, leave the container in a non-running state, but do not stop the application
+- `Retry` - log the exception, retry to start the container asynchronously, but do not stop the application.
+
+The default retry behavior is to retry 3 times with a 10-second delay between
+each attempt.
+However, a custom retry template can be specified on the corresponding container properties.
+If the container fails to restart after the retries are exhausted, it is left in a non-running state.
+
+[discrete]
+== Configuration
+
+[discrete]
+=== With Spring Boot
+**TODO**
+
+[discrete]
+=== Without Spring Boot
+**TODO**
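
Both configuration subsections in the file above are still marked TODO. As a rough sketch only, the setters this commit adds to `ReactivePulsarContainerProperties` suggest that configuration without Spring Boot could look something like the following. The class name `StartupFailureConfigSketch`, the `String` message type, the no-argument constructor call, and the retry values (5 attempts, 2 seconds apart) are assumptions made for illustration. How the properties are handed to a listener container is not shown.

```java
import java.time.Duration;

import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical holder class; only the setters used below come from this commit.
class StartupFailureConfigSketch {

	// Log the startup failure and leave the container stopped, but keep the application up.
	static ReactivePulsarContainerProperties<String> continueOnFailure() {
		// Assumption: the properties class can be created with a no-arg constructor.
		var props = new ReactivePulsarContainerProperties<String>();
		props.setStartupFailurePolicy(StartupFailurePolicy.CONTINUE);
		return props;
	}

	// Retry startup asynchronously with a custom template instead of the default one.
	static ReactivePulsarContainerProperties<String> retryWithCustomTemplate() {
		var props = new ReactivePulsarContainerProperties<String>();
		// Illustrative values: 5 attempts, 2 seconds apart.
		props.setStartupFailureRetryTemplate(RetryTemplate.builder()
			.maxAttempts(5)
			.fixedBackoff(Duration.ofSeconds(2))
			.build());
		return props;
	}
}
```

Per the setter added in this commit, supplying a non-null retry template also switches the policy to `RETRY`, so the second method does not set the policy explicitly.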

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc

Lines changed: 11 additions & 1 deletion
@@ -951,8 +951,11 @@ The framework detects the provided bean through the `PulsarListener` and applies
 
 If you have multiple `PulsarListener` methods, and each of them have different customization rules, you should create multiple customizer beans and attach the proper customizers on each `PulsarListener`.
 
+[[message-listener-lifecycle]]
+== Message Listener Container Lifecycle
 
-== Pausing and Resuming Message Listener Containers
+[[message-listener-pause-resume]]
+=== Pausing and Resuming
 
 There are situations in which an application might want to pause message consumption temporarily and then resume later.
 Spring for Apache Pulsar provides the ability to pause and resume the underlying message listener containers.
@@ -973,6 +976,10 @@ void someMethod() {
 
 TIP: The id parameter passed to `getListenerContainer` is the container id - which will be the value of the `@PulsarListener` id attribute when pausing/resuming a `@PulsarListener`.
 
+[[message-listener-startup-failure]]
+include::../message-listener-startup-failure.adoc[leveloffset=+2]
+
+
 [[imperative-pulsar-reader]]
 == Pulsar Reader Support
 The framework provides support for using {apache-pulsar-docs}/concepts-clients/#reader-interface[Pulsar Reader] via the `PulsarReaderFactory`.
@@ -1023,3 +1030,6 @@ public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
 ----
 
 TIP: If your application only has a single `@PulsarReader` and a single `PulsarReaderReaderBuilderCustomizer` bean registered then the customizer will be automatically applied.
+
+=== Handling Startup Failures
+The same xref:#message-listener-startup-failure[startup failure facilities] available to message listener containers are available for reader containers.

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar/reactive-message-consumption.adoc

Lines changed: 3 additions & 0 deletions
@@ -206,6 +206,9 @@ The "listener" aspect is provided by the `ReactivePulsarMessageHandler` of which
 
 NOTE: If topic information is not specified when using the listener containers directly, the same xref:reference/topic-resolution.adoc#topic-resolution-process[topic resolution process] used by the `ReactivePulsarListener` is used with the one exception that the "Message type default" step is **omitted**.
 
+[[message-listener-startup-failure]]
+include::../message-listener-startup-failure.adoc[leveloffset=+2]
+
 [[reactive-concurrency]]
 == Concurrency
 When consuming records in streaming mode (`stream = true`) concurrency comes naturally via the underlying Reactive support in the client implementation.

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java

Lines changed: 51 additions & 5 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2022-2023 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -29,6 +31,8 @@
 import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
 
 import org.springframework.core.log.LogAccessor;
+import org.springframework.pulsar.PulsarException;
+import org.springframework.pulsar.config.StartupFailurePolicy;
 import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
 import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
 import org.springframework.util.CollectionUtils;
@@ -38,6 +42,7 @@
  *
  * @param <T> message type.
  * @author Christophe Bornet
+ * @author Chris Bono
  */
 public non-sealed class DefaultReactivePulsarMessageListenerContainer<T>
 		implements ReactivePulsarMessageListenerContainer<T> {
@@ -135,13 +140,50 @@ public void stop() {
 
 	private void doStart() {
 		setRunning(true);
-		this.pipeline = startPipeline(this.pulsarContainerProperties);
+		var containerProps = this.getContainerProperties();
+		try {
+			this.pipeline = startPipeline(this.pulsarContainerProperties);
+		}
+		catch (Exception e) {
+			this.logger.error(e, () -> "Error starting Reactive pipeline");
+			this.doStop();
+			if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.STOP) {
+				this.logger.info(() -> "Configured to stop on startup failures - exiting");
+				throw new IllegalStateException("Error starting Reactive pipeline", e);
+			}
+		}
+		// Pipeline started w/o errors - short circuit
+		if (this.pipeline != null && this.pipeline.isRunning()) {
+			return;
+		}
+
+		if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
+			this.logger.info(() -> "Configured to retry on startup failures - retrying");
+			CompletableFuture.supplyAsync(() -> {
+				var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate())
+					.orElseGet(containerProps::getDefaultStartupFailureRetryTemplate);
+				return retryTemplate
+					.<ReactiveMessagePipeline, PulsarException>execute((__) -> startPipeline(containerProps));
+			}).whenComplete((p, ex) -> {
+				if (ex == null) {
+					this.pipeline = p;
+					setRunning(this.pipeline != null ? this.pipeline.isRunning() : false);
+				}
+				else {
+					this.logger.error(ex, () -> "Unable to start Reactive pipeline");
+					this.doStop();
+				}
+			});
+		}
 	}
 
 	public void doStop() {
 		try {
 			this.logger.info("Closing Pulsar Reactive pipeline.");
-			this.pipeline.close();
+			if (this.pipeline != null) {
+				this.pipeline.close();
+				this.pipeline = null;
+			}
 		}
 		catch (Exception e) {
 			this.logger.error(e, () -> "Error closing Pulsar Reactive pipeline.");
@@ -174,6 +216,9 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
 			customizers.add(this.consumerCustomizer);
 		}
 
+		// NOTE: The following various pipeline builders always set 'pipelineRetrySpec'
+		// to null as the container controls the retry of the pipeline start. Otherwise
+		// they do not work well together.
 		ReactiveMessageConsumer<T> consumer = getReactivePulsarConsumerFactory()
 			.createConsumer(containerProperties.getSchema(), customizers);
 		ReactiveMessagePipelineBuilder<T> pipelineBuilder = ApiImplementationFactory
@@ -183,6 +228,7 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
 		if (messageHandler instanceof ReactivePulsarStreamingHandler<?>) {
 			pipeline = pipelineBuilder
 				.streamingMessageHandler(((ReactivePulsarStreamingHandler<T>) messageHandler)::received)
+				.pipelineRetrySpec(null)
 				.build();
 		}
 		else {
@@ -195,10 +241,10 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
 				if (containerProperties.isUseKeyOrderedProcessing()) {
 					concurrentPipelineBuilder.useKeyOrderedProcessing();
 				}
-				pipeline = concurrentPipelineBuilder.build();
+				pipeline = concurrentPipelineBuilder.pipelineRetrySpec(null).build();
 			}
 			else {
-				pipeline = pipelineBuilder.build();
+				pipeline = pipelineBuilder.pipelineRetrySpec(null).build();
 			}
 		}
 		pipeline.start();
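
The control flow that `doStart` gains in this diff can be hard to follow in diff form, so here is a minimal, dependency-free sketch of the same idea. It is not the container's implementation: `SketchPolicy`, `SketchContainer`, `failingTimes`, and `awaitRetry` are hypothetical names, the start action is a plain `Runnable`, and a simple loop without any backoff stands in for the retry template.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for StartupFailurePolicy.
enum SketchPolicy { STOP, CONTINUE, RETRY }

// Hypothetical container that mirrors the shape of doStart, not its details.
final class SketchContainer {

	private final Runnable startAction;
	private final SketchPolicy policy;
	private final int maxRetries;
	private volatile boolean running;
	private volatile CompletableFuture<Void> retry = CompletableFuture.completedFuture(null);

	SketchContainer(Runnable startAction, SketchPolicy policy, int maxRetries) {
		this.startAction = startAction;
		this.policy = policy;
		this.maxRetries = maxRetries;
	}

	void start() {
		try {
			this.startAction.run();
			this.running = true;
			return; // started without errors, nothing else to do
		}
		catch (RuntimeException e) {
			this.running = false;
			if (this.policy == SketchPolicy.STOP) {
				// STOP: surface the failure to the caller
				throw new IllegalStateException("Error starting container", e);
			}
		}
		if (this.policy == SketchPolicy.RETRY) {
			// RETRY: try again off the calling thread (no backoff in this sketch)
			this.retry = CompletableFuture.runAsync(() -> {
				for (int attempt = 1; attempt <= this.maxRetries && !this.running; attempt++) {
					try {
						this.startAction.run();
						this.running = true;
					}
					catch (RuntimeException e) {
						// ignore and loop until the attempts are exhausted
					}
				}
			});
		}
		// CONTINUE: fall through and leave the container in a non-running state
	}

	boolean isRunning() {
		return this.running;
	}

	// Blocks until any background retry has finished.
	void awaitRetry() {
		this.retry.join();
	}

	// Builds a start action that throws on its first `failures` invocations.
	static Runnable failingTimes(int failures) {
		AtomicInteger calls = new AtomicInteger();
		return () -> {
			if (calls.incrementAndGet() <= failures) {
				throw new IllegalStateException("broker unavailable");
			}
		};
	}

	public static void main(String[] args) {
		SketchContainer container = new SketchContainer(failingTimes(2), SketchPolicy.RETRY, 3);
		container.start();
		container.awaitRetry();
		System.out.println("running after retries: " + container.isRunning());
	}
}
```

As in the diff, the first start attempt happens on the calling thread and only the retries run asynchronously, so a caller using `RETRY` or `CONTINUE` gets control back even though the container may not be running yet.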

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java

Lines changed: 57 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2022-2023 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,16 +18,20 @@
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.schema.SchemaType;
 
+import org.springframework.lang.Nullable;
+import org.springframework.pulsar.config.StartupFailurePolicy;
 import org.springframework.pulsar.core.DefaultSchemaResolver;
 import org.springframework.pulsar.core.DefaultTopicResolver;
 import org.springframework.pulsar.core.SchemaResolver;
 import org.springframework.pulsar.core.TopicResolver;
+import org.springframework.retry.support.RetryTemplate;
 
 /**
  * Contains runtime properties for a reactive listener container.
@@ -61,6 +65,16 @@ public class ReactivePulsarContainerProperties<T> {
 
 	private boolean useKeyOrderedProcessing = false;
 
+	@Nullable
+	private RetryTemplate startupFailureRetryTemplate;
+
+	private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
+		.maxAttempts(3)
+		.fixedBackoff(Duration.ofSeconds(10))
+		.build();
+
+	private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;
+
 	public ReactivePulsarMessageHandler getMessageHandler() {
 		return this.messageHandler;
 	}
@@ -161,4 +175,46 @@ public void setUseKeyOrderedProcessing(boolean useKeyOrderedProcessing) {
 		this.useKeyOrderedProcessing = useKeyOrderedProcessing;
 	}
 
+	@Nullable
+	public RetryTemplate getStartupFailureRetryTemplate() {
+		return this.startupFailureRetryTemplate;
+	}
+
+	/**
+	 * Get the default template to use to retry startup when no custom retry template has
+	 * been specified.
+	 * @return the default retry template that will retry 3 times with a fixed delay of 10
+	 * seconds between each attempt.
+	 * @since 1.2.0
+	 */
+	public RetryTemplate getDefaultStartupFailureRetryTemplate() {
+		return this.defaultStartupFailureRetryTemplate;
+	}
+
+	/**
+	 * Set the template to use to retry startup when an exception occurs during startup.
+	 * @param startupFailureRetryTemplate the retry template to use
+	 * @since 1.2.0
+	 */
+	public void setStartupFailureRetryTemplate(RetryTemplate startupFailureRetryTemplate) {
+		this.startupFailureRetryTemplate = startupFailureRetryTemplate;
+		if (this.startupFailureRetryTemplate != null) {
+			setStartupFailurePolicy(StartupFailurePolicy.RETRY);
+		}
+	}
+
+	public StartupFailurePolicy getStartupFailurePolicy() {
+		return this.startupFailurePolicy;
+	}
+
+	/**
+	 * The action to take on the container when a failure occurs during startup.
+	 * @param startupFailurePolicy action to take when a failure occurs during startup
+	 * @since 1.2.0
+	 */
+	public void setStartupFailurePolicy(StartupFailurePolicy startupFailurePolicy) {
+		this.startupFailurePolicy = Objects.requireNonNull(startupFailurePolicy,
+				"startupFailurePolicy must not be null");
+	}
+
 }
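
Two behaviors of the new properties are easy to miss in the diff above: setting a non-null retry template also switches the policy to `RETRY`, and the default template is only a fallback when no custom one is set. The dependency-free sketch below models just those two rules. `PolicySketch`, `RetrySettings`, `ContainerPropertiesSketch`, and `effectiveRetry` are hypothetical names, and `RetrySettings` is a plain value holder standing in for `RetryTemplate`.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for StartupFailurePolicy.
enum PolicySketch { STOP, CONTINUE, RETRY }

// Hypothetical value holder standing in for RetryTemplate.
final class RetrySettings {

	final int maxAttempts;
	final Duration delay;

	RetrySettings(int maxAttempts, Duration delay) {
		this.maxAttempts = maxAttempts;
		this.delay = delay;
	}
}

// Hypothetical model of the startup-failure part of the container properties.
final class ContainerPropertiesSketch {

	// Mirrors the defaults in the diff: 3 attempts, 10 seconds apart.
	private static final RetrySettings DEFAULT_RETRY = new RetrySettings(3, Duration.ofSeconds(10));

	private RetrySettings retry; // stays null until a custom value is set
	private PolicySketch policy = PolicySketch.STOP;

	void setRetry(RetrySettings retry) {
		this.retry = retry;
		if (retry != null) {
			// Supplying custom retry settings implies the RETRY policy.
			setPolicy(PolicySketch.RETRY);
		}
	}

	void setPolicy(PolicySketch policy) {
		this.policy = Objects.requireNonNull(policy, "policy must not be null");
	}

	PolicySketch getPolicy() {
		return this.policy;
	}

	// Custom settings when present, otherwise the defaults.
	RetrySettings effectiveRetry() {
		return Optional.ofNullable(this.retry).orElse(DEFAULT_RETRY);
	}

	public static void main(String[] args) {
		ContainerPropertiesSketch props = new ContainerPropertiesSketch();
		props.setPolicy(PolicySketch.CONTINUE);
		props.setRetry(new RetrySettings(5, Duration.ofSeconds(1)));
		System.out.println(props.getPolicy() + " with " + props.effectiveRetry().maxAttempts + " attempts");
	}
}
```

One consequence of this coupling is that call order matters: a policy set before the retry settings is overwritten with `RETRY`, while a policy set afterwards wins.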
