Skip to content

Commit 836d385

Browse files
authored
Automatically set customizer on listeners (#495)
When there is only a single customizer and a single listener defined in the application then the customizer will automatically be associated with the listener. This works for @PulsarListener, @PulsarReader, and @ReactivePulsarListener. See #480
1 parent a052b79 commit 836d385

File tree

15 files changed

+929
-188
lines changed

15 files changed

+929
-188
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,26 @@ void listen(String message) {
299299

300300
TIP: The properties used are direct Pulsar consumer properties, not the `spring.pulsar.consumer` application configuration properties
301301

302+
==== Customizing the ConsumerBuilder
303+
304+
You can customize any fields available through `ConsumerBuilder` using a `PulsarListenerConsumerBuilderCustomizer` by providing a `@Bean` of type `PulsarListenerConsumerBuilderCustomizer` and then making it available to the `PulsarListener` as shown below.
305+
306+
[source, java]
307+
----
308+
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
309+
public void listen(String message) {
310+
System.out.println("Message Received: " + message);
311+
}
312+
313+
@Bean
314+
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
315+
return (builder) -> builder.consumerName("myConsumer");
316+
}
317+
----
318+
319+
TIP: If your application only has a single `@PulsarListener` and a single `PulsarListenerConsumerBuilderCustomizer` bean registered then the customizer will be automatically applied.
320+
321+
302322
[[schema-info-listener-imperative]]
303323
:listener-class: PulsarListener
304324
include::schema-info/schema-info-listener.adoc[leveloffset=+1]
@@ -1112,7 +1132,7 @@ Suppose you want the reader to start reading messages arbitrarily from a topic o
11121132
==== Customizing the ReaderBuilder
11131133

11141134
You can customize any fields available through `ReaderBuilder` using a `PulsarReaderReaderBuilderCustomizer` in Spring for Apache Pulsar.
1115-
You can provide a `@Bean` of type `PulsarReaderBuilderCustomizer` and then make it available to the `PulsarReader` as below.
1135+
You can provide a `@Bean` of type `PulsarReaderReaderBuilderCustomizer` and then make it available to the `PulsarReader` as below.
11161136

11171137
[source, java]
11181138
----
@@ -1131,6 +1151,8 @@ public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
11311151
}
11321152
----
11331153

1154+
TIP: If your application only has a single `@PulsarReader` and a single `PulsarReaderReaderBuilderCustomizer` bean registered then the customizer will be automatically applied.
1155+
11341156
[[topic-resolution-process-imperative]]
11351157
== Topic Resolution
11361158
include::topic-resolution.adoc[leveloffset=+1]

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomi
270270
}
271271
----
272272

273+
TIP: If your application only has a single `@ReactivePulsarListener` and a single `ReactivePulsarListenerMessageConsumerBuilderCustomizer` bean registered then the customizer will be automatically applied.
274+
273275
You can also use the customizer to provide direct Pulsar consumer properties to the consumer builder.
274276
This is convenient if you do not want to use the Boot configuration properties mentioned earlier or have multiple `ReactivePulsarListener` methods whose configuration varies.
275277

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,10 @@ public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
226226
this.deadLetterPolicy = deadLetterPolicy;
227227
}
228228

229+
public ReactiveMessageConsumerBuilderCustomizer<V> getConsumerCustomizer() {
230+
return this.consumerCustomizer;
231+
}
232+
229233
public void setConsumerCustomizer(ReactiveMessageConsumerBuilderCustomizer<V> consumerCustomizer) {
230234
this.consumerCustomizer = consumerCustomizer;
231235
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,13 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V> extends Abstra
110110

111111
private final AtomicInteger counter = new AtomicInteger();
112112

113+
private final List<MethodReactivePulsarListenerEndpoint<?>> processedEndpoints = new ArrayList<>();
114+
113115
@Override
114116
public void afterSingletonsInstantiated() {
115117
this.registrar.setBeanFactory(this.beanFactory);
116-
117118
this.beanFactory.getBeanProvider(PulsarListenerConfigurer.class)
118119
.forEach(c -> c.configurePulsarListeners(this.registrar));
119-
120120
if (this.registrar.getEndpointRegistry() == null) {
121121
if (this.endpointRegistry == null) {
122122
Assert.state(this.beanFactory != null,
@@ -127,12 +127,11 @@ public void afterSingletonsInstantiated() {
127127
}
128128
this.registrar.setEndpointRegistry(this.endpointRegistry);
129129
}
130-
131130
if (this.defaultContainerFactoryBeanName != null) {
132131
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
133132
}
134133
addFormatters(this.messageHandlerMethodFactory.getDefaultFormattingConversionService());
135-
134+
postProcessEndpointsBeforeRegistration();
136135
// Actually register all listeners
137136
this.registrar.afterPropertiesSet();
138137
}
@@ -201,14 +200,11 @@ protected void processListener(MethodReactivePulsarListenerEndpoint<?> endpoint,
201200
@Nullable
202201
private ReactivePulsarListenerContainerFactory<?> resolveContainerFactory(
203202
ReactivePulsarListener ReactivePulsarListener, Object factoryTarget, String beanName) {
204-
205203
String containerFactory = ReactivePulsarListener.containerFactory();
206204
if (!StringUtils.hasText(containerFactory)) {
207205
return null;
208206
}
209-
210207
ReactivePulsarListenerContainerFactory<?> factory = null;
211-
212208
Object resolved = resolveExpression(containerFactory);
213209
if (resolved instanceof ReactivePulsarListenerContainerFactory) {
214210
return (ReactivePulsarListenerContainerFactory<?>) resolved;
@@ -230,7 +226,6 @@ private ReactivePulsarListenerContainerFactory<?> resolveContainerFactory(
230226

231227
private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListenerEndpoint<?> endpoint,
232228
ReactivePulsarListener reactivePulsarListener, Object bean, String[] topics, String topicPattern) {
233-
234229
endpoint.setBean(bean);
235230
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
236231
endpoint.setSubscriptionName(getEndpointSubscriptionName(reactivePulsarListener));
@@ -239,7 +234,6 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
239234
endpoint.setTopicPattern(topicPattern);
240235
resolveSubscriptionType(endpoint, reactivePulsarListener);
241236
endpoint.setSchemaType(reactivePulsarListener.schemaType());
242-
243237
String concurrency = reactivePulsarListener.concurrency();
244238
if (StringUtils.hasText(concurrency)) {
245239
endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
@@ -249,16 +243,15 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
249243
endpoint.setUseKeyOrderedProcessing(
250244
resolveExpressionAsBoolean(useKeyOrderedProcessing, "useKeyOrderedProcessing"));
251245
}
252-
253246
String autoStartup = reactivePulsarListener.autoStartup();
254247
if (StringUtils.hasText(autoStartup)) {
255248
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
256249
}
257250
endpoint.setFluxListener(reactivePulsarListener.stream());
258251
endpoint.setBeanFactory(this.beanFactory);
259-
260252
resolveDeadLetterPolicy(endpoint, reactivePulsarListener);
261253
resolveConsumerCustomizer(endpoint, reactivePulsarListener);
254+
this.processedEndpoints.add(endpoint);
262255
}
263256

264257
private void resolveSubscriptionType(MethodReactivePulsarListenerEndpoint<?> endpoint,
@@ -286,9 +279,29 @@ private void resolveDeadLetterPolicy(MethodReactivePulsarListenerEndpoint<?> end
286279
}
287280
}
288281

282+
@SuppressWarnings("unchecked")
283+
protected void postProcessEndpointsBeforeRegistration() {
284+
if (this.processedEndpoints.size() == 1) {
285+
MethodReactivePulsarListenerEndpoint<?> endpoint = this.processedEndpoints.get(0);
286+
if (endpoint.getConsumerCustomizer() != null) {
287+
return;
288+
}
289+
this.beanFactory.getBeanProvider(ReactivePulsarListenerMessageConsumerBuilderCustomizer.class)
290+
.ifUnique((customizer) -> {
291+
this.logger.info(() -> String
292+
.format("Setting the only registered ReactivePulsarListenerMessageConsumerBuilderCustomizer "
293+
+ "on the only registered @ReactivePulsarListener (%s)", endpoint.getId()));
294+
endpoint.setConsumerCustomizer(customizer::customize);
295+
});
296+
}
297+
}
298+
289299
@SuppressWarnings({ "rawtypes", "unchecked" })
290300
private void resolveConsumerCustomizer(MethodReactivePulsarListenerEndpoint<?> endpoint,
291301
ReactivePulsarListener reactivePulsarListener) {
302+
if (!StringUtils.hasText(reactivePulsarListener.consumerCustomizer())) {
303+
return;
304+
}
292305
Object consumerCustomizer = resolveExpression(reactivePulsarListener.consumerCustomizer());
293306
if (consumerCustomizer instanceof ReactivePulsarListenerMessageConsumerBuilderCustomizer customizer) {
294307
endpoint.setConsumerCustomizer(customizer::customize);

0 commit comments

Comments
 (0)