Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,11 @@ public static class Listener {
*/
private boolean observationEnabled;

/**
* Startup settings.
*/
private final Startup startup = new Startup();

public SchemaType getSchemaType() {
return this.schemaType;
}
Expand All @@ -846,6 +851,10 @@ public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

public Startup getStartup() {
return this.startup;
}

}

public static class Reader {
Expand Down Expand Up @@ -876,6 +885,11 @@ public static class Reader {
*/
private boolean readCompacted;

/**
* Startup settings.
*/
private final Startup startup = new Startup();

public String getName() {
return this.name;
}
Expand Down Expand Up @@ -916,6 +930,53 @@ public void setReadCompacted(boolean readCompacted) {
this.readCompacted = readCompacted;
}

public Startup getStartup() {
return this.startup;
}

}

public static class Startup {

/**
* The max time to wait for the container to start.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description of configuration properties do not start with the, a, etc. I suggest Time to wait for the container to start.

*/
private Duration timeout;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a default for this property? If so I think we should use the opportunity to set the default so that it's documented, and very in a test that it's consistent in case it changes.


/**
* The action to take when the container fails to start.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. I suggest Action to take when the container fails to start.

*/
private FailurePolicy onFailure;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the previous behavior was to continue and log an exception. There's no default value here so it's hard to see if that's still the case. If that's the case, this field should have a default value that matches the default behavior.


public Duration getTimeout() {
return this.timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

public FailurePolicy getOnFailure() {
return this.onFailure;
}

public void setOnFailure(FailurePolicy onFailure) {
this.onFailure = onFailure;
}

}

public enum FailurePolicy {

/** Stop the container and throw exception. */
STOP,

/** Stop the container but do not throw exception. */
CONTINUE,

/** Retry startup asynchronously. */
RETRY

}

public static class Template {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;

import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.json.JsonWriter;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
Expand Down Expand Up @@ -198,6 +200,17 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
customizeListenerStartupProperties(containerProperties);
}

private void customizeListenerStartupProperties(PulsarContainerProperties containerProperties) {
PulsarProperties.Startup properties = this.properties.getListener().getStartup();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getTimeout).to(containerProperties::setConsumerStartTimeout);
map.from(properties::getOnFailure)
.as(FailurePolicy::name)
.as(StartupFailurePolicy::valueOf)
.to(containerProperties::setStartupFailurePolicy);
}

<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
Expand All @@ -214,6 +227,17 @@ void customizeReaderContainerProperties(PulsarReaderContainerProperties readerCo
PulsarProperties.Reader properties = this.properties.getReader();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getTopics).to(readerContainerProperties::setTopics);
customizeReaderStartupProperties(readerContainerProperties);
}

private void customizeReaderStartupProperties(PulsarReaderContainerProperties readerContainerProperties) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unfortunate that this is the exact same code as customizeListenerStartupProperties? Since the feature is shared, isn't there a way to avoid the code duplication?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code to de-duplicate was more complicated than having this in multiple places.

PulsarProperties.Startup properties = this.properties.getReader().getStartup();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getTimeout).to(readerContainerProperties::setReaderStartTimeout);
map.from(properties::getOnFailure)
.as(FailurePolicy::name)
.as(StartupFailurePolicy::valueOf)
.to(readerContainerProperties::setStartupFailurePolicy);
}

private Consumer<Duration> timeoutProperty(BiConsumer<Integer, TimeUnit> setter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;

import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;

/**
Expand Down Expand Up @@ -96,6 +98,16 @@ private void customizePulsarContainerListenerProperties(ReactivePulsarContainerP
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
customizeListenerStartupProperties(containerProperties);
}

private void customizeListenerStartupProperties(ReactivePulsarContainerProperties<?> containerProperties) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here as well.

PulsarProperties.Startup properties = this.properties.getListener().getStartup();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getOnFailure)
.as(FailurePolicy::name)
.as(StartupFailurePolicy::valueOf)
.to(containerProperties::setStartupFailurePolicy);
}

void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder<?> builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@

import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -265,6 +268,8 @@ void customizeContainerProperties() {
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setConcurrency(10);
properties.getListener().setObservationEnabled(true);
properties.getListener().getStartup().setOnFailure(FailurePolicy.RETRY);
properties.getListener().getStartup().setTimeout(Duration.ofSeconds(25));
properties.getTransaction().setEnabled(true);
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
Expand All @@ -274,6 +279,8 @@ void customizeContainerProperties() {
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
assertThat(containerProperties.isObservationEnabled()).isTrue();
assertThat(containerProperties.transactions().isEnabled()).isTrue();
assertThat(containerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.RETRY);
assertThat(containerProperties.getConsumerStartTimeout()).isEqualTo(Duration.ofSeconds(25));
}

@Test
Expand All @@ -295,4 +302,19 @@ void customizeReaderBuilder() {
then(builder).should().readCompacted(true);
}

@Test
@SuppressWarnings("unchecked")
void customizeReaderContainerProperties() {
PulsarProperties properties = new PulsarProperties();
List<String> topics = List.of("mytopic");
properties.getReader().setTopics(topics);
properties.getReader().getStartup().setOnFailure(FailurePolicy.CONTINUE);
properties.getReader().getStartup().setTimeout(Duration.ofSeconds(25));
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
new PulsarPropertiesMapper(properties).customizeReaderContainerProperties(readerContainerProperties);
assertThat(readerContainerProperties.getTopics()).isEqualTo(topics);
assertThat(readerContainerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.CONTINUE);
assertThat(readerContainerProperties.getReaderStartTimeout()).isEqualTo(Duration.ofSeconds(25));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Defaults.TypeMapping;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
import org.springframework.boot.context.properties.bind.BindException;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
Expand Down Expand Up @@ -395,10 +396,14 @@ void bind() {
map.put("spring.pulsar.listener.schema-type", "avro");
map.put("spring.pulsar.listener.concurrency", "10");
map.put("spring.pulsar.listener.observation-enabled", "true");
map.put("spring.pulsar.listener.startup.on-failure", "retry");
map.put("spring.pulsar.listener.startup.timeout", "2m");
PulsarProperties.Listener properties = bindProperties(map).getListener();
assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(properties.getConcurrency()).isEqualTo(10);
assertThat(properties.isObservationEnabled()).isTrue();
assertThat(properties.getStartup().getOnFailure()).isEqualTo(FailurePolicy.RETRY);
assertThat(properties.getStartup().getTimeout()).isEqualTo(Duration.ofMinutes(2));
}

}
Expand All @@ -414,12 +419,16 @@ void bind() {
map.put("spring.pulsar.reader.subscription-name", "my-subscription");
map.put("spring.pulsar.reader.subscription-role-prefix", "sub-role");
map.put("spring.pulsar.reader.read-compacted", "true");
map.put("spring.pulsar.reader.startup.on-failure", "continue");
map.put("spring.pulsar.reader.startup.timeout", "23s");
PulsarProperties.Reader properties = bindProperties(map).getReader();
assertThat(properties.getName()).isEqualTo("my-reader");
assertThat(properties.getTopics()).containsExactly("my-topic");
assertThat(properties.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(properties.getSubscriptionRolePrefix()).isEqualTo("sub-role");
assertThat(properties.isReadCompacted()).isTrue();
assertThat(properties.getStartup().getOnFailure()).isEqualTo(FailurePolicy.CONTINUE);
assertThat(properties.getStartup().getTimeout()).isEqualTo(Duration.ofSeconds(23));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer.Subscription;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy;
import org.springframework.pulsar.config.StartupFailurePolicy;
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -123,12 +125,14 @@ void customizeContainerProperties() {
properties.getConsumer().getSubscription().setName("my-subscription");
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setConcurrency(10);
properties.getListener().getStartup().setOnFailure(FailurePolicy.CONTINUE);
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
assertThat(containerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.CONTINUE);
}

@Test
Expand Down