Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ spring-dep-mgmt = "1.1.7"
spring-boot = "4.0.0-SNAPSHOT"
spring-boot-for-docs = "4.0.0-SNAPSHOT"
spring-cloud-stream = "5.0.0-SNAPSHOT"
spring-retry = "2.0.12"
system-lambda = "1.2.1"
testcontainers = "1.21.3"
# plugins
Expand Down Expand Up @@ -56,7 +55,6 @@ pulsar-client-reactive-adapter = { module = "org.apache.pulsar:pulsar-client-rea
pulsar-client-reactive-producer-cache-caffeine-shaded = { module = "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded", version.ref = "pulsar-reactive" }
reactor-bom = { module = "io.projectreactor:reactor-bom", version.ref = "reactor" }
spring-bom = { module = "org.springframework:spring-framework-bom", version.ref = "spring" }
spring-retry = { module = "org.springframework.retry:spring-retry", version.ref = "spring-retry" }
# Testing libs
assertj-bom = { module = "org.assertj:assertj-bom", version.ref = "assertj" }
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dependencies {
api libs.pulsar.client.all
api libs.pulsar.client.reactive.adapter
api libs.pulsar.client.reactive.producer.cache.caffeine.shaded
api libs.spring.retry
api libs.system.lambda
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.jspecify.annotations.Nullable;

import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.PulsarException;
import org.springframework.core.retry.RetryException;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
Expand Down Expand Up @@ -163,8 +163,18 @@ private void doStart() {
CompletableFuture.supplyAsync(() -> {
var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate())
.orElseGet(containerProps::getDefaultStartupFailureRetryTemplate);
return retryTemplate
.<ReactiveMessagePipeline, PulsarException>execute((__) -> startPipeline(containerProps));
try {
AtomicBoolean initialAttempt = new AtomicBoolean(true);
return retryTemplate.execute(() -> {
if (initialAttempt.getAndSet(false)) {
throw new RuntimeException("Ignore initial attempt in retry template");
}
return startPipeline(containerProps);
});
}
catch (RetryException e) {
throw new RuntimeException(e);
}
}).whenComplete((p, ex) -> {
if (ex == null) {
this.pipeline = p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.jspecify.annotations.Nullable;

import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.backoff.FixedBackOff;

/**
* Contains runtime properties for a reactive listener container.
Expand Down Expand Up @@ -68,10 +70,8 @@ public class ReactivePulsarContainerProperties<T> {

private RetryTemplate startupFailureRetryTemplate;

private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.fixedBackoff(Duration.ofSeconds(10))
.build();
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());

private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.core.log.LogAccessor;
import org.springframework.core.retry.RetryListener;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.core.retry.Retryable;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
Expand All @@ -63,10 +68,7 @@
import org.springframework.pulsar.test.model.UserRecord;
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.backoff.FixedBackOff;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -459,22 +461,19 @@ void whenPolicyIsRetryAndRetriesAreExhaustedThenContainerDoesNotStart() throws E
var thrown = new ArrayList<Throwable>();
var retryListener = new RetryListener() {
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
retryCount.set(context.getRetryCount());
public void onRetrySuccess(RetryPolicy retryPolicy, Retryable<?> retryable, @Nullable Object result) {
retryCount.incrementAndGet();
}

@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
public void onRetryFailure(RetryPolicy retryPolicy, Retryable<?> retryable, Throwable throwable) {
retryCount.incrementAndGet();
thrown.add(throwable);
}
};
var retryTemplate = RetryTemplate.builder()
.maxAttempts(2)
.fixedBackoff(Duration.ofSeconds(1))
.withListener(retryListener)
.build();
var retryTemplate = new RetryTemplate(
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build());
retryTemplate.setRetryListener(retryListener);
var containerProps = new ReactivePulsarContainerProperties<String>();
containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY);
containerProps.setStartupFailureRetryTemplate(retryTemplate);
Expand Down Expand Up @@ -515,22 +514,20 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception
var thrown = new ArrayList<Throwable>();
var retryListener = new RetryListener() {
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
retryCount.set(context.getRetryCount());
public void onRetrySuccess(RetryPolicy retryPolicy, Retryable<?> retryable,
@Nullable Object result) {
retryCount.incrementAndGet();
}

@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
public void onRetryFailure(RetryPolicy retryPolicy, Retryable<?> retryable, Throwable throwable) {
retryCount.incrementAndGet();
thrown.add(throwable);
}
};
var retryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.fixedBackoff(Duration.ofSeconds(1))
.withListener(retryListener)
.build();
var retryTemplate = new RetryTemplate(
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build());
retryTemplate.setRetryListener(retryListener);
var latch = new CountDownLatch(1);
var containerProps = new ReactivePulsarContainerProperties<String>();
containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY);
Expand All @@ -553,8 +550,8 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback

// factory called 3x (initial call + 2 retries)
verify(consumerFactory, times(3)).createConsumer(any(), any());
// only had to retry once (2nd call in retry template succeeded)
assertThat(retryCount).hasValue(1);
// had to retry 2x (1st retry fails and 2nd retry passes)
assertThat(retryCount).hasValue(2);
assertThat(thrown).containsExactly(failCause);
// should be able to process messages
var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, topic);
Expand Down
3 changes: 0 additions & 3 deletions spring-pulsar/spring-pulsar.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ dependencies {
api 'org.springframework:spring-context'
api 'org.springframework:spring-messaging'
api 'org.springframework:spring-tx'
api (libs.spring.retry) {
exclude group: 'org.springframework'
}
api project(':spring-pulsar-cache-provider')
implementation project(path: ':spring-pulsar-cache-provider-caffeine', configuration: 'shadow')
implementation 'com.fasterxml.jackson.core:jackson-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.springframework.beans.BeanUtils;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.retry.RetryException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.config.StartupFailurePolicy;
Expand Down Expand Up @@ -146,7 +147,6 @@ protected void doStart() {
throw new IllegalStateException(msg, e);
}
}

if (this.listenerConsumer != null) {
this.logger.debug(() -> "Successfully created completable - submitting to executor");
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
Expand All @@ -157,8 +157,19 @@ else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.R
this.listenerConsumerFuture = consumerExecutor.submitCompletable(() -> {
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
this.listenerConsumer = retryTemplate
.<Listener, PulsarException>execute((__) -> new Listener(messageListener, containerProperties));
try {
AtomicBoolean initialAttempt = new AtomicBoolean(true);
this.listenerConsumer = retryTemplate.execute(() -> {
if (initialAttempt.getAndSet(false)) {
throw new RuntimeException("Ignore initial attempt in retry template");
}
return new Listener(messageListener, containerProperties);
});
}
catch (RetryException e) {
throw new RuntimeException(e);
}
Assert.notNull(this.listenerConsumer, "listenerConsumer must not be null");
this.listenerConsumer.run();
}).whenComplete((__, ex) -> {
if (ex == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.jspecify.annotations.Nullable;

import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.DefaultSchemaResolver;
Expand All @@ -36,10 +38,10 @@
import org.springframework.pulsar.core.TransactionProperties;
import org.springframework.pulsar.observation.PulsarListenerObservationConvention;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.Assert;
import org.springframework.util.backoff.FixedBackOff;

import io.micrometer.observation.ObservationRegistry;

Expand Down Expand Up @@ -105,10 +107,8 @@ public class PulsarContainerProperties {

private @Nullable RetryTemplate startupFailureRetryTemplate;

private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.fixedBackoff(Duration.ofSeconds(10))
.build();
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());

private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.client.api.Message;
Expand All @@ -34,8 +35,8 @@
import org.jspecify.annotations.Nullable;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.retry.RetryException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
Expand Down Expand Up @@ -109,8 +110,19 @@ else if (containerProperties.getStartupFailurePolicy() == StartupFailurePolicy.R
readerExecutor.submitCompletable(() -> {
var retryTemplate = Optional.ofNullable(containerProperties.getStartupFailureRetryTemplate())
.orElseGet(containerProperties::getDefaultStartupFailureRetryTemplate);
this.internalAsyncReader.set(retryTemplate.<InternalAsyncReader, PulsarException>execute(
(__) -> new InternalAsyncReader(readerListener, containerProperties)));
try {
AtomicBoolean initialAttempt = new AtomicBoolean(true);
this.internalAsyncReader.set(retryTemplate.execute(() -> {
if (initialAttempt.getAndSet(false)) {
throw new RuntimeException("Ignore initial attempt in retry template");
}
return new InternalAsyncReader(readerListener, containerProperties);
}));
}
catch (RetryException e) {
throw new RuntimeException(e);
}
Assert.notNull(this.internalAsyncReader.get(), "internalAsynReader must not be null");
this.internalAsyncReader.get().run();
}).whenComplete((__, ex) -> {
if (ex == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.jspecify.annotations.NullUnmarked;
import org.jspecify.annotations.Nullable;

import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.backoff.FixedBackOff;

/**
* Container properties for Pulsar {@link Reader}.
Expand Down Expand Up @@ -62,10 +64,8 @@ public class PulsarReaderContainerProperties {

private @Nullable RetryTemplate startupFailureRetryTemplate;

private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.fixedBackoff(Duration.ofSeconds(10))
.build();
private final RetryTemplate defaultStartupFailureRetryTemplate = new RetryTemplate(
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(10).toMillis(), 3)).build());

private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import org.junit.jupiter.api.Test;

import org.springframework.core.log.LogAccessor;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.backoff.FixedBackOff;

/**
* Tests the startup failures policy on the
Expand Down Expand Up @@ -153,7 +155,8 @@ void whenPolicyIsRetryAndRetryIsSuccessfulThenContainerStarts() throws Exception
containerProps.setStartupFailurePolicy(StartupFailurePolicy.RETRY);
containerProps.setSchema(Schema.STRING);
containerProps.setConcurrency(3);
var retryTemplate = RetryTemplate.builder().maxAttempts(2).fixedBackoff(Duration.ofSeconds(2)).build();
var retryTemplate = new RetryTemplate(
RetryPolicy.builder().backOff(new FixedBackOff(Duration.ofSeconds(2).toMillis(), 2)).build());
containerProps.setStartupFailureRetryTemplate(retryTemplate);
var latch = new CountDownLatch(1);
containerProps.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> latch.countDown());
Expand Down
Loading