diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5b6cfcb2..c5e3f405 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -25,7 +25,6 @@ spring-dep-mgmt = "1.1.7" spring-boot = "4.0.0-SNAPSHOT" spring-boot-for-docs = "4.0.0-SNAPSHOT" spring-cloud-stream = "5.0.0-SNAPSHOT" -spring-retry = "2.0.12" system-lambda = "1.2.1" testcontainers = "1.21.3" # plugins @@ -56,7 +55,6 @@ pulsar-client-reactive-adapter = { module = "org.apache.pulsar:pulsar-client-rea pulsar-client-reactive-producer-cache-caffeine-shaded = { module = "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded", version.ref = "pulsar-reactive" } reactor-bom = { module = "io.projectreactor:reactor-bom", version.ref = "reactor" } spring-bom = { module = "org.springframework:spring-framework-bom", version.ref = "spring" } -spring-retry = { module = "org.springframework.retry:spring-retry", version.ref = "spring-retry" } # Testing libs assertj-bom = { module = "org.assertj:assertj-bom", version.ref = "assertj" } awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } diff --git a/spring-pulsar-dependencies/spring-pulsar-dependencies.gradle b/spring-pulsar-dependencies/spring-pulsar-dependencies.gradle index 63671ab1..796b567b 100644 --- a/spring-pulsar-dependencies/spring-pulsar-dependencies.gradle +++ b/spring-pulsar-dependencies/spring-pulsar-dependencies.gradle @@ -29,7 +29,6 @@ dependencies { api libs.pulsar.client.all api libs.pulsar.client.reactive.adapter api libs.pulsar.client.reactive.producer.cache.caffeine.shaded - api libs.spring.retry api libs.system.lambda } } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java index cb5e0553..8f9c5117 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java @@ -32,7 +32,7 @@ import org.jspecify.annotations.Nullable; import org.springframework.core.log.LogAccessor; -import org.springframework.pulsar.PulsarException; +import org.springframework.core.retry.RetryException; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; @@ -163,8 +163,18 @@ private void doStart() { CompletableFuture.supplyAsync(() -> { var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate()) .orElseGet(containerProps::getDefaultStartupFailureRetryTemplate); - return retryTemplate - .execute((__) -> startPipeline(containerProps)); + try { + AtomicBoolean initialAttempt = new AtomicBoolean(true); + return retryTemplate.execute(() -> { + if (initialAttempt.getAndSet(false)) { + throw new RuntimeException("Ignore initial attempt in retry template"); + } + return startPipeline(containerProps); + }); + } + catch (RetryException e) { + throw new RuntimeException(e); + } }).whenComplete((p, ex) -> { if (ex == null) { this.pipeline = p; diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java index 9fd257a3..c2e3b5b0 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java @@ -26,12 +26,14 @@ import org.apache.pulsar.common.schema.SchemaType; import org.jspecify.annotations.Nullable; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; -import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.backoff.FixedBackOff; /** * Contains runtime properties for a reactive listener container. @@ -68,10 +70,8 @@ public class ReactivePulsarContainerProperties { private RetryTemplate startupFailureRetryTemplate; - private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder() - .maxAttempts(3) - .fixedBackoff(Duration.ofSeconds(10)) - .build(); + private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build()); private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP; diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java index 39d6102b..57340597 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java @@ -47,10 +47,15 @@ import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline; import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; import org.assertj.core.api.InstanceOfAssertFactories; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.core.log.LogAccessor; +import org.springframework.core.retry.RetryListener; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; +import org.springframework.core.retry.Retryable; import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; @@ -63,10 +68,7 @@ import org.springframework.pulsar.test.model.UserRecord; import org.springframework.pulsar.test.model.json.UserRecordObjectMapper; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; -import org.springframework.retry.RetryCallback; -import org.springframework.retry.RetryContext; -import org.springframework.retry.RetryListener; -import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.backoff.FixedBackOff; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -459,22 +461,19 @@ void whenPolicyIsRetryAndRetriesAreExhaustedThenContainerDoesNotStart() throws E var thrown = new ArrayList(); var retryListener = new RetryListener() { @Override - public void close(RetryContext context, RetryCallback callback, - Throwable throwable) { - retryCount.set(context.getRetryCount()); + public void onRetrySuccess(RetryPolicy retryPolicy, Retryable retryable, @Nullable Object result) { + retryCount.incrementAndGet(); } @Override - public void onError(RetryContext context, RetryCallback callback, - Throwable throwable) { + public void onRetryFailure(RetryPolicy retryPolicy, Retryable retryable, Throwable throwable) { + retryCount.incrementAndGet(); thrown.add(throwable); } }; - var retryTemplate = RetryTemplate.builder() - .maxAttempts(2) - .fixedBackoff(Duration.ofSeconds(1)) - .withListener(retryListener) - .build(); + var retryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build()); + retryTemplate.setRetryListener(retryListener); var containerProps = new ReactivePulsarContainerProperties(); containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY); containerProps.setStartupFailureRetryTemplate(retryTemplate); @@ -515,22 +514,20 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception var thrown = new ArrayList(); var retryListener = new RetryListener() { @Override - public void close(RetryContext context, RetryCallback callback, - Throwable throwable) { - retryCount.set(context.getRetryCount()); + public void onRetrySuccess(RetryPolicy retryPolicy, Retryable retryable, + @Nullable Object result) { + retryCount.incrementAndGet(); } @Override - public void onError(RetryContext context, RetryCallback callback, - Throwable throwable) { + public void onRetryFailure(RetryPolicy retryPolicy, Retryable retryable, Throwable throwable) { + retryCount.incrementAndGet(); thrown.add(throwable); } }; - var retryTemplate = RetryTemplate.builder() - .maxAttempts(3) - .fixedBackoff(Duration.ofSeconds(1)) - .withListener(retryListener) - .build(); + var retryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build()); + retryTemplate.setRetryListener(retryListener); var latch = new CountDownLatch(1); var containerProps = new ReactivePulsarContainerProperties(); containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY); @@ -553,8 +550,8 @@ public void onError(RetryContext context, RetryCallback // factory called 3x (initial call + 2 retries) verify(consumerFactory, times(3)).createConsumer(any(), any()); - // only had to retry once (2nd call in retry template succeeded) - assertThat(retryCount).hasValue(1); + // had to retry 2x (1st retry fails and 2nd retry passes) + assertThat(retryCount).hasValue(2); assertThat(thrown).containsExactly(failCause); // should be able to process messages var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, topic); diff --git a/spring-pulsar/spring-pulsar.gradle b/spring-pulsar/spring-pulsar.gradle index e456aad4..c35c3a55 100644 --- a/spring-pulsar/spring-pulsar.gradle +++ b/spring-pulsar/spring-pulsar.gradle @@ -17,9 +17,6 @@ dependencies { api 'org.springframework:spring-context' api 'org.springframework:spring-messaging' api 'org.springframework:spring-tx' - api (libs.spring.retry) { - exclude group: 'org.springframework' - } api project(':spring-pulsar-cache-provider') implementation project(path: ':spring-pulsar-cache-provider-caffeine', configuration: 'shadow') implementation 'com.fasterxml.jackson.core:jackson-core' diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 0bc7e6b1..53949411 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -59,6 +59,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; +import org.springframework.core.retry.RetryException; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.config.StartupFailurePolicy; @@ -146,7 +147,6 @@ protected void doStart() { throw new IllegalStateException(msg, e); } } - if (this.listenerConsumer != null) { this.logger.debug(() -> "Successfully created completable - submitting to executor"); this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer); @@ -157,8 +157,19 @@ else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.R this.listenerConsumerFuture = consumerExecutor.submitCompletable(() -> { var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate()) .orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate); - this.listenerConsumer = retryTemplate - .execute((__) -> new Listener(messageListener, containerProperties)); + try { + AtomicBoolean initialAttempt = new AtomicBoolean(true); + this.listenerConsumer = retryTemplate.execute(() -> { + if (initialAttempt.getAndSet(false)) { + throw new RuntimeException("Ignore initial attempt in retry template"); + } + return new Listener(messageListener, containerProperties); + }); + } + catch (RetryException e) { + throw new RuntimeException(e); + } + Assert.notNull(this.listenerConsumer, "listenerConsumer must not be null"); this.listenerConsumer.run(); }).whenComplete((__, ex) -> { if (ex == null) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java index b0b2d806..5b8ca5f5 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java @@ -27,6 +27,8 @@ import org.apache.pulsar.common.schema.SchemaType; import org.jspecify.annotations.Nullable; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.DefaultSchemaResolver; @@ -36,10 +38,10 @@ import org.springframework.pulsar.core.TransactionProperties; import org.springframework.pulsar.observation.PulsarListenerObservationConvention; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; -import org.springframework.retry.support.RetryTemplate; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.Assert; +import org.springframework.util.backoff.FixedBackOff; import io.micrometer.observation.ObservationRegistry; @@ -105,10 +107,8 @@ public class PulsarContainerProperties { private @Nullable RetryTemplate startupFailureRetryTemplate; - private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder() - .maxAttempts(3) - .fixedBackoff(Duration.ofSeconds(10)) - .build(); + private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build()); private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP; diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer.java index d5548522..62c84d7c 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainer.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.api.Message; @@ -34,8 +35,8 @@ import org.jspecify.annotations.Nullable; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.retry.RetryException; import org.springframework.core.task.SimpleAsyncTaskExecutor; -import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.PulsarReaderFactory; import org.springframework.pulsar.core.ReaderBuilderCustomizer; @@ -109,8 +110,19 @@ else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.R readerExecutor.submitCompletable(() -> { var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate()) .orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate); - this.internalAsyncReader.set(retryTemplate.execute( - (__) -> new InternalAsyncReader(readerListener, containerProperties))); + try { + AtomicBoolean initialAttempt = new AtomicBoolean(true); + this.internalAsyncReader.set(retryTemplate.execute(() -> { + if (initialAttempt.getAndSet(false)) { + throw new RuntimeException("Ignore initial attempt in retry template"); + } + return new InternalAsyncReader(readerListener, containerProperties); + })); + } + catch (RetryException e) { + throw new RuntimeException(e); + } + Assert.notNull(this.internalAsyncReader.get(), "internalAsynReader must not be null"); this.internalAsyncReader.get().run(); }).whenComplete((__, ex) -> { if (ex == null) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/reader/PulsarReaderContainerProperties.java b/spring-pulsar/src/main/java/org/springframework/pulsar/reader/PulsarReaderContainerProperties.java index ff352069..778ba218 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/reader/PulsarReaderContainerProperties.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/reader/PulsarReaderContainerProperties.java @@ -27,12 +27,14 @@ import org.jspecify.annotations.NullUnmarked; import org.jspecify.annotations.Nullable; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.SchemaResolver; -import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; +import org.springframework.util.backoff.FixedBackOff; /** * Container properties for Pulsar {@link Reader}. @@ -62,10 +64,8 @@ public class PulsarReaderContainerProperties { private @Nullable RetryTemplate startupFailureRetryTemplate; - private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder() - .maxAttempts(3) - .fixedBackoff(Duration.ofSeconds(10)) - .build(); + private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build()); private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP; diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerStartupFailureTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerStartupFailureTests.java index afaa479b..c382030b 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerStartupFailureTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerStartupFailureTests.java @@ -35,12 +35,14 @@ import org.junit.jupiter.api.Test; import org.springframework.core.log.LogAccessor; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; -import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.backoff.FixedBackOff; /** * Tests the startup failures policy on the @@ -153,7 +155,8 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY); containerProps.setSchema(Schema.STRING); containerProps.setConcurrency(3); - var retryTemplate = RetryTemplate.builder().maxAttempts(2).fixedBackoff(Duration.ofSeconds(2)).build(); + var retryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build()); containerProps.setStartupFailureRetryTemplate(retryTemplate); var latch = new CountDownLatch(1); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> latch.countDown()); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java index 91ad82e8..a9b73e91 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java @@ -50,12 +50,17 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; +import org.springframework.core.retry.RetryListener; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; +import org.springframework.core.retry.Retryable; import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.ConsumerTestUtils; @@ -68,11 +73,8 @@ import org.springframework.pulsar.test.model.json.UserRecordObjectMapper; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; -import org.springframework.retry.RetryCallback; -import org.springframework.retry.RetryContext; -import org.springframework.retry.RetryListener; -import org.springframework.retry.support.RetryTemplate; import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.util.backoff.FixedBackOff; /** * @author Soby Chacko @@ -545,22 +547,19 @@ void whenPolicyIsRetryAndRetriesAreExhaustedThenContainerDoesNotStart() { var thrown = new ArrayList(); var retryListener = new RetryListener() { @Override - public void close(RetryContext context, RetryCallback callback, - Throwable throwable) { - retryCount.set(context.getRetryCount()); + public void onRetrySuccess(RetryPolicy retryPolicy, Retryable retryable, @Nullable Object result) { + retryCount.incrementAndGet(); } @Override - public void onError(RetryContext context, RetryCallback callback, - Throwable throwable) { + public void onRetryFailure(RetryPolicy retryPolicy, Retryable retryable, Throwable throwable) { + retryCount.incrementAndGet(); thrown.add(throwable); } }; - var retryTemplate = RetryTemplate.builder() - .maxAttempts(2) - .fixedBackoff(Duration.ofSeconds(2)) - .withListener(retryListener) - .build(); + var retryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build()); + retryTemplate.setRetryListener(retryListener); var containerProps = new PulsarContainerProperties(); containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY); containerProps.setStartupFailureRetryTemplate(retryTemplate); @@ -579,7 +578,7 @@ public void onError(RetryContext context, RetryCallback container.start(); // start container and expect ex not thrown and 2 retries - await().atMost(Duration.ofSeconds(15)).until(() -> retryCount.get() == 2); + await().atMost(Duration.ofSeconds(300)).until(() -> retryCount.get() == 2); assertThat(thrown).containsExactly(failCause, failCause); assertThat(container.isRunning()).isFalse(); // factory called 3x (initial + 2 retries) @@ -604,22 +603,19 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception var thrown = new ArrayList(); var retryListener = new RetryListener() { @Override - public void close(RetryContext context, RetryCallback callback, - Throwable throwable) { - retryCount.set(context.getRetryCount()); + public void onRetrySuccess(RetryPolicy retryPolicy, Retryable retryable, @Nullable Object result) { + retryCount.incrementAndGet(); } @Override - public void onError(RetryContext context, RetryCallback callback, - Throwable throwable) { + public void onRetryFailure(RetryPolicy retryPolicy, Retryable retryable, Throwable throwable) { + retryCount.incrementAndGet(); thrown.add(throwable); } }; - var retryTemplate = RetryTemplate.builder() - .maxAttempts(3) - .fixedBackoff(Duration.ofSeconds(2)) - .withListener(retryListener) - .build(); + var retryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 3)).build()); + retryTemplate.setRetryListener(retryListener); var latch = new CountDownLatch(1); var containerProps = new PulsarContainerProperties(); containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY); @@ -639,12 +635,12 @@ public void onError(RetryContext context, RetryCallback try { // start container and expect started after retries container.start(); - await().atMost(Duration.ofSeconds(10)).until(container::isRunning); + await().atMost(Duration.ofSeconds(300)).until(container::isRunning); // factory called 3x (initial call + 2 retries) verify(consumerFactory, times(3)).createConsumer(any(Schema.class), any(), any(), any(), any()); - // only had to retry once (2nd call in retry template succeeded) - assertThat(retryCount).hasValue(1); + // had to retry 2x (1st retry fails and 2nd retry passes) + assertThat(retryCount).hasValue(2); assertThat(thrown).containsExactly(failCause); // should be able to process messages var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, topic); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainerTests.java index 05144c78..e20880c4 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainerTests.java @@ -48,6 +48,10 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; +import org.springframework.core.retry.RetryListener; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; +import org.springframework.core.retry.Retryable; import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; @@ -55,10 +59,7 @@ import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.event.ReaderFailedToStartEvent; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; -import org.springframework.retry.RetryCallback; -import org.springframework.retry.RetryContext; -import org.springframework.retry.RetryListener; -import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.backoff.FixedBackOff; /** * Basic tests for {@link DefaultPulsarMessageReaderContainer}. @@ -247,22 +248,19 @@ void whenPolicyIsRetryAndRetriesAreExhaustedThenContainerDoesNotStart() throws E var thrown = new ArrayList(); var retryListener = new RetryListener() { @Override - public void close(RetryContext context, RetryCallback callback, - Throwable throwable) { - retryCount.set(context.getRetryCount()); + public void onRetrySuccess(RetryPolicy retryPolicy, Retryable retryable, @Nullable Object result) { + retryCount.incrementAndGet(); } @Override - public void onError(RetryContext context, RetryCallback callback, - Throwable throwable) { + public void onRetryFailure(RetryPolicy retryPolicy, Retryable retryable, Throwable throwable) { + retryCount.incrementAndGet(); thrown.add(throwable); } }; - var retryTemplate = RetryTemplate.builder() - .maxAttempts(2) - .fixedBackoff(Duration.ofSeconds(2)) - .withListener(retryListener) - .build(); + var retryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build()); + retryTemplate.setRetryListener(retryListener); var containerProps = new PulsarReaderContainerProperties(); containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY); containerProps.setStartupFailureRetryTemplate(retryTemplate); @@ -303,22 +301,19 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception var thrown = new ArrayList(); var retryListener = new RetryListener() { @Override - public void close(RetryContext context, RetryCallback callback, - Throwable throwable) { - retryCount.set(context.getRetryCount()); + public void onRetrySuccess(RetryPolicy retryPolicy, Retryable retryable, @Nullable Object result) { + retryCount.incrementAndGet(); } @Override - public void onError(RetryContext context, RetryCallback callback, - Throwable throwable) { + public void onRetryFailure(RetryPolicy retryPolicy, Retryable retryable, Throwable throwable) { + retryCount.incrementAndGet(); thrown.add(throwable); } }; - var retryTemplate = RetryTemplate.builder() - .maxAttempts(3) - .fixedBackoff(Duration.ofSeconds(2)) - .withListener(retryListener) - .build(); + var retryTemplate = new RetryTemplate( + RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 3)).build()); + retryTemplate.setRetryListener(retryListener); var latch = new CountDownLatch(1); var containerProps = new PulsarReaderContainerProperties(); containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY); @@ -342,8 +337,8 @@ public void onError(RetryContext context, RetryCallback // factory called 3x (initial call + 2 retries) verify(readerFactory, times(3)).createReader(any(), any(), any(), any()); - // only had to retry once (2nd call in retry template succeeded) - assertThat(retryCount).hasValue(1); + // had to retry 2x (1st retry fails and 2nd retry passes) + assertThat(retryCount).hasValue(2); assertThat(thrown).containsExactly(failCause); // should be able to process messages var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, topic);