Skip to content

Commit 03feb3f

Browse files
committed
Replace spring-retry with core-retry
Resolves #1224 Signed-off-by: onobc <[email protected]>
1 parent 81428c9 commit 03feb3f

File tree

13 files changed

+118
-174
lines changed

13 files changed

+118
-174
lines changed

gradle/libs.versions.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ spring-dep-mgmt = "1.1.7"
2525
spring-boot = "4.0.0-SNAPSHOT"
2626
spring-boot-for-docs = "4.0.0-SNAPSHOT"
2727
spring-cloud-stream = "5.0.0-SNAPSHOT"
28-
spring-retry = "2.0.12"
2928
system-lambda = "1.2.1"
3029
testcontainers = "1.21.3"
3130
# plugins
@@ -56,7 +55,6 @@ pulsar-client-reactive-adapter = { module = "org.apache.pulsar:pulsar-client-rea
5655
pulsar-client-reactive-producer-cache-caffeine-shaded = { module = "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded", version.ref = "pulsar-reactive" }
5756
reactor-bom = { module = "io.projectreactor:reactor-bom", version.ref = "reactor" }
5857
spring-bom = { module = "org.springframework:spring-framework-bom", version.ref = "spring" }
59-
spring-retry = { module = "org.springframework.retry:spring-retry", version.ref = "spring-retry" }
6058
# Testing libs
6159
assertj-bom = { module = "org.assertj:assertj-bom", version.ref = "assertj" }
6260
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }

spring-pulsar-dependencies/spring-pulsar-dependencies.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ dependencies {
2929
api libs.pulsar.client.all
3030
api libs.pulsar.client.reactive.adapter
3131
api libs.pulsar.client.reactive.producer.cache.caffeine.shaded
32-
api libs.spring.retry
3332
api libs.system.lambda
3433
}
3534
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.List;
2121
import java.util.Objects;
2222
import java.util.Optional;
23-
import java.util.concurrent.CompletableFuture;
2423
import java.util.concurrent.atomic.AtomicBoolean;
2524
import java.util.concurrent.locks.ReentrantLock;
2625

@@ -32,7 +31,7 @@
3231
import org.jspecify.annotations.Nullable;
3332

3433
import org.springframework.core.log.LogAccessor;
35-
import org.springframework.pulsar.PulsarException;
34+
import org.springframework.core.retry.RetryException;
3635
import org.springframework.pulsar.config.StartupFailurePolicy;
3736
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
3837
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
@@ -143,7 +142,18 @@ private void doStart() {
143142
setRunning(true);
144143
var containerProps = this.getContainerProperties();
145144
try {
146-
this.pipeline = startPipeline(this.pulsarContainerProperties);
145+
if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
146+
var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate())
147+
.orElseGet(containerProps::getDefaultStartupFailureRetryTemplate);
148+
this.pipeline = retryTemplate.execute(() -> startPipeline(containerProps));
149+
}
150+
else {
151+
this.pipeline = startPipeline(this.pulsarContainerProperties);
152+
}
153+
}
154+
catch (RetryException ex) {
155+
this.logger.error(ex, () -> "Unable to re-start reactive pipeline - retries exhausted");
156+
this.doStop();
147157
}
148158
catch (Exception e) {
149159
this.logger.error(e, () -> "Error starting Reactive pipeline");
@@ -153,29 +163,6 @@ private void doStart() {
153163
throw new IllegalStateException("Error starting Reactive pipeline", e);
154164
}
155165
}
156-
// Pipeline started w/o errors - short circuit
157-
if (this.pipeline != null && this.pipeline.isRunning()) {
158-
return;
159-
}
160-
161-
if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
162-
this.logger.info(() -> "Configured to retry on startup failures - retrying");
163-
CompletableFuture.supplyAsync(() -> {
164-
var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate())
165-
.orElseGet(containerProps::getDefaultStartupFailureRetryTemplate);
166-
return retryTemplate
167-
.<ReactiveMessagePipeline, PulsarException>execute((__) -> startPipeline(containerProps));
168-
}).whenComplete((p, ex) -> {
169-
if (ex == null) {
170-
this.pipeline = p;
171-
setRunning(this.pipeline != null ? this.pipeline.isRunning() : false);
172-
}
173-
else {
174-
this.logger.error(ex, () -> "Unable to start Reactive pipeline");
175-
this.doStop();
176-
}
177-
});
178-
}
179166
}
180167

181168
public void doStop() {

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
import org.apache.pulsar.common.schema.SchemaType;
2727
import org.jspecify.annotations.Nullable;
2828

29+
import org.springframework.core.retry.RetryPolicy;
30+
import org.springframework.core.retry.RetryTemplate;
2931
import org.springframework.pulsar.config.StartupFailurePolicy;
3032
import org.springframework.pulsar.core.DefaultSchemaResolver;
3133
import org.springframework.pulsar.core.DefaultTopicResolver;
3234
import org.springframework.pulsar.core.SchemaResolver;
3335
import org.springframework.pulsar.core.TopicResolver;
34-
import org.springframework.retry.support.RetryTemplate;
36+
import org.springframework.util.backoff.FixedBackOff;
3537

3638
/**
3739
* Contains runtime properties for a reactive listener container.
@@ -68,10 +70,8 @@ public class ReactivePulsarContainerProperties<T> {
6870

6971
private RetryTemplate startupFailureRetryTemplate;
7072

71-
private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
72-
.maxAttempts(3)
73-
.fixedBackoff(Duration.ofSeconds(10))
74-
.build();
73+
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
74+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());
7575

7676
private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;
7777

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@
5151
import org.junit.jupiter.api.Test;
5252

5353
import org.springframework.core.log.LogAccessor;
54+
import org.springframework.core.retry.RetryListener;
55+
import org.springframework.core.retry.RetryPolicy;
56+
import org.springframework.core.retry.RetryTemplate;
57+
import org.springframework.core.retry.Retryable;
5458
import org.springframework.pulsar.PulsarException;
5559
import org.springframework.pulsar.config.StartupFailurePolicy;
5660
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
@@ -63,10 +67,7 @@
6367
import org.springframework.pulsar.test.model.UserRecord;
6468
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
6569
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
66-
import org.springframework.retry.RetryCallback;
67-
import org.springframework.retry.RetryContext;
68-
import org.springframework.retry.RetryListener;
69-
import org.springframework.retry.support.RetryTemplate;
70+
import org.springframework.util.backoff.FixedBackOff;
7071

7172
import reactor.core.publisher.Flux;
7273
import reactor.core.publisher.Mono;
@@ -459,22 +460,18 @@ void whenPolicyIsRetryAndRetriesAreExhaustedThenContainerDoesNotStart() throws E
459460
var thrown = new ArrayList<Throwable>();
460461
var retryListener = new RetryListener() {
461462
@Override
462-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
463-
Throwable throwable) {
464-
retryCount.set(context.getRetryCount());
463+
public void beforeRetry(RetryPolicy retryPolicy, Retryable<?> retryable) {
464+
retryCount.incrementAndGet();
465465
}
466466

467467
@Override
468-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
469-
Throwable throwable) {
468+
public void onRetryFailure(RetryPolicy retryPolicy, Retryable<?> retryable, Throwable throwable) {
470469
thrown.add(throwable);
471470
}
472471
};
473-
var retryTemplate = RetryTemplate.builder()
474-
.maxAttempts(2)
475-
.fixedBackoff(Duration.ofSeconds(1))
476-
.withListener(retryListener)
477-
.build();
472+
var retryTemplate = new RetryTemplate(
473+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build());
474+
retryTemplate.setRetryListener(retryListener);
478475
var containerProps = new ReactivePulsarContainerProperties<String>();
479476
containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY);
480477
containerProps.setStartupFailureRetryTemplate(retryTemplate);
@@ -515,22 +512,18 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception
515512
var thrown = new ArrayList<Throwable>();
516513
var retryListener = new RetryListener() {
517514
@Override
518-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
519-
Throwable throwable) {
520-
retryCount.set(context.getRetryCount());
515+
public void beforeRetry(RetryPolicy retryPolicy, Retryable<?> retryable) {
516+
retryCount.incrementAndGet();
521517
}
522518

523519
@Override
524-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
525-
Throwable throwable) {
520+
public void onRetryFailure(RetryPolicy retryPolicy, Retryable<?> retryable, Throwable throwable) {
526521
thrown.add(throwable);
527522
}
528523
};
529-
var retryTemplate = RetryTemplate.builder()
530-
.maxAttempts(3)
531-
.fixedBackoff(Duration.ofSeconds(1))
532-
.withListener(retryListener)
533-
.build();
524+
var retryTemplate = new RetryTemplate(
525+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build());
526+
retryTemplate.setRetryListener(retryListener);
534527
var latch = new CountDownLatch(1);
535528
var containerProps = new ReactivePulsarContainerProperties<String>();
536529
containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY);
@@ -554,7 +547,7 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback
554547
// factory called 3x (initial call + 2 retries)
555548
verify(consumerFactory, times(3)).createConsumer(any(), any());
556549
// only had to retry once (2nd call in retry template succeeded)
557-
assertThat(retryCount).hasValue(1);
550+
assertThat(retryCount).hasValue(2);
558551
assertThat(thrown).containsExactly(failCause);
559552
// should be able to process messages
560553
var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, topic);

spring-pulsar/spring-pulsar.gradle

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ dependencies {
1818
api 'org.springframework:spring-context'
1919
api 'org.springframework:spring-messaging'
2020
api 'org.springframework:spring-tx'
21-
api (libs.spring.retry) {
22-
exclude group: 'org.springframework'
23-
}
2421
api project(':spring-pulsar-cache-provider')
2522
implementation project(path: ':spring-pulsar-cache-provider-caffeine', configuration: 'shadow')
2623
implementation 'com.fasterxml.jackson.core:jackson-core'

spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.springframework.beans.BeanUtils;
6060
import org.springframework.context.ApplicationEventPublisher;
6161
import org.springframework.core.log.LogAccessor;
62+
import org.springframework.core.retry.RetryException;
6263
import org.springframework.core.task.SimpleAsyncTaskExecutor;
6364
import org.springframework.pulsar.PulsarException;
6465
import org.springframework.pulsar.config.StartupFailurePolicy;
@@ -133,14 +134,24 @@ protected void doStart() {
133134
@SuppressWarnings("unchecked")
134135
var messageListener = (MessageListener<T>) containerProperties.getMessageListener();
135136
try {
136-
this.listenerConsumer = new Listener(messageListener, containerProperties);
137+
if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
138+
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
139+
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
140+
this.listenerConsumer = retryTemplate.execute(() -> new Listener(messageListener, containerProperties));
141+
}
142+
else {
143+
this.listenerConsumer = new Listener(messageListener, containerProperties);
144+
}
145+
}
146+
catch (RetryException ex) {
147+
this.logger.error(ex, () -> "Unable to re-start listener container [%s] - retries exhausted"
148+
.formatted(this.getBeanName()));
149+
this.publishConsumerFailedToStart();
137150
}
138151
catch (Exception e) {
139152
var msg = "Error starting listener container [%s]".formatted(this.getBeanName());
140153
this.logger.error(e, () -> msg);
141-
if (containerProperties.getStartupFailurePolicy() != StartupFailurePolicy.RETRY) {
142-
this.publishConsumerFailedToStart();
143-
}
154+
this.publishConsumerFailedToStart();
144155
if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.STOP) {
145156
this.logger.info(() -> "Configured to stop on startup failures - exiting");
146157
throw new IllegalStateException(msg, e);
@@ -152,26 +163,6 @@ protected void doStart() {
152163
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
153164
waitForStartup(containerProperties.determineConsumerStartTimeout());
154165
}
155-
else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
156-
this.logger.info(() -> "Configured to retry on startup failure - retrying asynchronously");
157-
this.listenerConsumerFuture = consumerExecutor.submitCompletable(() -> {
158-
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
159-
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
160-
this.listenerConsumer = retryTemplate
161-
.<Listener, PulsarException>execute((__) -> new Listener(messageListener, containerProperties));
162-
this.listenerConsumer.run();
163-
}).whenComplete((__, ex) -> {
164-
if (ex == null) {
165-
this.logger
166-
.info(() -> "Successfully re-started listener container [%s]".formatted(this.getBeanName()));
167-
}
168-
else {
169-
this.logger.error(ex, () -> "Unable to re-start listener container [%s] - retries exhausted"
170-
.formatted(this.getBeanName()));
171-
this.publishConsumerFailedToStart();
172-
}
173-
});
174-
}
175166
}
176167

177168
private void waitForStartup(Duration waitTime) {

spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.pulsar.common.schema.SchemaType;
2828
import org.jspecify.annotations.Nullable;
2929

30+
import org.springframework.core.retry.RetryPolicy;
31+
import org.springframework.core.retry.RetryTemplate;
3032
import org.springframework.core.task.AsyncTaskExecutor;
3133
import org.springframework.pulsar.config.StartupFailurePolicy;
3234
import org.springframework.pulsar.core.DefaultSchemaResolver;
@@ -36,10 +38,10 @@
3638
import org.springframework.pulsar.core.TransactionProperties;
3739
import org.springframework.pulsar.observation.PulsarListenerObservationConvention;
3840
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
39-
import org.springframework.retry.support.RetryTemplate;
4041
import org.springframework.transaction.TransactionDefinition;
4142
import org.springframework.transaction.support.DefaultTransactionDefinition;
4243
import org.springframework.util.Assert;
44+
import org.springframework.util.backoff.FixedBackOff;
4345

4446
import io.micrometer.observation.ObservationRegistry;
4547

@@ -105,10 +107,8 @@ public class PulsarContainerProperties {
105107

106108
private @Nullable RetryTemplate startupFailureRetryTemplate;
107109

108-
private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
109-
.maxAttempts(3)
110-
.fixedBackoff(Duration.ofSeconds(10))
111-
.build();
110+
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
111+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());
112112

113113
private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;
114114

spring-pulsar/src/main/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
import org.jspecify.annotations.Nullable;
3535

3636
import org.springframework.context.ApplicationEventPublisher;
37+
import org.springframework.core.retry.RetryException;
3738
import org.springframework.core.task.SimpleAsyncTaskExecutor;
38-
import org.springframework.pulsar.PulsarException;
3939
import org.springframework.pulsar.config.StartupFailurePolicy;
4040
import org.springframework.pulsar.core.PulsarReaderFactory;
4141
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
@@ -85,45 +85,35 @@ protected void doStart() {
8585
@SuppressWarnings("unchecked")
8686
var readerListener = (ReaderListener<T>) containerProperties.getReaderListener();
8787
try {
88-
this.internalAsyncReader.set(new InternalAsyncReader(readerListener, containerProperties));
88+
if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
89+
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
90+
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
91+
this.internalAsyncReader
92+
.set(retryTemplate.execute(() -> new InternalAsyncReader(readerListener, containerProperties)));
93+
}
94+
else {
95+
this.internalAsyncReader.set(new InternalAsyncReader(readerListener, containerProperties));
96+
}
97+
}
98+
catch (RetryException ex) {
99+
this.logger.error(ex,
100+
() -> "Unable to re-start reader container [%s] - retries exhausted".formatted(this.getBeanName()));
101+
this.publishReaderFailedToStart();
89102
}
90103
catch (Exception e) {
91104
var msg = "Error starting reader container [%s]".formatted(this.getBeanName());
92105
this.logger.error(e, () -> msg);
93-
if (containerProperties.getStartupFailurePolicy() != StartupFailurePolicy.RETRY) {
94-
this.publishReaderFailedToStart();
95-
}
106+
this.publishReaderFailedToStart();
96107
if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.STOP) {
97108
this.logger.info(() -> "Configured to stop on startup failures - exiting");
98109
throw new IllegalStateException(msg, e);
99110
}
100111
}
101-
102112
if (this.internalAsyncReader.get() != null) {
103113
this.logger.debug(() -> "Successfully created completable - submitting to executor");
104114
readerExecutor.submitCompletable(this.internalAsyncReader.get());
105115
waitForStartup(containerProperties.getReaderStartTimeout());
106116
}
107-
else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
108-
this.logger.info(() -> "Configured to retry on startup failures - retrying asynchronously");
109-
readerExecutor.submitCompletable(() -> {
110-
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
111-
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
112-
this.internalAsyncReader.set(retryTemplate.<InternalAsyncReader, PulsarException>execute(
113-
(__) -> new InternalAsyncReader(readerListener, containerProperties)));
114-
this.internalAsyncReader.get().run();
115-
}).whenComplete((__, ex) -> {
116-
if (ex == null) {
117-
this.logger
118-
.info(() -> "Successfully re-started reader container [%s]".formatted(this.getBeanName()));
119-
}
120-
else {
121-
this.logger.error(ex, () -> "Unable to re-start reader container [%s] - retries exhausted"
122-
.formatted(this.getBeanName()));
123-
this.publishReaderFailedToStart();
124-
}
125-
});
126-
}
127117
}
128118

129119
private void waitForStartup(Duration waitTime) {

0 commit comments

Comments
 (0)