Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -29,6 +31,8 @@
import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;

import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.util.CollectionUtils;
Expand All @@ -38,6 +42,7 @@
*
* @param <T> message type.
* @author Christophe Bornet
* @author Chris Bono
*/
public non-sealed class DefaultReactivePulsarMessageListenerContainer<T>
implements ReactivePulsarMessageListenerContainer<T> {
Expand Down Expand Up @@ -135,13 +140,56 @@ public void stop() {

private void doStart() {
setRunning(true);
this.pipeline = startPipeline(this.pulsarContainerProperties);
var containerProps = this.getContainerProperties();
try {
this.pipeline = startPipeline(this.pulsarContainerProperties);
}
catch (Exception e) {
this.logger.error(e, () -> "Error starting Reactive pipeline");
this.doStop();
if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.STOP) {
this.logger.info(() -> "Configured to stop on startup failures - exiting");
throw new IllegalStateException("Error starting Reactive pipeline", e);
}
}
// Pipeline started w/o errors - short circuit
if (this.pipeline != null && this.pipeline.isRunning()) {
return;
}

if (containerProps.getStartupFailurePolicy() == StartupFailurePolicy.RETRY) {
this.logger.info(() -> "Configured to retry on startup failures - retrying");
CompletableFuture.supplyAsync(() -> {
var retryTemplate = Optional.ofNullable(containerProps.getStartupFailureRetryTemplate())
.orElseGet(containerProps::getDefaultStartupFailureRetryTemplate);
ReactiveMessagePipeline pipeline = null;
try {
pipeline = retryTemplate
.<ReactiveMessagePipeline, PulsarException>execute((__) -> startPipeline(containerProps));
}
catch (PulsarException ex) {
this.logger.error(ex, () -> "Unable to start Reactive pipeline - retries exhausted");
this.doStop();
}
return pipeline;
}).whenComplete((p, e) -> {
this.pipeline = p;
setRunning(this.pipeline != null ? this.pipeline.isRunning() : false);
if (e != null && !(e instanceof PulsarException)) {
// PulsarException handled in supplyAsync handler above
this.logger.error(e, () -> "Unable to start Reactive pipeline");
this.doStop();
}
});
}
}

public void doStop() {
try {
this.logger.info("Closing Pulsar Reactive pipeline.");
this.pipeline.close();
if (this.pipeline != null) {
this.pipeline.close();
}
}
catch (Exception e) {
this.logger.error(e, () -> "Error closing Pulsar Reactive pipeline.");
Expand Down Expand Up @@ -174,6 +222,9 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
customizers.add(this.consumerCustomizer);
}

// NOTE: The following various pipeline builders always set 'pipelineRetrySpec'
// to null as the container controls the retry of the pipeline start. Otherwise
// they do not work well together.
ReactiveMessageConsumer<T> consumer = getReactivePulsarConsumerFactory()
.createConsumer(containerProperties.getSchema(), customizers);
ReactiveMessagePipelineBuilder<T> pipelineBuilder = ApiImplementationFactory
Expand All @@ -183,6 +234,7 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
if (messageHandler instanceof ReactivePulsarStreamingHandler<?>) {
pipeline = pipelineBuilder
.streamingMessageHandler(((ReactivePulsarStreamingHandler<T>) messageHandler)::received)
.pipelineRetrySpec(null)
.build();
}
else {
Expand All @@ -195,10 +247,10 @@ private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<
if (containerProperties.isUseKeyOrderedProcessing()) {
concurrentPipelineBuilder.useKeyOrderedProcessing();
}
pipeline = concurrentPipelineBuilder.build();
pipeline = concurrentPipelineBuilder.pipelineRetrySpec(null).build();
}
else {
pipeline = pipelineBuilder.build();
pipeline = pipelineBuilder.pipelineRetrySpec(null).build();
}
}
pipeline.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,16 +18,20 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.retry.support.RetryTemplate;

/**
* Contains runtime properties for a reactive listener container.
Expand Down Expand Up @@ -61,6 +65,16 @@ public class ReactivePulsarContainerProperties<T> {

private boolean useKeyOrderedProcessing = false;

@Nullable
private RetryTemplate startupFailureRetryTemplate;

private final RetryTemplate defaultStartupFailureRetryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.fixedBackoff(Duration.ofSeconds(10))
.build();

private StartupFailurePolicy startupFailurePolicy = StartupFailurePolicy.STOP;

public ReactivePulsarMessageHandler getMessageHandler() {
return this.messageHandler;
}
Expand Down Expand Up @@ -161,4 +175,46 @@ public void setUseKeyOrderedProcessing(boolean useKeyOrderedProcessing) {
this.useKeyOrderedProcessing = useKeyOrderedProcessing;
}

@Nullable
public RetryTemplate getStartupFailureRetryTemplate() {
return this.startupFailureRetryTemplate;
}

/**
* Get the default template to use to retry startup when no custom retry template has
* been specified.
* @return the default retry template that will retry 3 times with a fixed delay of 10
* seconds between each attempt.
* @since 1.2.0
*/
public RetryTemplate getDefaultStartupFailureRetryTemplate() {
return this.defaultStartupFailureRetryTemplate;
}

/**
* Set the template to use to retry startup when an exception occurs during startup.
* @param startupFailureRetryTemplate the retry template to use
* @since 1.2.0
*/
public void setStartupFailureRetryTemplate(RetryTemplate startupFailureRetryTemplate) {
this.startupFailureRetryTemplate = startupFailureRetryTemplate;
if (this.startupFailureRetryTemplate != null) {
setStartupFailurePolicy(StartupFailurePolicy.RETRY);
}
}

public StartupFailurePolicy getStartupFailurePolicy() {
return this.startupFailurePolicy;
}

/**
* The action to take on the container when a failure occurs during startup.
* @param startupFailurePolicy action to take when a failure occurs during startup
* @since 1.2.0
*/
public void setStartupFailurePolicy(StartupFailurePolicy startupFailurePolicy) {
this.startupFailurePolicy = Objects.requireNonNull(startupFailurePolicy,
"startupFailurePolicy must not be null");
}

}
Loading