Skip to content

Commit fc4742f

Browse files
committed
Replace spring-retry with core-retry
Resolves #1224 Signed-off-by: onobc <[email protected]>
1 parent 9375135 commit fc4742f

File tree

13 files changed

+130
-112
lines changed

13 files changed

+130
-112
lines changed

gradle/libs.versions.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ spring-dep-mgmt = "1.1.7"
2525
spring-boot = "4.0.0-SNAPSHOT"
2626
spring-boot-for-docs = "4.0.0-SNAPSHOT"
2727
spring-cloud-stream = "5.0.0-SNAPSHOT"
28-
spring-retry = "2.0.12"
2928
system-lambda = "1.2.1"
3029
testcontainers = "1.21.3"
3130
# plugins
@@ -56,7 +55,6 @@ pulsar-client-reactive-adapter = { module = "org.apache.pulsar:pulsar-client-rea
5655
pulsar-client-reactive-producer-cache-caffeine-shaded = { module = "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded", version.ref = "pulsar-reactive" }
5756
reactor-bom = { module = "io.projectreactor:reactor-bom", version.ref = "reactor" }
5857
spring-bom = { module = "org.springframework:spring-framework-bom", version.ref = "spring" }
59-
spring-retry = { module = "org.springframework.retry:spring-retry", version.ref = "spring-retry" }
6058
# Testing libs
6159
assertj-bom = { module = "org.assertj:assertj-bom", version.ref = "assertj" }
6260
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }

spring-pulsar-dependencies/spring-pulsar-dependencies.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ dependencies {
2929
api libs.pulsar.client.all
3030
api libs.pulsar.client.reactive.adapter
3131
api libs.pulsar.client.reactive.producer.cache.caffeine.shaded
32-
api libs.spring.retry
3332
api libs.system.lambda
3433
}
3534
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.jspecify.annotations.Nullable;
3333

3434
import org.springframework.core.log.LogAccessor;
35-
import org.springframework.pulsar.PulsarException;
35+
import org.springframework.core.retry.RetryException;
3636
import org.springframework.pulsar.config.StartupFailurePolicy;
3737
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
3838
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
@@ -163,8 +163,18 @@ private void doStart() {
163163
CompletableFuture.supplyAsync(() -> {
164164
var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate())
165165
.orElseGet(containerProps::getDefaultStartupFailureRetryTemplate);
166-
return retryTemplate
167-
.<ReactiveMessagePipeline, PulsarException>execute((__) -> startPipeline(containerProps));
166+
try {
167+
AtomicBoolean initialAttempt = new AtomicBoolean(true);
168+
return retryTemplate.execute(() -> {
169+
if (initialAttempt.getAndSet(false)) {
170+
throw new RuntimeException("Ignore initial attempt in retry template");
171+
}
172+
return startPipeline(containerProps);
173+
});
174+
}
175+
catch (RetryException e) {
176+
throw new RuntimeException(e);
177+
}
168178
}).whenComplete((p, ex) -> {
169179
if (ex == null) {
170180
this.pipeline = p;

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
import org.apache.pulsar.common.schema.SchemaType;
2727
import org.jspecify.annotations.Nullable;
2828

29+
import org.springframework.core.retry.RetryPolicy;
30+
import org.springframework.core.retry.RetryTemplate;
2931
import org.springframework.pulsar.config.StartupFailurePolicy;
3032
import org.springframework.pulsar.core.DefaultSchemaResolver;
3133
import org.springframework.pulsar.core.DefaultTopicResolver;
3234
import org.springframework.pulsar.core.SchemaResolver;
3335
import org.springframework.pulsar.core.TopicResolver;
34-
import org.springframework.retry.support.RetryTemplate;
36+
import org.springframework.util.backoff.FixedBackOff;
3537

3638
/**
3739
* Contains runtime properties for a reactive listener container.
@@ -68,10 +70,8 @@ public class ReactivePulsarContainerProperties<T> {
6870

6971
private RetryTemplate startupFailureRetryTemplate;
7072

71-
private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
72-
.maxAttempts(3)
73-
.fixedBackoff(Duration.ofSeconds(10))
74-
.build();
73+
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
74+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());
7575

7676
private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;
7777

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,15 @@
4747
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
4848
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
4949
import org.assertj.core.api.InstanceOfAssertFactories;
50+
import org.jspecify.annotations.Nullable;
5051
import org.junit.jupiter.api.Nested;
5152
import org.junit.jupiter.api.Test;
5253

5354
import org.springframework.core.log.LogAccessor;
55+
import org.springframework.core.retry.RetryListener;
56+
import org.springframework.core.retry.RetryPolicy;
57+
import org.springframework.core.retry.RetryTemplate;
58+
import org.springframework.core.retry.Retryable;
5459
import org.springframework.pulsar.PulsarException;
5560
import org.springframework.pulsar.config.StartupFailurePolicy;
5661
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
@@ -63,10 +68,7 @@
6368
import org.springframework.pulsar.test.model.UserRecord;
6469
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
6570
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
66-
import org.springframework.retry.RetryCallback;
67-
import org.springframework.retry.RetryContext;
68-
import org.springframework.retry.RetryListener;
69-
import org.springframework.retry.support.RetryTemplate;
71+
import org.springframework.util.backoff.FixedBackOff;
7072

7173
import reactor.core.publisher.Flux;
7274
import reactor.core.publisher.Mono;
@@ -459,22 +461,19 @@ void whenPolicyIsRetryAndRetriesAreExhaustedThenContainerDoesNotStart() throws E
459461
var thrown = new ArrayList<Throwable>();
460462
var retryListener = new RetryListener() {
461463
@Override
462-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
463-
Throwable throwable) {
464-
retryCount.set(context.getRetryCount());
464+
public void onRetrySuccess(RetryPolicy retryPolicy, Retryable<?> retryable, @Nullable Object result) {
465+
retryCount.incrementAndGet();
465466
}
466467

467468
@Override
468-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
469-
Throwable throwable) {
469+
public void onRetryFailure(RetryPolicy retryPolicy, Retryable<?> retryable, Throwable throwable) {
470+
retryCount.incrementAndGet();
470471
thrown.add(throwable);
471472
}
472473
};
473-
var retryTemplate = RetryTemplate.builder()
474-
.maxAttempts(2)
475-
.fixedBackoff(Duration.ofSeconds(1))
476-
.withListener(retryListener)
477-
.build();
474+
var retryTemplate = new RetryTemplate(
475+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build());
476+
retryTemplate.setRetryListener(retryListener);
478477
var containerProps = new ReactivePulsarContainerProperties<String>();
479478
containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY);
480479
containerProps.setStartupFailureRetryTemplate(retryTemplate);
@@ -515,22 +514,20 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception
515514
var thrown = new ArrayList<Throwable>();
516515
var retryListener = new RetryListener() {
517516
@Override
518-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
519-
Throwable throwable) {
520-
retryCount.set(context.getRetryCount());
517+
public void onRetrySuccess(RetryPolicy retryPolicy, Retryable<?> retryable,
518+
@Nullable Object result) {
519+
retryCount.incrementAndGet();
521520
}
522521

523522
@Override
524-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
525-
Throwable throwable) {
523+
public void onRetryFailure(RetryPolicy retryPolicy, Retryable<?> retryable, Throwable throwable) {
524+
retryCount.incrementAndGet();
526525
thrown.add(throwable);
527526
}
528527
};
529-
var retryTemplate = RetryTemplate.builder()
530-
.maxAttempts(3)
531-
.fixedBackoff(Duration.ofSeconds(1))
532-
.withListener(retryListener)
533-
.build();
528+
var retryTemplate = new RetryTemplate(
529+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build());
530+
retryTemplate.setRetryListener(retryListener);
534531
var latch = new CountDownLatch(1);
535532
var containerProps = new ReactivePulsarContainerProperties<String>();
536533
containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY);
@@ -553,8 +550,8 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback
553550

554551
// factory called 3x (initial call + 2 retries)
555552
verify(consumerFactory, times(3)).createConsumer(any(), any());
556-
// only had to retry once (2nd call in retry template succeeded)
557-
assertThat(retryCount).hasValue(1);
553+
// had to retry 2x (1st retry fails and 2nd retry passes)
554+
assertThat(retryCount).hasValue(2);
558555
assertThat(thrown).containsExactly(failCause);
559556
// should be able to process messages
560557
var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, topic);

spring-pulsar/spring-pulsar.gradle

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ dependencies {
1717
api 'org.springframework:spring-context'
1818
api 'org.springframework:spring-messaging'
1919
api 'org.springframework:spring-tx'
20-
api (libs.spring.retry) {
21-
exclude group: 'org.springframework'
22-
}
2320
api project(':spring-pulsar-cache-provider')
2421
implementation project(path: ':spring-pulsar-cache-provider-caffeine', configuration: 'shadow')
2522
implementation 'com.fasterxml.jackson.core:jackson-core'

spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.springframework.beans.BeanUtils;
6060
import org.springframework.context.ApplicationEventPublisher;
6161
import org.springframework.core.log.LogAccessor;
62+
import org.springframework.core.retry.RetryException;
6263
import org.springframework.core.task.SimpleAsyncTaskExecutor;
6364
import org.springframework.pulsar.PulsarException;
6465
import org.springframework.pulsar.config.StartupFailurePolicy;
@@ -146,7 +147,6 @@ protected void doStart() {
146147
throw new IllegalStateException(msg, e);
147148
}
148149
}
149-
150150
if (this.listenerConsumer != null) {
151151
this.logger.debug(() -> "Successfully created completable - submitting to executor");
152152
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
@@ -157,8 +157,19 @@ else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.R
157157
this.listenerConsumerFuture = consumerExecutor.submitCompletable(() -> {
158158
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
159159
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
160-
this.listenerConsumer = retryTemplate
161-
.<Listener, PulsarException>execute((__) -> new Listener(messageListener, containerProperties));
160+
try {
161+
AtomicBoolean initialAttempt = new AtomicBoolean(true);
162+
this.listenerConsumer = retryTemplate.execute(() -> {
163+
if (initialAttempt.getAndSet(false)) {
164+
throw new RuntimeException("Ignore initial attempt in retry template");
165+
}
166+
return new Listener(messageListener, containerProperties);
167+
});
168+
}
169+
catch (RetryException e) {
170+
throw new RuntimeException(e);
171+
}
172+
Assert.notNull(this.listenerConsumer, "listenerConsumer must not be null");
162173
this.listenerConsumer.run();
163174
}).whenComplete((__, ex) -> {
164175
if (ex == null) {

spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.pulsar.common.schema.SchemaType;
2828
import org.jspecify.annotations.Nullable;
2929

30+
import org.springframework.core.retry.RetryPolicy;
31+
import org.springframework.core.retry.RetryTemplate;
3032
import org.springframework.core.task.AsyncTaskExecutor;
3133
import org.springframework.pulsar.config.StartupFailurePolicy;
3234
import org.springframework.pulsar.core.DefaultSchemaResolver;
@@ -36,10 +38,10 @@
3638
import org.springframework.pulsar.core.TransactionProperties;
3739
import org.springframework.pulsar.observation.PulsarListenerObservationConvention;
3840
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
39-
import org.springframework.retry.support.RetryTemplate;
4041
import org.springframework.transaction.TransactionDefinition;
4142
import org.springframework.transaction.support.DefaultTransactionDefinition;
4243
import org.springframework.util.Assert;
44+
import org.springframework.util.backoff.FixedBackOff;
4345

4446
import io.micrometer.observation.ObservationRegistry;
4547

@@ -105,10 +107,8 @@ public class PulsarContainerProperties {
105107

106108
private @Nullable RetryTemplate startupFailureRetryTemplate;
107109

108-
private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
109-
.maxAttempts(3)
110-
.fixedBackoff(Duration.ofSeconds(10))
111-
.build();
110+
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
111+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());
112112

113113
private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;
114114

spring-pulsar/src/main/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Optional;
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicReference;
2728

2829
import org.apache.pulsar.client.api.Message;
@@ -34,8 +35,8 @@
3435
import org.jspecify.annotations.Nullable;
3536

3637
import org.springframework.context.ApplicationEventPublisher;
38+
import org.springframework.core.retry.RetryException;
3739
import org.springframework.core.task.SimpleAsyncTaskExecutor;
38-
import org.springframework.pulsar.PulsarException;
3940
import org.springframework.pulsar.config.StartupFailurePolicy;
4041
import org.springframework.pulsar.core.PulsarReaderFactory;
4142
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
@@ -109,8 +110,19 @@ else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.R
109110
readerExecutor.submitCompletable(() -> {
110111
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
111112
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
112-
this.internalAsyncReader.set(retryTemplate.<InternalAsyncReader, PulsarException>execute(
113-
(__) -> new InternalAsyncReader(readerListener, containerProperties)));
113+
try {
114+
AtomicBoolean initialAttempt = new AtomicBoolean(true);
115+
this.internalAsyncReader.set(retryTemplate.execute(() -> {
116+
if (initialAttempt.getAndSet(false)) {
117+
throw new RuntimeException("Ignore initial attempt in retry template");
118+
}
119+
return new InternalAsyncReader(readerListener, containerProperties);
120+
}));
121+
}
122+
catch (RetryException e) {
123+
throw new RuntimeException(e);
124+
}
125+
Assert.notNull(this.internalAsyncReader.get(), "internalAsynReader must not be null");
114126
this.internalAsyncReader.get().run();
115127
}).whenComplete((__, ex) -> {
116128
if (ex == null) {

spring-pulsar/src/main/java/org/springframework/pulsar/reader/PulsarReaderContainerProperties.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
import org.jspecify.annotations.NullUnmarked;
2828
import org.jspecify.annotations.Nullable;
2929

30+
import org.springframework.core.retry.RetryPolicy;
31+
import org.springframework.core.retry.RetryTemplate;
3032
import org.springframework.core.task.AsyncTaskExecutor;
3133
import org.springframework.pulsar.config.StartupFailurePolicy;
3234
import org.springframework.pulsar.core.DefaultSchemaResolver;
3335
import org.springframework.pulsar.core.SchemaResolver;
34-
import org.springframework.retry.support.RetryTemplate;
3536
import org.springframework.util.Assert;
37+
import org.springframework.util.backoff.FixedBackOff;
3638

3739
/**
3840
* Container properties for Pulsar {@link Reader}.
@@ -62,10 +64,8 @@ public class PulsarReaderContainerProperties {
6264

6365
private @Nullable RetryTemplate startupFailureRetryTemplate;
6466

65-
private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
66-
.maxAttempts(3)
67-
.fixedBackoff(Duration.ofSeconds(10))
68-
.build();
67+
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
68+
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());
6969

7070
private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;
7171

0 commit comments

Comments
 (0)