Skip to content

Commit 08ad7aa

Browse files
onobcwilkinsonaphilwebb
committed
Add Spring Pulsar transaction support
Adds auto-config for Spring for Apache Pulsar transactions. Introduces a new `spring.pulsar.transaction.enabled` property which can be used to enable transactions. This feature is opt-in and remains disabled by default. See gh-40189 Co-authored-by: Andy Wilkinson <[email protected]> Co-authored-by: Phillip Webb <[email protected]>
1 parent 07f8274 commit 08ad7aa

File tree

7 files changed

+182
-5
lines changed

7 files changed

+182
-5
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.springframework.pulsar.core.TopicResolver;
5858
import org.springframework.pulsar.listener.PulsarContainerProperties;
5959
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
60+
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
61+
import org.springframework.pulsar.transaction.PulsarTransactionManager;
6062

6163
/**
6264
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
@@ -126,8 +128,11 @@ private void applyProducerBuilderCustomizers(List<ProducerBuilderCustomizer<?>>
126128
PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
127129
ObjectProvider<ProducerInterceptor> producerInterceptors, SchemaResolver schemaResolver,
128130
TopicResolver topicResolver) {
129-
return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(),
130-
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
131+
PulsarTemplate<?> template = new PulsarTemplate<>(pulsarProducerFactory,
132+
producerInterceptors.orderedStream().toList(), schemaResolver, topicResolver,
133+
this.properties.getTemplate().isObservationsEnabled());
134+
this.propertiesMapper.customizeTemplate(template);
135+
return template;
131136
}
132137

133138
@Bean
@@ -142,6 +147,13 @@ DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
142147
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
143148
}
144149

150+
@Bean
151+
@ConditionalOnMissingBean(PulsarAwareTransactionManager.class)
152+
@ConditionalOnProperty(prefix = "spring.pulsar.transaction", name = "enabled")
153+
public PulsarTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
154+
return new PulsarTransactionManager(pulsarClient);
155+
}
156+
145157
@SuppressWarnings("unchecked")
146158
private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>> customizers,
147159
ConsumerBuilder<?> builder) {
@@ -153,13 +165,15 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
153165
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
154166
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
155167
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
156-
TopicResolver topicResolver, Environment environment) {
168+
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
169+
Environment environment) {
157170
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
158171
containerProperties.setSchemaResolver(schemaResolver);
159172
containerProperties.setTopicResolver(topicResolver);
160173
if (Threading.VIRTUAL.isActive(environment)) {
161174
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
162175
}
176+
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
163177
this.propertiesMapper.customizeContainerProperties(containerProperties);
164178
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
165179
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class PulsarProperties {
6767

6868
private final Template template = new Template();
6969

70+
private final Transaction transaction = new Transaction();
71+
7072
public Client getClient() {
7173
return this.client;
7274
}
@@ -103,6 +105,10 @@ public Template getTemplate() {
103105
return this.template;
104106
}
105107

108+
public Transaction getTransaction() {
109+
return this.transaction;
110+
}
111+
106112
public static class Client {
107113

108114
/**
@@ -868,6 +874,23 @@ public void setObservationsEnabled(boolean observationsEnabled) {
868874

869875
}
870876

877+
public static class Transaction {
878+
879+
/**
880+
* Whether transaction support is enabled.
881+
*/
882+
private boolean enabled;
883+
884+
public boolean isEnabled() {
885+
return this.enabled;
886+
}
887+
888+
public void setEnabled(boolean enabled) {
889+
this.enabled = enabled;
890+
}
891+
892+
}
893+
871894
public static class Authentication {
872895

873896
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;
4040

4141
import org.springframework.boot.context.properties.PropertyMapper;
42+
import org.springframework.pulsar.core.PulsarTemplate;
4243
import org.springframework.pulsar.listener.PulsarContainerProperties;
4344
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
4445
import org.springframework.util.StringUtils;
@@ -64,6 +65,7 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
6465
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
6566
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
6667
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
68+
map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction);
6769
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
6870
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,
6971
connectionDetails);
@@ -157,6 +159,10 @@ <T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) {
157159
map.from(properties::getAccessMode).to(producerBuilder::accessMode);
158160
}
159161

162+
<T> void customizeTemplate(PulsarTemplate<T> template) {
163+
template.transactions().setEnabled(this.properties.getTransaction().isEnabled());
164+
}
165+
160166
<T> void customizeConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
161167
PulsarProperties.Consumer properties = this.properties.getConsumer();
162168
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
@@ -183,6 +189,7 @@ private void customizeConsumerBuilderSubscription(ConsumerBuilder<?> consumerBui
183189
void customizeContainerProperties(PulsarContainerProperties containerProperties) {
184190
customizePulsarContainerConsumerSubscriptionProperties(containerProperties);
185191
customizePulsarContainerListenerProperties(containerProperties);
192+
containerProperties.transactions().setEnabled(this.properties.getTransaction().isEnabled());
186193
}
187194

188195
private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContainerProperties containerProperties) {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
7171
import org.springframework.pulsar.core.SchemaResolver;
7272
import org.springframework.pulsar.core.TopicResolver;
73+
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
74+
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
7375
import org.springframework.test.util.ReflectionTestUtils;
7476

7577
import static org.assertj.core.api.Assertions.assertThat;
@@ -330,6 +332,13 @@ void whenObservationsDisabledDoesNotEnableObservation() {
330332
.hasFieldOrPropertyWithValue("observationEnabled", false));
331333
}
332334

335+
@Test
336+
void whenTransactionEnabledTrueEnablesTransactions() {
337+
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
338+
.run((context) -> assertThat(context.getBean(PulsarTemplate.class).transactions().isEnabled())
339+
.isTrue());
340+
}
341+
333342
@Configuration(proxyBeanMethods = false)
334343
static class InterceptorTestConfiguration {
335344

@@ -525,6 +534,28 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierListenerContainerShouldNotUse
525534
});
526535
}
527536

537+
@Test
538+
void whenTransactionEnabledTrueListenerContainerShouldUseTransactions() {
539+
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true").run((context) -> {
540+
ConcurrentPulsarListenerContainerFactory<?> factory = context
541+
.getBean(ConcurrentPulsarListenerContainerFactory.class);
542+
TransactionSettings transactions = factory.getContainerProperties().transactions();
543+
assertThat(transactions.isEnabled()).isTrue();
544+
assertThat(transactions.getTransactionManager()).isNotNull();
545+
});
546+
}
547+
548+
@Test
549+
void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() {
550+
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false").run((context) -> {
551+
ConcurrentPulsarListenerContainerFactory<?> factory = context
552+
.getBean(ConcurrentPulsarListenerContainerFactory.class);
553+
TransactionSettings transactions = factory.getContainerProperties().transactions();
554+
assertThat(transactions.isEnabled()).isFalse();
555+
assertThat(transactions.getTransactionManager()).isNull();
556+
});
557+
}
558+
528559
}
529560

530561
@Nested
@@ -603,4 +634,37 @@ ReaderBuilderCustomizer<?> customizerBar() {
603634

604635
}
605636

637+
@Nested
638+
class TransactionManagerTests {
639+
640+
private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;
641+
642+
@Test
643+
@SuppressWarnings("unchecked")
644+
void whenUserHasDefinedATransactionManagerTheAutoConfigurationBacksOff() {
645+
PulsarAwareTransactionManager txnMgr = mock(PulsarAwareTransactionManager.class);
646+
this.contextRunner.withBean("customTransactionManager", PulsarAwareTransactionManager.class, () -> txnMgr)
647+
.run((context) -> assertThat(context).getBean(PulsarAwareTransactionManager.class).isSameAs(txnMgr));
648+
}
649+
650+
@Test
651+
void whenNoPropertiesAreSetTransactionManagerShouldNotBeDefined() {
652+
this.contextRunner
653+
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
654+
}
655+
656+
@Test
657+
void whenTransactionEnabledFalseTransactionManagerIsNotAutoConfigured() {
658+
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false")
659+
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
660+
}
661+
662+
@Test
663+
void whenTransactionEnabledTrueTransactionManagerIsAutoConfigured() {
664+
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
665+
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
666+
}
667+
668+
}
669+
606670
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,20 @@
3838
import org.apache.pulsar.client.impl.AutoClusterFailover;
3939
import org.apache.pulsar.common.schema.SchemaType;
4040
import org.junit.jupiter.api.Test;
41-
import org.mockito.Mockito;
4241

4342
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
4443
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
44+
import org.springframework.pulsar.core.PulsarProducerFactory;
45+
import org.springframework.pulsar.core.PulsarTemplate;
4546
import org.springframework.pulsar.listener.PulsarContainerProperties;
4647

4748
import static org.assertj.core.api.Assertions.assertThat;
49+
import static org.mockito.ArgumentMatchers.any;
50+
import static org.mockito.ArgumentMatchers.anyBoolean;
4851
import static org.mockito.BDDMockito.given;
4952
import static org.mockito.BDDMockito.then;
5053
import static org.mockito.Mockito.mock;
54+
import static org.mockito.Mockito.never;
5155

5256
/**
5357
* Tests for {@link PulsarPropertiesMapper}.
@@ -87,6 +91,26 @@ void customizeClientBuilderWhenHasAuthentication() throws UnsupportedAuthenticat
8791
then(builder).should().authentication("myclass", authParamString);
8892
}
8993

94+
@Test
95+
void customizeClientBuilderWhenTransactionEnabled() {
96+
PulsarProperties properties = new PulsarProperties();
97+
properties.getTransaction().setEnabled(true);
98+
ClientBuilder builder = mock(ClientBuilder.class);
99+
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
100+
new PropertiesPulsarConnectionDetails(properties));
101+
then(builder).should().enableTransaction(true);
102+
}
103+
104+
@Test
105+
void customizeClientBuilderWhenTransactionDisabled() {
106+
PulsarProperties properties = new PulsarProperties();
107+
properties.getTransaction().setEnabled(false);
108+
ClientBuilder builder = mock(ClientBuilder.class);
109+
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
110+
new PropertiesPulsarConnectionDetails(properties));
111+
then(builder).should(never()).enableTransaction(anyBoolean());
112+
}
113+
90114
@Test
91115
void customizeClientBuilderWhenHasConnectionDetails() {
92116
PulsarProperties properties = new PulsarProperties();
@@ -120,7 +144,7 @@ void customizeClientBuilderWhenHasFailover() {
120144
ClientBuilder builder = mock(ClientBuilder.class);
121145
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
122146
new PropertiesPulsarConnectionDetails(properties));
123-
then(builder).should().serviceUrlProvider(Mockito.any(AutoClusterFailover.class));
147+
then(builder).should().serviceUrlProvider(any(AutoClusterFailover.class));
124148
}
125149

126150
@Test
@@ -189,6 +213,16 @@ void customizeProducerBuilder() {
189213
then(builder).should().accessMode(ProducerAccessMode.Exclusive);
190214
}
191215

216+
@Test
217+
@SuppressWarnings("unchecked")
218+
void customizeTemplate() {
219+
PulsarProperties properties = new PulsarProperties();
220+
properties.getTransaction().setEnabled(true);
221+
PulsarTemplate<Object> template = new PulsarTemplate<>(mock(PulsarProducerFactory.class));
222+
new PulsarPropertiesMapper(properties).customizeTemplate(template);
223+
assertThat(template.transactions().isEnabled()).isTrue();
224+
}
225+
192226
@Test
193227
@SuppressWarnings("unchecked")
194228
void customizeConsumerBuilder() {
@@ -220,11 +254,13 @@ void customizeContainerProperties() {
220254
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
221255
properties.getListener().setSchemaType(SchemaType.AVRO);
222256
properties.getListener().setObservationEnabled(true);
257+
properties.getTransaction().setEnabled(true);
223258
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
224259
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
225260
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
226261
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
227262
assertThat(containerProperties.isObservationEnabled()).isTrue();
263+
assertThat(containerProperties.transactions().isEnabled()).isTrue();
228264
}
229265

230266
@Test

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,4 +396,17 @@ void bind() {
396396

397397
}
398398

399+
@Nested
400+
class TransactionProperties {
401+
402+
@Test
403+
void bind() {
404+
Map<String, String> map = new HashMap<>();
405+
map.put("spring.pulsar.transaction.enabled", "true");
406+
PulsarProperties.Transaction properties = bindPropeties(map).getTransaction();
407+
assertThat(properties.isEnabled()).isTrue();
408+
}
409+
410+
}
411+
399412
}

spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,26 @@ TIP: For more details on any of the above components and to discover other avail
215215

216216

217217

218+
[[messaging.pulsar.transactions]]
219+
== Transaction Support
220+
221+
Spring for Apache Pulsar supports transactions when using `PulsarTemplate` and `@PulsarListener`.
222+
223+
NOTE: Transactions are not currently supported when using the reactive variants.
224+
225+
Setting the configprop:spring.pulsar.transaction.enabled[] property to `true` will:
226+
227+
* Configure a `PulsarTransactionManager` bean
228+
* Enable transaction support for `PulsarTemplate`
229+
* Enable transaction support for `@PulsarListener` methods
230+
231+
The `transactional` attribute of `@PulsarListener` can be used to fine-tune when transactions should be used with listeners.
232+
233+
For more control of the Spring for Apache Pulsar transaction features you should define your own `PulsarTemplate` and/or `ConcurrentPulsarListenerContainerFactory` beans.
234+
You can also define a `PulsarAwareTransactionManager` bean if the default auto-configured `PulsarTransactionManager` is not suitable.
235+
236+
237+
218238
[[messaging.pulsar.additional-properties]]
219239
== Additional Pulsar Properties
220240

0 commit comments

Comments
 (0)