Skip to content

Commit 37e6075

Browse files
authored
Add multi-customizers to reactive reader and consumer (#436)
* Also add builder to ReactivePulsarSenderFactory See #432
1 parent 495de95 commit 37e6075

11 files changed

+489
-382
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,37 @@
2020
import java.util.List;
2121

2222
import org.apache.pulsar.client.api.Schema;
23-
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
24-
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
2523
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
2624
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
27-
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
2825
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
2926

27+
import org.springframework.lang.Nullable;
3028
import org.springframework.util.CollectionUtils;
3129

3230
/**
3331
* Default implementation for {@link ReactivePulsarConsumerFactory}.
3432
*
3533
* @param <T> underlying payload type for the reactive consumer.
3634
* @author Christophe Bornet
35+
* @author Chris Bono
3736
*/
3837
public class DefaultReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {
3938

40-
private final ReactiveMessageConsumerSpec consumerSpec;
41-
4239
private final ReactivePulsarClient reactivePulsarClient;
4340

41+
@Nullable
42+
private final List<ReactiveMessageConsumerBuilderCustomizer<T>> defaultConfigCustomizers;
43+
44+
/**
45+
* Construct an instance.
46+
* @param reactivePulsarClient the reactive client
47+
* @param defaultConfigCustomizers the optional list of customizers that defines the
48+
* default configuration for each created consumer.
49+
*/
4450
public DefaultReactivePulsarConsumerFactory(ReactivePulsarClient reactivePulsarClient,
45-
ReactiveMessageConsumerSpec consumerSpec) {
46-
this.consumerSpec = new ImmutableReactiveMessageConsumerSpec(
47-
consumerSpec != null ? consumerSpec : new MutableReactiveMessageConsumerSpec());
51+
List<ReactiveMessageConsumerBuilderCustomizer<T>> defaultConfigCustomizers) {
4852
this.reactivePulsarClient = reactivePulsarClient;
53+
this.defaultConfigCustomizers = defaultConfigCustomizers;
4954
}
5055

5156
@Override
@@ -57,14 +62,19 @@ public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
5762
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
5863
List<ReactiveMessageConsumerBuilderCustomizer<T>> customizers) {
5964

60-
ReactiveMessageConsumerBuilder<T> consumer = this.reactivePulsarClient.messageConsumer(schema);
65+
ReactiveMessageConsumerBuilder<T> consumerBuilder = this.reactivePulsarClient.messageConsumer(schema);
66+
67+
// Apply the default customizers
68+
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
69+
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder)));
70+
}
6171

62-
consumer.applySpec(this.consumerSpec);
72+
// Apply the user specified customizers
6373
if (!CollectionUtils.isEmpty(customizers)) {
64-
customizers.forEach((c) -> c.customize(consumer));
74+
customizers.forEach((c) -> c.customize(consumerBuilder));
6575
}
6676

67-
return consumer.build();
77+
return consumerBuilder.build();
6878
}
6979

7080
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactory.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,35 @@
2222
import org.apache.pulsar.client.api.Schema;
2323
import org.apache.pulsar.reactive.client.api.ReactiveMessageReader;
2424
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder;
25-
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
2625
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
2726

27+
import org.springframework.lang.Nullable;
2828
import org.springframework.util.CollectionUtils;
2929

3030
/**
3131
* Default implementation for {@link ReactivePulsarReaderFactory}.
3232
*
3333
* @param <T> underlying payload type for the reactive reader.
3434
* @author Christophe Bornet
35+
* @author Chris Bono
3536
*/
3637
public class DefaultReactivePulsarReaderFactory<T> implements ReactivePulsarReaderFactory<T> {
3738

38-
private final ReactiveMessageReaderSpec readerSpec;
39-
4039
private final ReactivePulsarClient reactivePulsarClient;
4140

41+
@Nullable
42+
private final List<ReactiveMessageReaderBuilderCustomizer<T>> defaultConfigCustomizers;
43+
44+
/**
45+
* Construct an instance.
46+
* @param reactivePulsarClient the reactive client
47+
* @param defaultConfigCustomizers the optional list of customizers that defines the
48+
* default configuration for each created reader.
49+
*/
4250
public DefaultReactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
43-
ReactiveMessageReaderSpec readerSpec) {
51+
List<ReactiveMessageReaderBuilderCustomizer<T>> defaultConfigCustomizers) {
4452
this.reactivePulsarClient = reactivePulsarClient;
45-
this.readerSpec = readerSpec;
53+
this.defaultConfigCustomizers = defaultConfigCustomizers;
4654
}
4755

4856
@Override
@@ -54,12 +62,19 @@ public ReactiveMessageReader<T> createReader(Schema<T> schema) {
5462
public ReactiveMessageReader<T> createReader(Schema<T> schema,
5563
List<ReactiveMessageReaderBuilderCustomizer<T>> customizers) {
5664

57-
ReactiveMessageReaderBuilder<T> reader = this.reactivePulsarClient.messageReader(schema)
58-
.applySpec(this.readerSpec);
65+
ReactiveMessageReaderBuilder<T> readerBuilder = this.reactivePulsarClient.messageReader(schema);
66+
67+
// Apply the default customizers
68+
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
69+
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(readerBuilder)));
70+
}
71+
72+
// Apply the user specified customizers
5973
if (!CollectionUtils.isEmpty(customizers)) {
60-
customizers.forEach((c) -> c.customize(reader));
74+
customizers.forEach((c) -> c.customize(readerBuilder));
6175
}
62-
return reader.build();
76+
77+
return readerBuilder.build();
6378
}
6479

6580
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java

Lines changed: 128 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,81 +23,70 @@
2323
import org.apache.pulsar.client.api.PulsarClient;
2424
import org.apache.pulsar.client.api.Schema;
2525
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
26-
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
27-
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
2826
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
2927
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
3028
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
31-
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
3229
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
3330

3431
import org.springframework.core.log.LogAccessor;
3532
import org.springframework.lang.Nullable;
3633
import org.springframework.pulsar.core.DefaultTopicResolver;
3734
import org.springframework.pulsar.core.TopicResolver;
35+
import org.springframework.util.Assert;
3836
import org.springframework.util.CollectionUtils;
3937

4038
/**
4139
* Default implementation of {@link ReactivePulsarSenderFactory}.
4240
*
43-
* @param <T> reactive sender type.
41+
* @param <T> underlying payload type for the reactive sender.
4442
* @author Christophe Bornet
4543
* @author Chris Bono
4644
*/
47-
public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSenderFactory<T> {
45+
public final class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSenderFactory<T> {
4846

4947
private final LogAccessor logger = new LogAccessor(this.getClass());
5048

5149
private final ReactivePulsarClient reactivePulsarClient;
5250

53-
private final ReactiveMessageSenderSpec reactiveMessageSenderSpec;
51+
private final TopicResolver topicResolver;
5452

5553
@Nullable
5654
private final ReactiveMessageSenderCache reactiveMessageSenderCache;
5755

5856
@Nullable
59-
private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers;
57+
private String defaultTopic;
6058

61-
private TopicResolver topicResolver;
59+
@Nullable
60+
private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;
61+
62+
private DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, TopicResolver topicResolver,
63+
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, @Nullable String defaultTopic,
64+
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers) {
65+
this.reactivePulsarClient = reactivePulsarClient;
66+
this.topicResolver = topicResolver;
67+
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
68+
this.defaultTopic = defaultTopic;
69+
this.defaultConfigCustomizers = defaultConfigCustomizers;
70+
}
6271

6372
/**
64-
* Construct an instance.
65-
* @param pulsarClient the pulsar client to adapt into a reactive client
66-
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
67-
* created senders
68-
* @param reactiveMessageSenderCache cache used to cache created senders
69-
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
70-
* to apply to the created senders
73+
* Create a builder that uses the specified Reactive pulsar client.
74+
* @param reactivePulsarClient the reactive client
75+
* @param <T> underlying payload type for the reactive sender
76+
* @return the newly created builder instance
7177
*/
72-
public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient,
73-
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
74-
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
75-
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers) {
76-
this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec,
77-
reactiveMessageSenderCache, defaultSenderBuilderCustomizers, new DefaultTopicResolver());
78+
public static <T> Builder<T> builderFor(ReactivePulsarClient reactivePulsarClient) {
79+
return new Builder<>(reactivePulsarClient);
7880
}
7981

8082
/**
81-
* Construct an instance.
82-
* @param reactivePulsarClient the reactive client to use
83-
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
84-
* created senders
85-
* @param reactiveMessageSenderCache cache used to cache created senders
86-
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
87-
* to apply to the created senders
88-
* @param topicResolver the topic resolver to use
83+
* Create a builder that adapts the specified pulsar client.
84+
* @param pulsarClient the Pulsar client to adapt into a Reactive client
85+
* @param <T> underlying payload type for the reactive sender
86+
* @return the newly created builder instance
8987
*/
90-
public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
91-
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
92-
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
93-
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers,
94-
TopicResolver topicResolver) {
95-
this.reactivePulsarClient = reactivePulsarClient;
96-
this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec(
97-
reactiveMessageSenderSpec != null ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec());
98-
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
99-
this.topicResolver = topicResolver;
100-
this.defaultSenderBuilderCustomizers = defaultSenderBuilderCustomizers;
88+
public static <T> Builder<T> builderFor(PulsarClient pulsarClient) {
89+
return new Builder<>(AdaptedReactivePulsarClientFactory.create(pulsarClient));
10190
}
10291

10392
@Override
@@ -121,34 +110,123 @@ public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String
121110
private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String topic,
122111
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
123112
Objects.requireNonNull(schema, "Schema must be specified");
124-
String resolvedTopic = this.topicResolver
125-
.resolveTopic(topic, () -> getReactiveMessageSenderSpec().getTopicName())
126-
.orElseThrow();
113+
String resolvedTopic = this.topicResolver.resolveTopic(topic, () -> getDefaultTopic()).orElseThrow();
127114
this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));
128115

129116
ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
130-
sender.applySpec(this.reactiveMessageSenderSpec);
131117

132-
// Apply the default config customizer (preserve the topic)
133-
if (!CollectionUtils.isEmpty(this.defaultSenderBuilderCustomizers)) {
134-
this.defaultSenderBuilderCustomizers.forEach((customizer -> customizer.customize(sender)));
118+
// Apply the default customizers (preserve the topic)
119+
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
120+
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(sender)));
135121
}
136122
sender.topic(resolvedTopic);
123+
137124
if (this.reactiveMessageSenderCache != null) {
138125
sender.cache(this.reactiveMessageSenderCache);
139126
}
127+
128+
// Apply the user specified customizers (preserve the topic)
140129
if (!CollectionUtils.isEmpty(customizers)) {
141130
customizers.forEach((c) -> c.customize(sender));
142131
}
143-
// make sure the customizer do not override the topic
144132
sender.topic(resolvedTopic);
145133

146134
return sender.build();
147135
}
148136

149137
@Override
150-
public ReactiveMessageSenderSpec getReactiveMessageSenderSpec() {
151-
return this.reactiveMessageSenderSpec;
138+
public String getDefaultTopic() {
139+
return this.defaultTopic;
140+
}
141+
142+
/**
143+
* Builder for {@link DefaultReactivePulsarSenderFactory}.
144+
*
145+
* @param <T> the reactive sender type
146+
*/
147+
public static final class Builder<T> {
148+
149+
private final ReactivePulsarClient reactivePulsarClient;
150+
151+
private TopicResolver topicResolver = new DefaultTopicResolver();
152+
153+
@Nullable
154+
private ReactiveMessageSenderCache messageSenderCache;
155+
156+
@Nullable
157+
private String defaultTopic;
158+
159+
@Nullable
160+
private List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;
161+
162+
private Builder(ReactivePulsarClient reactivePulsarClient) {
163+
Assert.notNull(reactivePulsarClient, "Reactive client is required");
164+
this.reactivePulsarClient = reactivePulsarClient;
165+
}
166+
167+
/**
168+
* Provide the topic resolver to use.
169+
* @param topicResolver the topic resolver to use
170+
* @return this same builder instance
171+
*/
172+
public Builder<T> withTopicResolver(TopicResolver topicResolver) {
173+
this.topicResolver = topicResolver;
174+
return this;
175+
}
176+
177+
/**
178+
* Provide the message sender cache to use.
179+
* @param messageSenderCache the message sender cache to use
180+
* @return this same builder instance
181+
*/
182+
public Builder<T> withMessageSenderCache(ReactiveMessageSenderCache messageSenderCache) {
183+
this.messageSenderCache = messageSenderCache;
184+
return this;
185+
}
186+
187+
/**
188+
* Provide the default topic to use when one is not specified.
189+
* @param defaultTopic the default topic to use
190+
* @return this same builder instance
191+
*/
192+
public Builder<T> withDefaultTopic(String defaultTopic) {
193+
this.defaultTopic = defaultTopic;
194+
return this;
195+
}
196+
197+
/**
198+
* Provide a customizer to apply to the sender builder.
199+
* @param customizer the customizer to apply to the builder before creating
200+
* senders
201+
* @return this same builder instance
202+
*/
203+
public Builder<T> withDefaultConfigCustomizer(ReactiveMessageSenderBuilderCustomizer<T> customizer) {
204+
this.defaultConfigCustomizers = List.of(customizer);
205+
return this;
206+
}
207+
208+
/**
209+
* Provide an optional list of sender builder customizers to apply to the builder
210+
* before creating the senders.
211+
* @param customizers optional list of sender builder customizers to apply to the
212+
* builder before creating the senders.
213+
* @return this same builder instance
214+
*/
215+
public Builder<T> withDefaultConfigCustomizers(List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
216+
this.defaultConfigCustomizers = customizers;
217+
return this;
218+
}
219+
220+
/**
221+
* Construct the sender factory using the specified settings.
222+
* @return pulsar sender factory
223+
*/
224+
public DefaultReactivePulsarSenderFactory<T> build() {
225+
Assert.notNull(this.topicResolver, "Topic resolver is required");
226+
return new DefaultReactivePulsarSenderFactory<>(this.reactivePulsarClient, this.topicResolver,
227+
this.messageSenderCache, this.defaultTopic, this.defaultConfigCustomizers);
228+
}
229+
152230
}
153231

154232
}

0 commit comments

Comments
 (0)