diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 55fb759a86f6..261ea671bb58 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -28,12 +28,14 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.thread.Threading; import org.springframework.boot.util.LambdaSafe; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.env.Environment; @@ -57,6 +59,8 @@ import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.pulsar.reader.PulsarReaderContainerProperties; +import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; +import org.springframework.pulsar.transaction.PulsarTransactionManager; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar. @@ -126,8 +130,11 @@ private void applyProducerBuilderCustomizers(List> PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory, ObjectProvider producerInterceptors, SchemaResolver schemaResolver, TopicResolver topicResolver) { - return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(), - schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled()); + PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory, + producerInterceptors.orderedStream().toList(), schemaResolver, topicResolver, + this.properties.getTemplate().isObservationsEnabled()); + this.propertiesMapper.customizeTemplate(template); + return template; } @Bean @@ -142,6 +149,13 @@ DefaultPulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient, return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers); } + @Bean + @ConditionalOnMissingBean + @Conditional(TransactionsEnabledCondition.class) + public PulsarAwareTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) { + return new PulsarTransactionManager(pulsarClient); + } + @SuppressWarnings("unchecked") private void applyConsumerBuilderCustomizers(List> customizers, ConsumerBuilder builder) { @@ -153,13 +167,15 @@ private void applyConsumerBuilderCustomizers(List> @ConditionalOnMissingBean(name = "pulsarListenerContainerFactory") ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver, Environment environment) { + TopicResolver topicResolver, ObjectProvider pulsarTransactionManager, + Environment environment) { PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); if (Threading.VIRTUAL.isActive(environment)) { containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-")); } + pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager); this.propertiesMapper.customizeContainerProperties(containerProperties); return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); } @@ -203,4 +219,26 @@ static class EnablePulsarConfiguration { } + /** + * Custom condition that is true when transactions have been enabled in the template + * and/or in the listener. + */ + static final class TransactionsEnabledCondition extends AnyNestedCondition { + + TransactionsEnabledCondition() { + super(ConfigurationPhase.REGISTER_BEAN); + } + + @ConditionalOnProperty(prefix = "spring.pulsar.template.transaction", name = "enabled", havingValue = "true") + static class TemplateTransactionEnabledCondition { + + } + + @ConditionalOnProperty(prefix = "spring.pulsar.listener.transaction", name = "enabled", havingValue = "true") + static class ListenerTransactionEnabledCondition { + + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index 02c4f630bb3c..f0974e92353a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -36,6 +36,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.NestedConfigurationProperty; +import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; import org.springframework.util.Assert; /** @@ -103,6 +104,14 @@ public Template getTemplate() { return this.template; } + /** + * Whether transactions are enabled for either the template or the listener. + * @return whether transactions are enabled for either the template or the listener + */ + boolean isTransactionEnabled() { + return this.template.getTransaction().isEnabled() || this.listener.getTransaction().isEnabled(); + } + public static class Client { /** @@ -763,6 +772,11 @@ public static class Listener { */ private boolean observationEnabled; + /** + * Transaction settings. + */ + private final Transaction transaction = new ListenerTransaction(); + public SchemaType getSchemaType() { return this.schemaType; } @@ -779,6 +793,10 @@ public void setObservationEnabled(boolean observationEnabled) { this.observationEnabled = observationEnabled; } + public Transaction getTransaction() { + return this.transaction; + } + } public static class Reader { @@ -858,6 +876,11 @@ public static class Template { */ private boolean observationsEnabled; + /** + * Transaction settings. + */ + private final Transaction transaction = new TemplateTransaction(); + public boolean isObservationsEnabled() { return this.observationsEnabled; } @@ -866,6 +889,89 @@ public void setObservationsEnabled(boolean observationsEnabled) { this.observationsEnabled = observationsEnabled; } + public Transaction getTransaction() { + return this.transaction; + } + + } + + public abstract static class Transaction { + + /** + * Whether transactions are enabled for the component. + */ + private boolean enabled; + + /** + * Whether the component requires transactions. + */ + private boolean required; + + /** + * Duration representing the transaction timeout - null to use default timeout of + * the underlying transaction system, or none if timeouts are not supported. + */ + private Duration timeout; + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean isRequired() { + return this.required; + } + + public void setRequired(boolean required) { + this.required = required; + } + + public Duration getTimeout() { + return this.timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + + void validate() { + if (this.required && !this.enabled) { + String requiredProp = "%s.required".formatted(this.propertyPath()); + String enabledProp = "%s.enabled".formatted(this.propertyPath()); + throw new InvalidConfigurationPropertyValueException(requiredProp, this.required, + "Transactions must be enabled in order to be required. " + + "Either set %s to 'true' or make transactions optional by setting %s to 'false'" + .formatted(enabledProp, requiredProp)); + } + } + + /** + * Gets the property path that the transaction properties are mapped to. + * @return the property path that the transaction properties are mapped to + */ + protected abstract String propertyPath(); + + } + + static class TemplateTransaction extends Transaction { + + @Override + protected String propertyPath() { + return "spring.pulsar.template.transaction"; + } + + } + + static class ListenerTransaction extends Transaction { + + @Override + protected String propertyPath() { + return "spring.pulsar.listener.transaction"; + } + } public static class Authentication { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index 352bf7480bc6..90590c670e81 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl; import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.pulsar.reader.PulsarReaderContainerProperties; import org.springframework.util.StringUtils; @@ -64,6 +65,9 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout)); map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout)); map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout)); + if (this.properties.isTransactionEnabled()) { + clientBuilder.enableTransaction(true); + } customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication); customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties, connectionDetails); @@ -157,6 +161,15 @@ void customizeProducerBuilder(ProducerBuilder producerBuilder) { map.from(properties::getAccessMode).to(producerBuilder::accessMode); } + void customizeTemplate(PulsarTemplate template) { + PulsarProperties.Transaction properties = this.properties.getTemplate().getTransaction(); + properties.validate(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(properties::isEnabled).to(template.transactions()::setEnabled); + map.from(properties::isRequired).to(template.transactions()::setRequired); + map.from(properties::getTimeout).to(template.transactions()::setTimeout); + } + void customizeConsumerBuilder(ConsumerBuilder consumerBuilder) { PulsarProperties.Consumer properties = this.properties.getConsumer(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); @@ -183,6 +196,7 @@ private void customizeConsumerBuilderSubscription(ConsumerBuilder consumerBui void customizeContainerProperties(PulsarContainerProperties containerProperties) { customizePulsarContainerConsumerSubscriptionProperties(containerProperties); customizePulsarContainerListenerProperties(containerProperties); + customizePulsarContainerTransactionProperties(containerProperties); } private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContainerProperties containerProperties) { @@ -198,6 +212,15 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled); } + private void customizePulsarContainerTransactionProperties(PulsarContainerProperties containerProperties) { + PulsarProperties.Transaction properties = this.properties.getListener().getTransaction(); + properties.validate(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(properties::isEnabled).to(containerProperties.transactions()::setEnabled); + map.from(properties::isRequired).to(containerProperties.transactions()::setRequired); + map.from(properties::getTimeout).to(containerProperties.transactions()::setTimeout); + } + void customizeReaderBuilder(ReaderBuilder readerBuilder) { PulsarProperties.Reader properties = this.properties.getReader(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 995f50c720ed..9c54941c6ebb 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -70,6 +70,7 @@ import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -525,6 +526,24 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierListenerContainerShouldNotUse }); } + @Test + void whenTransactionManagerIsAvailableListenerContainerShouldUseTransactionManager() { + this.contextRunner.withPropertyValues("spring.pulsar.listener.transaction.enabled=true").run((context) -> { + ConcurrentPulsarListenerContainerFactory factory = context + .getBean(ConcurrentPulsarListenerContainerFactory.class); + assertThat(factory.getContainerProperties().transactions().getTransactionManager()).isNotNull(); + }); + } + + @Test + void whenTransactionManagerIsNotAvailableListenerContainerShouldNotUseTransactionManager() { + this.contextRunner.withPropertyValues("spring.pulsar.listener.transaction.enabled=false").run((context) -> { + ConcurrentPulsarListenerContainerFactory factory = context + .getBean(ConcurrentPulsarListenerContainerFactory.class); + assertThat(factory.getContainerProperties().transactions().getTransactionManager()).isNull(); + }); + } + } @Nested @@ -603,4 +622,77 @@ ReaderBuilderCustomizer customizerBar() { } + @Nested + class TransactionManagerTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + @SuppressWarnings("unchecked") + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + PulsarAwareTransactionManager txnMgr = mock(PulsarAwareTransactionManager.class); + this.contextRunner.withBean("customTransactionManager", PulsarAwareTransactionManager.class, () -> txnMgr) + .run((context) -> assertThat(context).getBean(PulsarAwareTransactionManager.class).isSameAs(txnMgr)); + } + + @Test + void whenNoPropertiesSetDoesNotAutoconfigureBean() { + this.contextRunner + .run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class)); + } + + @Test + void whenListenerAndTemplateDisablesTransactionsDoesNotAutoconfigureBean() { + this.contextRunner + .withPropertyValues("spring.pulsar.listener.transaction.enabled=false", + "spring.pulsar.template.transaction.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class)); + } + + @Test + void whenListenerEnablesTransactionsAutoconfiguresBean() { + this.contextRunner + .withPropertyValues("spring.pulsar.listener.transaction.enabled=true", + "spring.pulsar.template.transaction.enabled=false") + .run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class)); + } + + @Test + void whenTemplateEnablesTransactionsAutoconfiguresBean() { + this.contextRunner + .withPropertyValues("spring.pulsar.listener.transaction.enabled=false", + "spring.pulsar.template.transaction.enabled=true") + .run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class)); + } + + @Test + void whenTemplateRequiresTransactionsThenTransactionsMustBeEnabled() { + this.contextRunner + .withPropertyValues("spring.pulsar.template.transaction.required=true", + "spring.pulsar.template.transaction.enabled=false") + .run((context) -> assertThat(context).hasFailed() + .getFailure() + .hasMessageEndingWith( + "Property spring.pulsar.template.transaction.required with value 'true' is invalid: " + + "Transactions must be enabled in order to be required. Either set " + + "spring.pulsar.template.transaction.enabled to 'true' or make transactions " + + "optional by setting spring.pulsar.template.transaction.required to 'false'")); + } + + @Test + void whenListenerRequiresTransactionsThenTransactionsMustBeEnabled() { + this.contextRunner + .withPropertyValues("spring.pulsar.listener.transaction.required=true", + "spring.pulsar.listener.transaction.enabled=false") + .run((context) -> assertThat(context).hasFailed() + .getFailure() + .hasMessageEndingWith( + "Property spring.pulsar.listener.transaction.required with value 'true' is invalid: " + + "Transactions must be enabled in order to be required. Either set " + + "spring.pulsar.listener.transaction.enabled to 'true' or make transactions " + + "optional by setting spring.pulsar.listener.transaction.required to 'false'")); + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index 98248a84075c..a707d8081e62 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -42,12 +42,16 @@ import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster; +import org.springframework.pulsar.core.PulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; /** * Tests for {@link PulsarPropertiesMapper}. @@ -87,6 +91,28 @@ void customizeClientBuilderWhenHasAuthentication() throws UnsupportedAuthenticat then(builder).should().authentication("myclass", authParamString); } + @Test + void customizeClientBuilderWhenTransactionsEnabled() { + PulsarProperties properties = new PulsarProperties(); + properties.getListener().getTransaction().setEnabled(true); + properties.getTemplate().getTransaction().setEnabled(true); + ClientBuilder builder = mock(ClientBuilder.class); + new PulsarPropertiesMapper(properties).customizeClientBuilder(builder, + new PropertiesPulsarConnectionDetails(properties)); + then(builder).should().enableTransaction(true); + } + + @Test + void customizeClientBuilderWhenTransactionsDisabled() { + PulsarProperties properties = new PulsarProperties(); + properties.getListener().getTransaction().setEnabled(false); + properties.getTemplate().getTransaction().setEnabled(false); + ClientBuilder builder = mock(ClientBuilder.class); + new PulsarPropertiesMapper(properties).customizeClientBuilder(builder, + new PropertiesPulsarConnectionDetails(properties)); + then(builder).should(never()).enableTransaction(anyBoolean()); + } + @Test void customizeClientBuilderWhenHasConnectionDetails() { PulsarProperties properties = new PulsarProperties(); @@ -189,6 +215,20 @@ void customizeProducerBuilder() { then(builder).should().accessMode(ProducerAccessMode.Exclusive); } + @Test + @SuppressWarnings("unchecked") + void customizeTemplate() { + PulsarProperties properties = new PulsarProperties(); + properties.getTemplate().getTransaction().setEnabled(true); + properties.getTemplate().getTransaction().setRequired(true); + properties.getTemplate().getTransaction().setTimeout(Duration.ofSeconds(30)); + PulsarTemplate template = new PulsarTemplate<>(mock(PulsarProducerFactory.class)); + new PulsarPropertiesMapper(properties).customizeTemplate(template); + assertThat(template.transactions().isEnabled()).isTrue(); + assertThat(template.transactions().isRequired()).isTrue(); + assertThat(template.transactions().getTimeout()).isEqualTo(Duration.ofSeconds(30)); + } + @Test @SuppressWarnings("unchecked") void customizeConsumerBuilder() { @@ -220,11 +260,17 @@ void customizeContainerProperties() { properties.getConsumer().getSubscription().setType(SubscriptionType.Shared); properties.getListener().setSchemaType(SchemaType.AVRO); properties.getListener().setObservationEnabled(true); + properties.getListener().getTransaction().setEnabled(true); + properties.getListener().getTransaction().setRequired(true); + properties.getListener().getTransaction().setTimeout(Duration.ofSeconds(30)); PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern"); new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties); assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO); assertThat(containerProperties.isObservationEnabled()).isTrue(); + assertThat(containerProperties.transactions().isEnabled()).isTrue(); + assertThat(containerProperties.transactions().isRequired()).isTrue(); + assertThat(containerProperties.transactions().getTimeout()).isEqualTo(Duration.ofSeconds(30)); } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index c4aa787d5528..456325d2f15e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -355,9 +355,15 @@ void bind() { Map map = new HashMap<>(); map.put("spring.pulsar.listener.schema-type", "avro"); map.put("spring.pulsar.listener.observation-enabled", "true"); + map.put("spring.pulsar.listener.transaction.enabled", "true"); + map.put("spring.pulsar.listener.transaction.required", "true"); + map.put("spring.pulsar.listener.transaction.timeout", "60s"); PulsarProperties.Listener properties = bindPropeties(map).getListener(); assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO); assertThat(properties.isObservationEnabled()).isTrue(); + assertThat(properties.getTransaction().isEnabled()).isTrue(); + assertThat(properties.getTransaction().isRequired()).isTrue(); + assertThat(properties.getTransaction().getTimeout()).isEqualTo(Duration.ofSeconds(60)); } } @@ -390,8 +396,54 @@ class TemplateProperties { void bind() { Map map = new HashMap<>(); map.put("spring.pulsar.template.observations-enabled", "true"); + map.put("spring.pulsar.template.transaction.enabled", "true"); + map.put("spring.pulsar.template.transaction.required", "true"); + map.put("spring.pulsar.template.transaction.timeout", "60s"); PulsarProperties.Template properties = bindPropeties(map).getTemplate(); assertThat(properties.isObservationsEnabled()).isTrue(); + assertThat(properties.getTransaction().isEnabled()).isTrue(); + assertThat(properties.getTransaction().isRequired()).isTrue(); + assertThat(properties.getTransaction().getTimeout()).isEqualTo(Duration.ofSeconds(60)); + } + + } + + @Nested + class TransactionProperties { + + @Test + void transactionsEnabledWhenListenerAndTemplateBothEnabled() { + PulsarProperties properties = new PulsarProperties(); + properties.getListener().getTransaction().setEnabled(true); + properties.getTemplate().getTransaction().setEnabled(true); + assertThat(properties.isTransactionEnabled()).isTrue(); + + } + + @Test + void transactionsEnabledWhenListenerEnabledAndTemplateDisabled() { + PulsarProperties properties = new PulsarProperties(); + properties.getListener().getTransaction().setEnabled(true); + properties.getTemplate().getTransaction().setEnabled(false); + assertThat(properties.isTransactionEnabled()).isTrue(); + + } + + @Test + void transactionsEnabledWhenListenerDisabledAndTemplateEnabled() { + PulsarProperties properties = new PulsarProperties(); + properties.getListener().getTransaction().setEnabled(false); + properties.getTemplate().getTransaction().setEnabled(true); + assertThat(properties.isTransactionEnabled()).isTrue(); + + } + + void transactionsDisabledWhenListenerAndTemplateBothDisabled() { + PulsarProperties properties = new PulsarProperties(); + properties.getListener().getTransaction().setEnabled(false); + properties.getTemplate().getTransaction().setEnabled(false); + assertThat(properties.isTransactionEnabled()).isFalse(); + } } diff --git a/spring-boot-project/spring-boot-dependencies/build.gradle b/spring-boot-project/spring-boot-dependencies/build.gradle index 7fe68aa2c2eb..e8def1789e0f 100644 --- a/spring-boot-project/spring-boot-dependencies/build.gradle +++ b/spring-boot-project/spring-boot-dependencies/build.gradle @@ -1950,7 +1950,7 @@ bom { releaseNotes("https://github.com/spring-projects/spring-ldap/releases/tag/{version}") } } - library("Spring Pulsar", "1.1.0-M2") { + library("Spring Pulsar", "1.1.0-SNAPSHOT") { considerSnapshots() group("org.springframework.pulsar") { imports = [