@@ -89,12 +89,11 @@ public class PulsarAutoConfiguration {
89
89
@ ConditionalOnMissingBean (PulsarProducerFactory .class )
90
90
@ ConditionalOnProperty (name = "spring.pulsar.producer.cache.enabled" , havingValue = "false" )
91
91
DefaultPulsarProducerFactory <?> pulsarProducerFactory (PulsarClient pulsarClient , TopicResolver topicResolver ,
92
- ObjectProvider <ProducerBuilderCustomizer <?>> customizersProvider ,
93
- PulsarTopicBuilder topicBuilder ) {
92
+ ObjectProvider <ProducerBuilderCustomizer <?>> customizersProvider , PulsarTopicBuilder topicBuilder ) {
94
93
List <ProducerBuilderCustomizer <Object >> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers (
95
94
customizersProvider );
96
- DefaultPulsarProducerFactory <?> producerFactory = new DefaultPulsarProducerFactory <>(pulsarClient , this . properties . getProducer (). getTopicName (),
97
- lambdaSafeCustomizers , topicResolver );
95
+ DefaultPulsarProducerFactory <?> producerFactory = new DefaultPulsarProducerFactory <>(pulsarClient ,
96
+ this . properties . getProducer (). getTopicName (), lambdaSafeCustomizers , topicResolver );
98
97
producerFactory .setTopicBuilder (topicBuilder );
99
98
return producerFactory ;
100
99
}
@@ -103,14 +102,14 @@ DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient,
103
102
@ ConditionalOnMissingBean (PulsarProducerFactory .class )
104
103
@ ConditionalOnProperty (name = "spring.pulsar.producer.cache.enabled" , havingValue = "true" , matchIfMissing = true )
105
104
CachingPulsarProducerFactory <?> cachingPulsarProducerFactory (PulsarClient pulsarClient , TopicResolver topicResolver ,
106
- ObjectProvider <ProducerBuilderCustomizer <?>> customizersProvider ,
107
- PulsarTopicBuilder topicBuilder ) {
105
+ ObjectProvider <ProducerBuilderCustomizer <?>> customizersProvider , PulsarTopicBuilder topicBuilder ) {
108
106
PulsarProperties .Producer .Cache cacheProperties = this .properties .getProducer ().getCache ();
109
107
List <ProducerBuilderCustomizer <Object >> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers (
110
108
customizersProvider );
111
- CachingPulsarProducerFactory <?> producerFactory = new CachingPulsarProducerFactory <>(pulsarClient , this .properties .getProducer ().getTopicName (),
112
- lambdaSafeCustomizers , topicResolver , cacheProperties .getExpireAfterAccess (),
113
- cacheProperties .getMaximumSize (), cacheProperties .getInitialCapacity ());
109
+ CachingPulsarProducerFactory <?> producerFactory = new CachingPulsarProducerFactory <>(pulsarClient ,
110
+ this .properties .getProducer ().getTopicName (), lambdaSafeCustomizers , topicResolver ,
111
+ cacheProperties .getExpireAfterAccess (), cacheProperties .getMaximumSize (),
112
+ cacheProperties .getInitialCapacity ());
114
113
producerFactory .setTopicBuilder (topicBuilder );
115
114
return producerFactory ;
116
115
}
@@ -145,14 +144,14 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
145
144
@ Bean
146
145
@ ConditionalOnMissingBean (PulsarConsumerFactory .class )
147
146
DefaultPulsarConsumerFactory <?> pulsarConsumerFactory (PulsarClient pulsarClient ,
148
- ObjectProvider <ConsumerBuilderCustomizer <?>> customizersProvider ,
149
- PulsarTopicBuilder topicBuilder ) {
147
+ ObjectProvider <ConsumerBuilderCustomizer <?>> customizersProvider , PulsarTopicBuilder topicBuilder ) {
150
148
List <ConsumerBuilderCustomizer <?>> customizers = new ArrayList <>();
151
149
customizers .add (this .propertiesMapper ::customizeConsumerBuilder );
152
150
customizers .addAll (customizersProvider .orderedStream ().toList ());
153
151
List <ConsumerBuilderCustomizer <Object >> lambdaSafeCustomizers = List
154
152
.of ((builder ) -> applyConsumerBuilderCustomizers (customizers , builder ));
155
- DefaultPulsarConsumerFactory <?> consumerFactory = new DefaultPulsarConsumerFactory <>(pulsarClient , lambdaSafeCustomizers );
153
+ DefaultPulsarConsumerFactory <?> consumerFactory = new DefaultPulsarConsumerFactory <>(pulsarClient ,
154
+ lambdaSafeCustomizers );
156
155
consumerFactory .setTopicBuilder (topicBuilder );
157
156
return consumerFactory ;
158
157
}
@@ -191,14 +190,14 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
191
190
@ Bean
192
191
@ ConditionalOnMissingBean (PulsarReaderFactory .class )
193
192
DefaultPulsarReaderFactory <?> pulsarReaderFactory (PulsarClient pulsarClient ,
194
- ObjectProvider <ReaderBuilderCustomizer <?>> customizersProvider ,
195
- PulsarTopicBuilder topicBuilder ) {
193
+ ObjectProvider <ReaderBuilderCustomizer <?>> customizersProvider , PulsarTopicBuilder topicBuilder ) {
196
194
List <ReaderBuilderCustomizer <?>> customizers = new ArrayList <>();
197
195
customizers .add (this .propertiesMapper ::customizeReaderBuilder );
198
196
customizers .addAll (customizersProvider .orderedStream ().toList ());
199
197
List <ReaderBuilderCustomizer <Object >> lambdaSafeCustomizers = List
200
198
.of ((builder ) -> applyReaderBuilderCustomizers (customizers , builder ));
201
- DefaultPulsarReaderFactory <?> readerFactory = new DefaultPulsarReaderFactory <>(pulsarClient , lambdaSafeCustomizers );
199
+ DefaultPulsarReaderFactory <?> readerFactory = new DefaultPulsarReaderFactory <>(pulsarClient ,
200
+ lambdaSafeCustomizers );
202
201
readerFactory .setTopicBuilder (topicBuilder );
203
202
return readerFactory ;
204
203
}
0 commit comments