Skip to content

Commit 0b9baf8

Browse files
authored
DefaultPulsarProducerFactory accepts multiple customizers (#434)
See #432
1 parent 52a18cb commit 0b9baf8

File tree

9 files changed

+82
-30
lines changed

9 files changed

+82
-30
lines changed

spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,17 @@ public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactor
6767
* configuration.
6868
* @param pulsarClient the client used to create the producers
6969
* @param defaultTopic the default topic to use for the producers
70-
* @param defaultConfigCustomizer the default configuration to apply to the producers
70+
* @param defaultConfigCustomizers the optional list of customizers to apply to the
71+
* created producers
7172
* @param topicResolver the topic resolver to use
7273
* @param cacheExpireAfterAccess time period to expire unused entries in the cache
7374
* @param cacheMaximumSize maximum size of cache (entries)
7475
* @param cacheInitialCapacity the initial size of cache
7576
*/
7677
public CachingPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String defaultTopic,
77-
ProducerBuilderCustomizer<T> defaultConfigCustomizer, TopicResolver topicResolver,
78+
List<ProducerBuilderCustomizer<T>> defaultConfigCustomizers, TopicResolver topicResolver,
7879
Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity) {
79-
super(pulsarClient, defaultTopic, defaultConfigCustomizer, topicResolver);
80+
super(pulsarClient, defaultTopic, defaultConfigCustomizers, topicResolver);
8081
var cacheFactory = CacheProviderFactory.<ProducerCacheKey<T>, Producer<T>>load();
8182
this.producerCache = cacheFactory.create(cacheExpireAfterAccess, cacheMaximumSize, cacheInitialCapacity,
8283
(key, producer, cause) -> {

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarProducerFactory.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class DefaultPulsarProducerFactory<T> implements PulsarProducerFactory<T>
5252
private final String defaultTopic;
5353

5454
@Nullable
55-
private final ProducerBuilderCustomizer<T> defaultConfigCustomizer;
55+
private final List<ProducerBuilderCustomizer<T>> defaultConfigCustomizers;
5656

5757
private final TopicResolver topicResolver;
5858

@@ -61,8 +61,7 @@ public class DefaultPulsarProducerFactory<T> implements PulsarProducerFactory<T>
6161
* @param pulsarClient the client used to create the producers
6262
*/
6363
public DefaultPulsarProducerFactory(PulsarClient pulsarClient) {
64-
this(pulsarClient, null, (pb) -> {
65-
}, new DefaultTopicResolver());
64+
this(pulsarClient, null, null, new DefaultTopicResolver());
6665
}
6766

6867
/**
@@ -71,33 +70,34 @@ public DefaultPulsarProducerFactory(PulsarClient pulsarClient) {
7170
* @param defaultTopic the default topic to use for the producers
7271
*/
7372
public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String defaultTopic) {
74-
this(pulsarClient, defaultTopic, (pb) -> {
75-
}, new DefaultTopicResolver());
73+
this(pulsarClient, defaultTopic, null, new DefaultTopicResolver());
7674
}
7775

7876
/**
7977
* Construct a producer factory that uses a default topic resolver.
8078
* @param pulsarClient the client used to create the producers
8179
* @param defaultTopic the default topic to use for the producers
82-
* @param defaultConfigCustomizer the default configuration to apply to the producers
80+
* @param defaultConfigCustomizers the optional list of customizers to apply to the
81+
* created producers
8382
*/
8483
public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String defaultTopic,
85-
@Nullable ProducerBuilderCustomizer<T> defaultConfigCustomizer) {
86-
this(pulsarClient, defaultTopic, defaultConfigCustomizer, new DefaultTopicResolver());
84+
@Nullable List<ProducerBuilderCustomizer<T>> defaultConfigCustomizers) {
85+
this(pulsarClient, defaultTopic, defaultConfigCustomizers, new DefaultTopicResolver());
8786
}
8887

8988
/**
9089
* Construct a producer factory that uses the specified parameters.
9190
* @param pulsarClient the client used to create the producers
9291
* @param defaultTopic the default topic to use for the producers
93-
* @param defaultConfigCustomizer the default configuration to apply to the producers
92+
* @param defaultConfigCustomizers the optional list of customizers to apply to the
93+
* created producers
9494
* @param topicResolver the topic resolver to use
9595
*/
9696
public DefaultPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String defaultTopic,
97-
@Nullable ProducerBuilderCustomizer<T> defaultConfigCustomizer, TopicResolver topicResolver) {
97+
@Nullable List<ProducerBuilderCustomizer<T>> defaultConfigCustomizers, TopicResolver topicResolver) {
9898
this.pulsarClient = pulsarClient;
9999
this.defaultTopic = defaultTopic;
100-
this.defaultConfigCustomizer = defaultConfigCustomizer;
100+
this.defaultConfigCustomizers = defaultConfigCustomizers;
101101
this.topicResolver = topicResolver;
102102
}
103103

@@ -142,8 +142,8 @@ protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,
142142
var producerBuilder = this.pulsarClient.newProducer(schema);
143143

144144
// Apply the default config customizer (preserve the topic)
145-
if (this.defaultConfigCustomizer != null) {
146-
this.defaultConfigCustomizer.customize(producerBuilder);
145+
if (this.defaultConfigCustomizers != null) {
146+
this.defaultConfigCustomizers.forEach((customizer) -> customizer.customize(producerBuilder));
147147
}
148148
producerBuilder.topic(resolvedTopic);
149149

spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,9 @@ protected Producer<String> actualProducer(Producer<String> wrappedProducer) {
228228

229229
@Override
230230
protected CachingPulsarProducerFactory<String> producerFactory(PulsarClient pulsarClient,
231-
@Nullable String defaultTopic, @Nullable ProducerBuilderCustomizer<String> defaultConfigCustomizer) {
231+
@Nullable String defaultTopic, @Nullable List<ProducerBuilderCustomizer<String>> defaultConfigCustomizers) {
232232
var producerFactory = new CachingPulsarProducerFactory<String>(pulsarClient, defaultTopic,
233-
defaultConfigCustomizer, new DefaultTopicResolver(), Duration.ofMinutes(5L), 30L, 2);
233+
defaultConfigCustomizers, new DefaultTopicResolver(), Duration.ofMinutes(5L), 30L, 2);
234234
producerFactories.add(producerFactory);
235235
return producerFactory;
236236
}

spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarProducerFactoryTests.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,19 @@
1717
package org.springframework.pulsar.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.inOrder;
22+
import static org.mockito.Mockito.mock;
23+
24+
import java.util.List;
2025

2126
import org.apache.pulsar.client.api.Producer;
27+
import org.apache.pulsar.client.api.ProducerBuilder;
2228
import org.apache.pulsar.client.api.PulsarClient;
2329
import org.apache.pulsar.client.api.PulsarClientException;
30+
import org.junit.jupiter.api.Nested;
2431
import org.junit.jupiter.api.Test;
32+
import org.mockito.InOrder;
2533

2634
import org.springframework.lang.Nullable;
2735

@@ -46,8 +54,42 @@ void createProducerMultipleTimeDoesNotCacheProducer() throws PulsarClientExcepti
4654

4755
@Override
4856
protected PulsarProducerFactory<String> producerFactory(PulsarClient pulsarClient, @Nullable String defaultTopic,
49-
@Nullable ProducerBuilderCustomizer<String> defaultConfigCustomizer) {
50-
return new DefaultPulsarProducerFactory<>(pulsarClient, defaultTopic, defaultConfigCustomizer);
57+
@Nullable List<ProducerBuilderCustomizer<String>> defaultConfigCustomizers) {
58+
return new DefaultPulsarProducerFactory<>(pulsarClient, defaultTopic, defaultConfigCustomizers);
59+
}
60+
61+
@Nested
62+
@SuppressWarnings("unchecked")
63+
class DefaultConfigCustomizerApi {
64+
65+
private ProducerBuilderCustomizer<String> configCustomizer1 = mock(ProducerBuilderCustomizer.class);
66+
67+
private ProducerBuilderCustomizer<String> configCustomizer2 = mock(ProducerBuilderCustomizer.class);
68+
69+
private ProducerBuilderCustomizer<String> createProducerCustomizer = mock(ProducerBuilderCustomizer.class);
70+
71+
@Test
72+
void singleConfigCustomizer() throws PulsarClientException {
73+
try (var ignored = newProducerFactoryWithDefaultConfigCustomizers(List.of(configCustomizer1))
74+
.createProducer(schema, "topic0", createProducerCustomizer)) {
75+
InOrder inOrder = inOrder(configCustomizer1, createProducerCustomizer);
76+
inOrder.verify(configCustomizer1).customize(any(ProducerBuilder.class));
77+
inOrder.verify(createProducerCustomizer).customize(any(ProducerBuilder.class));
78+
}
79+
}
80+
81+
@Test
82+
void multipleConfigCustomizers() throws PulsarClientException {
83+
try (var ignored = newProducerFactoryWithDefaultConfigCustomizers(
84+
List.of(configCustomizer2, configCustomizer1))
85+
.createProducer(schema, "topic0", createProducerCustomizer)) {
86+
InOrder inOrder = inOrder(configCustomizer1, configCustomizer2, createProducerCustomizer);
87+
inOrder.verify(configCustomizer2).customize(any(ProducerBuilder.class));
88+
inOrder.verify(configCustomizer1).customize(any(ProducerBuilder.class));
89+
inOrder.verify(createProducerCustomizer).customize(any(ProducerBuilder.class));
90+
}
91+
}
92+
5193
}
5294

5395
}

spring-pulsar/src/test/java/org/springframework/pulsar/core/FailoverConsumerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.io.Serial;
22+
import java.util.List;
2223
import java.util.concurrent.CountDownLatch;
2324
import java.util.concurrent.TimeUnit;
2425

@@ -85,7 +86,7 @@ void testFailOverConsumersOnPartitionedTopic() throws Exception {
8586
container3.start();
8687

8788
DefaultPulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
88-
"my-part-topic-1", (pb) -> pb.messageRoutingMode(MessageRoutingMode.CustomPartition));
89+
"my-part-topic-1", List.of((pb) -> pb.messageRoutingMode(MessageRoutingMode.CustomPartition)));
8990
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
9091

9192
pulsarTemplate.newMessage("hello john doe")

spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarProducerFactoryTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.Arrays;
2828
import java.util.Collections;
29+
import java.util.List;
2930
import java.util.Set;
3031

3132
import org.apache.pulsar.client.api.Producer;
@@ -100,7 +101,12 @@ protected PulsarProducerFactory<String> newProducerFactoryWithDefaultTopic(Strin
100101
}
101102

102103
private PulsarProducerFactory<String> newProducerFactoryWithDefaultKeys(Set<String> defaultKeys) {
103-
return producerFactory(pulsarClient, null, (pb) -> defaultKeys.forEach(pb::addEncryptionKey));
104+
return producerFactory(pulsarClient, null, List.of((pb) -> defaultKeys.forEach(pb::addEncryptionKey)));
105+
}
106+
107+
protected PulsarProducerFactory<String> newProducerFactoryWithDefaultConfigCustomizers(
108+
List<ProducerBuilderCustomizer<String>> customizers) {
109+
return producerFactory(pulsarClient, null, customizers);
104110
}
105111

106112
/**
@@ -116,11 +122,12 @@ protected Producer<String> actualProducer(Producer<String> producer) {
116122
* Subclasses override to provide concrete {@link PulsarProducerFactory} instance.
117123
* @param pulsarClient the Pulsar client
118124
* @param defaultTopic the default topic to use for the producers
119-
* @param defaultConfigCustomizer the default configuration to apply to the producers
125+
* @param defaultConfigCustomizers the optional list of customizers to apply to the
126+
* created producers
120127
* @return a Pulsar producer factory instance to use for the tests
121128
*/
122129
protected abstract PulsarProducerFactory<String> producerFactory(PulsarClient pulsarClient,
123-
@Nullable String defaultTopic, @Nullable ProducerBuilderCustomizer<String> defaultConfigCustomizer);
130+
@Nullable String defaultTopic, @Nullable List<ProducerBuilderCustomizer<String>> defaultConfigCustomizers);
124131

125132
@Test
126133
@SuppressWarnings("unchecked")

spring-pulsar/src/test/java/org/springframework/pulsar/core/SharedSubscriptionConsumerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.util.HashMap;
22+
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.CountDownLatch;
2425
import java.util.concurrent.TimeUnit;
@@ -133,7 +134,7 @@ void keySharedSubscriptionWithDefaultAutoSplitHashingRange() throws Exception {
133134
Thread.sleep(5_000);
134135

135136
DefaultPulsarProducerFactory<String> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
136-
"key-shared-batch-disabled-topic", (pb) -> pb.enableBatching(false));
137+
"key-shared-batch-disabled-topic", List.of((pb) -> pb.enableBatching(false)));
137138
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(producerFactory);
138139
for (int i = 0; i < 10; i++) {
139140
pulsarTemplate.newMessage("alice-" + i)

spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ void pulsarListenerProvidedConsumerProperties(@Autowired PulsarListenerEndpointR
177177
void concurrencyOnPulsarListenerWithFailoverSubscription(@Autowired PulsarListenerEndpointRegistry registry)
178178
throws Exception {
179179
var pulsarProducerFactory = new DefaultPulsarProducerFactory<String>(pulsarClient, null,
180-
(pb) -> pb.enableBatching(false));
180+
List.of((pb) -> pb.enableBatching(false)));
181181
var customTemplate = new PulsarTemplate<>(pulsarProducerFactory);
182182
var bar = (ConcurrentPulsarMessageListenerContainer<?>) registry.getListenerContainer("bar");
183183

@@ -194,7 +194,7 @@ void concurrencyOnPulsarListenerWithFailoverSubscription(@Autowired PulsarListen
194194
void nonDefaultConcurrencySettingNotAllowedOnExclusiveSubscriptions(
195195
@Autowired PulsarListenerEndpointRegistry registry) throws Exception {
196196
var pulsarProducerFactory = new DefaultPulsarProducerFactory<String>(pulsarClient, null,
197-
(pb) -> pb.enableBatching(false));
197+
List.of((pb) -> pb.enableBatching(false)));
198198
var customTemplate = new PulsarTemplate<>(pulsarProducerFactory);
199199
var bar = (ConcurrentPulsarMessageListenerContainer<?>) registry.getListenerContainer("bar");
200200

spring-pulsar/src/test/java/org/springframework/pulsar/reader/DefaultPulsarMessageReaderContainerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ void basicDefaultReader() throws Exception {
8787
container.start();
8888

8989
DefaultPulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(
90-
pulsarClient, "dprlct-001", (pb) -> pb.topic("dprlct-001"));
90+
pulsarClient, "dprlct-001", List.of((pb) -> pb.topic("dprlct-001")));
9191
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
9292
pulsarTemplate.sendAsync("hello john doe");
9393
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -115,7 +115,7 @@ void topicProvidedThroughContainerProperties() throws Exception {
115115
container = new DefaultPulsarMessageReaderContainer<>(pulsarReaderFactory, containerProps);
116116
container.start();
117117
DefaultPulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(
118-
pulsarClient, "dprlct-002", (pb) -> pb.topic("dprlct-002"));
118+
pulsarClient, "dprlct-002", List.of((pb) -> pb.topic("dprlct-002")));
119119
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
120120
pulsarTemplate.sendAsync("hello buzz doe");
121121
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -144,7 +144,7 @@ void latestMessageId() throws Exception {
144144

145145
var prodConfig = Map.<String, Object>of("topicName", "dprlct-003");
146146
var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, "dprlct-003",
147-
(pb) -> pb.topic("dprlct-003"));
147+
List.of((pb) -> pb.topic("dprlct-003")));
148148
var pulsarTemplate = new PulsarTemplate<>(producerFactory);
149149

150150
// The following sends will not be received by the reader as we are using the

0 commit comments

Comments
 (0)