Skip to content

Commit 0afde4b

Browse files
committed
Update library versions (formatter only)
- Updates 'io.spring.javaformat:spring-javaformat-gradle-plugin' from '0.0.34` to `0.0.38` - This in turn formats many files (expectd) `
1 parent 30bfb35 commit 0afde4b

File tree

72 files changed

+907
-710
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+907
-710
lines changed

buildSrc/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ dependencies {
4343
implementation 'commons-codec:commons-codec:1.15'
4444
implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.4'
4545
implementation 'io.github.gradle-nexus:publish-plugin:1.1.0'
46-
implementation("io.spring.javaformat:spring-javaformat-gradle-plugin:0.0.34")
46+
implementation("io.spring.javaformat:spring-javaformat-gradle-plugin:0.0.38")
4747
implementation 'io.spring.nohttp:nohttp-gradle:0.0.10'
4848
implementation "org.apache.maven:maven-embedder:3.6.3"
4949
implementation "org.asciidoctor:asciidoctor-gradle-jvm:3.3.2"

integration-tests/src/intTest/java/org/springframework/pulsar/autoconfigure/PulsarFunctionAdministrationIntegrationTests.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,9 @@ void verifyRabbitSourceIsCreatedAndMessagesAreSourcedIntoPulsar() throws Excepti
132132

133133
// Send messages to rabbit and wait for them to come through the rabbit source
134134
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
135-
List<String> messages = LongStream.range(0, RECEIVED_MESSAGE_LATCH.getCount()).mapToObj((i) -> "bar" + i)
136-
.toList();
135+
List<String> messages = LongStream.range(0, RECEIVED_MESSAGE_LATCH.getCount())
136+
.mapToObj((i) -> "bar" + i)
137+
.toList();
137138
messages.forEach(msg -> rabbitTemplate.convertAndSend(RABBIT_QUEUE, msg));
138139

139140
assertThat(RECEIVED_MESSAGE_LATCH.await(10, TimeUnit.SECONDS)).isTrue();
@@ -180,20 +181,23 @@ private PulsarAdmin getAdmin() throws PulsarClientException {
180181
private void assertSourceExistsWithStatus(String name, boolean isRunning, PulsarAdmin admin)
181182
throws PulsarAdminException {
182183
assertThat(admin.sources().getSourceStatus("public", "default", name)).isNotNull()
183-
.extracting(SourceStatus::getNumRunning).isEqualTo(isRunning ? 1 : 0);
184+
.extracting(SourceStatus::getNumRunning)
185+
.isEqualTo(isRunning ? 1 : 0);
184186
}
185187

186188
private void assertSourceDoesNotExist(String name, PulsarAdmin admin) {
187189
assertThatThrownBy(() -> admin.sources().getSourceStatus("public", "default", name))
188-
.isInstanceOf(NotFoundException.class);
190+
.isInstanceOf(NotFoundException.class);
189191
}
190192

191193
static boolean rabbitConnectorExists() {
192194
try {
193195
Resource[] connectors = ResourcePatternUtils.getResourcePatternResolver(new DefaultResourceLoader())
194-
.getResources("classpath:/connectors/**");
195-
boolean available = Arrays.stream(connectors).map(Resource::getFilename).filter(Objects::nonNull)
196-
.anyMatch((name) -> name.contains("pulsar-io-rabbitmq"));
196+
.getResources("classpath:/connectors/**");
197+
boolean available = Arrays.stream(connectors)
198+
.map(Resource::getFilename)
199+
.filter(Objects::nonNull)
200+
.anyMatch((name) -> name.contains("pulsar-io-rabbitmq"));
197201
if (!available) {
198202
logTestDisabledReason();
199203
return false;
@@ -226,9 +230,14 @@ static PulsarSource rabbitPulsarSource(@Nullable FunctionStopPolicy stopPolicy)
226230
configs.put("password", "guest");
227231
configs.put("queueName", RABBIT_QUEUE + suffix);
228232
configs.put("connectionName", "pft_foo_connection" + suffix);
229-
SourceConfig sourceConfig = SourceConfig.builder().tenant("public").namespace("default")
230-
.name("rabbit-test-source" + suffix).archive("builtin://rabbitmq").topicName(PULSAR_TOPIC + suffix)
231-
.configs(configs).build();
233+
SourceConfig sourceConfig = SourceConfig.builder()
234+
.tenant("public")
235+
.namespace("default")
236+
.name("rabbit-test-source" + suffix)
237+
.archive("builtin://rabbitmq")
238+
.topicName(PULSAR_TOPIC + suffix)
239+
.configs(configs)
240+
.build();
232241
return new PulsarSource(sourceConfig, stopPolicy != null ? stopPolicy : FunctionStopPolicy.DELETE, null);
233242
}
234243

@@ -276,7 +285,7 @@ static class ContainerLoggingTestWatcher implements TestWatcher {
276285
@Override
277286
public void testFailed(ExtensionContext context, Throwable cause) {
278287
this.logger.error(() -> "Test %s failed due to: %s - inspect container logs below:%n%n%s"
279-
.formatted(context.getDisplayName(), cause.getMessage(), getPulsarContainerLogs()));
288+
.formatted(context.getDisplayName(), cause.getMessage(), getPulsarContainerLogs()));
280289
}
281290

282291
private String getPulsarContainerLogs() {

integration-tests/src/intTest/java/org/springframework/pulsar/autoconfigure/PulsarListenerIntegrationTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ void basicPulsarListener() throws Exception {
6464
app.setWebApplicationType(WebApplicationType.NONE);
6565

6666
try (ConfigurableApplicationContext context = app
67-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
67+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
6868
@SuppressWarnings("unchecked")
6969
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
7070
pulsarTemplate.send("plt-basic-topic", "John Doe");
@@ -78,7 +78,7 @@ void basicPulsarListenerCustomType() throws Exception {
7878
app.setWebApplicationType(WebApplicationType.NONE);
7979

8080
try (ConfigurableApplicationContext context = app
81-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
81+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
8282
@SuppressWarnings("unchecked")
8383
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
8484
pulsarTemplate.send("plt-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
@@ -92,7 +92,7 @@ void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
9292
app.setWebApplicationType(WebApplicationType.NONE);
9393

9494
try (ConfigurableApplicationContext context = app
95-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
95+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
9696
@SuppressWarnings("unchecked")
9797
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
9898
pulsarTemplate.send("plt-foo-topic2", new Foo("John Doe"));
@@ -106,7 +106,7 @@ void basicPulsarListenerWithTopicMapping() throws Exception {
106106
app.setWebApplicationType(WebApplicationType.NONE);
107107

108108
try (ConfigurableApplicationContext context = app
109-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
109+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
110110
@SuppressWarnings("unchecked")
111111
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
112112
pulsarTemplate.send("plt-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
@@ -120,7 +120,7 @@ void batchPulsarListener() throws Exception {
120120
app.setWebApplicationType(WebApplicationType.NONE);
121121

122122
try (ConfigurableApplicationContext context = app
123-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
123+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
124124
@SuppressWarnings("unchecked")
125125
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
126126
for (int i = 0; i < 10; i++) {

integration-tests/src/intTest/java/org/springframework/pulsar/autoconfigure/ReactivePulsarListenerIntegrationTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ void basicListener() throws Exception {
7272
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
7373
app.setWebApplicationType(WebApplicationType.NONE);
7474
try (ConfigurableApplicationContext context = app
75-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
75+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
7676
@SuppressWarnings("unchecked")
7777
ReactivePulsarTemplate<String> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
7878
pulsarTemplate.send("rplt-topic1", "John Doe").block();
@@ -85,7 +85,7 @@ void basicListenerCustomType() throws Exception {
8585
SpringApplication app = new SpringApplication(BasicListenerCustomTypeConfig.class);
8686
app.setWebApplicationType(WebApplicationType.NONE);
8787
try (ConfigurableApplicationContext context = app
88-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
88+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
8989
@SuppressWarnings("unchecked")
9090
ReactivePulsarTemplate<Foo> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
9191
pulsarTemplate.send("rplt-custom-topic1", new Foo("John Doe"), Schema.JSON(Foo.class)).block();
@@ -98,7 +98,7 @@ void basicListenerCustomTypeWithTypeMapping() throws Exception {
9898
SpringApplication app = new SpringApplication(BasicListenerCustomTypeWithTypeMappingConfig.class);
9999
app.setWebApplicationType(WebApplicationType.NONE);
100100
try (ConfigurableApplicationContext context = app
101-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
101+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
102102
@SuppressWarnings("unchecked")
103103
ReactivePulsarTemplate<Foo> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
104104
pulsarTemplate.send("rplt-custom-topic2", new Foo("John Doe")).block();
@@ -111,7 +111,7 @@ void basicPulsarListenerWithTopicMapping() throws Exception {
111111
SpringApplication app = new SpringApplication(BasicListenerWithTopicMappingConfig.class);
112112
app.setWebApplicationType(WebApplicationType.NONE);
113113
try (ConfigurableApplicationContext context = app
114-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
114+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
115115
@SuppressWarnings("unchecked")
116116
ReactivePulsarTemplate<Foo> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
117117
pulsarTemplate.send("rplt-topicMapping-topic1", new Foo("Crazy8z"), Schema.JSON(Foo.class)).block();
@@ -124,7 +124,7 @@ void fluxListener() throws Exception {
124124
SpringApplication app = new SpringApplication(FluxListenerConfig.class);
125125
app.setWebApplicationType(WebApplicationType.NONE);
126126
try (ConfigurableApplicationContext context = app
127-
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
127+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
128128
@SuppressWarnings("unchecked")
129129
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
130130
for (int i = 0; i < 10; i++) {

spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProviderFactory.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ public class CaffeineCacheProviderFactory<K, V> implements CacheProviderFactory<
3535
@Override
3636
public CacheProvider<K, V> create(Duration cacheExpireAfterAccess, Long cacheMaximumSize,
3737
Integer cacheInitialCapacity, EvictionListener<K, V> evictionListener) {
38-
Cache<K, V> cache = Caffeine.newBuilder().expireAfterAccess(cacheExpireAfterAccess)
39-
.maximumSize(cacheMaximumSize).initialCapacity(cacheInitialCapacity)
40-
.scheduler(Scheduler.systemScheduler()).evictionListener((RemovalListener<K, V>) (key, value,
41-
cause) -> evictionListener.onEviction(key, value, cause.toString()))
42-
.build();
38+
Cache<K, V> cache = Caffeine.newBuilder()
39+
.expireAfterAccess(cacheExpireAfterAccess)
40+
.maximumSize(cacheMaximumSize)
41+
.initialCapacity(cacheInitialCapacity)
42+
.scheduler(Scheduler.systemScheduler())
43+
.evictionListener((RemovalListener<K, V>) (key, value, cause) -> evictionListener.onEviction(key, value,
44+
cause.toString()))
45+
.build();
4346
return new CaffeineCacheProvider<>(cache);
4447
}
4548

spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProviderFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ CacheProvider<K, V> create(Duration cacheExpireAfterAccess, Long cacheMaximumSiz
4848
*/
4949
@SuppressWarnings("unchecked")
5050
static <K, V> CacheProviderFactory<K, V> load() {
51-
return ServiceLoader.load(CacheProviderFactory.class).findFirst()
52-
.orElseThrow(() -> new IllegalStateException("No ProducerCacheFactory available"));
51+
return ServiceLoader.load(CacheProviderFactory.class)
52+
.findFirst()
53+
.orElseThrow(() -> new IllegalStateException("No ProducerCacheFactory available"));
5354
}
5455

5556
/**

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/aot/ReactivePulsarRuntimeHints.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,22 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader)
5656
// and introspect all public methods. The components are a mix of JDK classes,
5757
// core Pulsar classes,
5858
// some other shaded components available through Pulsar client.
59-
Stream.of(HashSet.class, TreeMap.class, Authentication.class, AuthenticationDataProvider.class,
60-
SecretsSerializer.class, NioSocketChannel.class, AbstractByteBufAllocator.class,
61-
NioDatagramChannel.class, PulsarAdminBuilderImpl.class, OffloadProcessStatusImpl.class, Commands.class,
62-
ReferenceCountUtil.class).forEach(
63-
type -> reflectionHints.registerType(type,
64-
builder -> builder.withMembers(MemberCategory.INVOKE_DECLARED_CONSTRUCTORS,
65-
MemberCategory.INVOKE_DECLARED_METHODS,
66-
MemberCategory.INTROSPECT_PUBLIC_METHODS)));
59+
Stream
60+
.of(HashSet.class, TreeMap.class, Authentication.class, AuthenticationDataProvider.class,
61+
SecretsSerializer.class, NioSocketChannel.class, AbstractByteBufAllocator.class,
62+
NioDatagramChannel.class, PulsarAdminBuilderImpl.class, OffloadProcessStatusImpl.class,
63+
Commands.class, ReferenceCountUtil.class)
64+
.forEach(type -> reflectionHints.registerType(type,
65+
builder -> builder.withMembers(MemberCategory.INVOKE_DECLARED_CONSTRUCTORS,
66+
MemberCategory.INVOKE_DECLARED_METHODS, MemberCategory.INTROSPECT_PUBLIC_METHODS)));
6767

6868
// In addition to the above member category levels, these components need field
6969
// and declared class level access.
7070
Stream.of(ClientConfigurationData.class, ConsumerConfigurationData.class, ProducerConfigurationData.class)
71-
.forEach(type -> reflectionHints.registerType(type,
72-
builder -> builder.withMembers(MemberCategory.INVOKE_DECLARED_CONSTRUCTORS,
73-
MemberCategory.INVOKE_DECLARED_METHODS, MemberCategory.INTROSPECT_PUBLIC_METHODS,
74-
MemberCategory.DECLARED_CLASSES, MemberCategory.DECLARED_FIELDS)));
71+
.forEach(type -> reflectionHints.registerType(type,
72+
builder -> builder.withMembers(MemberCategory.INVOKE_DECLARED_CONSTRUCTORS,
73+
MemberCategory.INVOKE_DECLARED_METHODS, MemberCategory.INTROSPECT_PUBLIC_METHODS,
74+
MemberCategory.DECLARED_CLASSES, MemberCategory.DECLARED_FIELDS)));
7575

7676
// These are inaccessible interfaces/classes in a normal scenario, thus using the
7777
// String version,
@@ -83,15 +83,16 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader)
8383
"org.apache.pulsar.shade.io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField",
8484
"org.apache.pulsar.shade.io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField",
8585
"org.apache.pulsar.shade.io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField")
86-
.forEach(typeName -> reflectionHints.registerTypeIfPresent(classLoader, typeName,
87-
MemberCategory.DECLARED_FIELDS));
86+
.forEach(typeName -> reflectionHints.registerTypeIfPresent(classLoader, typeName,
87+
MemberCategory.DECLARED_FIELDS));
8888

89-
Stream.of("reactor.core.publisher.Flux", "com.github.benmanes.caffeine.cache.SSMSA",
90-
"com.github.benmanes.caffeine.cache.PSAMS", "com.github.benmanes.caffeine.cache.SSLMSA",
91-
"com.github.benmanes.caffeine.cache.PSAMW")
92-
.forEach(typeName -> reflectionHints.registerTypeIfPresent(classLoader, typeName,
93-
MemberCategory.INVOKE_DECLARED_CONSTRUCTORS, MemberCategory.INVOKE_DECLARED_METHODS,
94-
MemberCategory.INTROSPECT_PUBLIC_METHODS));
89+
Stream
90+
.of("reactor.core.publisher.Flux", "com.github.benmanes.caffeine.cache.SSMSA",
91+
"com.github.benmanes.caffeine.cache.PSAMS", "com.github.benmanes.caffeine.cache.SSLMSA",
92+
"com.github.benmanes.caffeine.cache.PSAMW")
93+
.forEach(typeName -> reflectionHints.registerTypeIfPresent(classLoader, typeName,
94+
MemberCategory.INVOKE_DECLARED_CONSTRUCTORS, MemberCategory.INVOKE_DECLARED_METHODS,
95+
MemberCategory.INTROSPECT_PUBLIC_METHODS));
9596

9697
// Registering JDK dynamic proxies for these interfaces. Since the Connection
9798
// interface is protected,
@@ -101,12 +102,13 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader)
101102
// registered using the
102103
// string version of the API because all of them need to be registered through a
103104
// single call.
104-
hints.proxies().registerJdkProxy(TypeReference.of("org.apache.pulsar.shade.io.netty.util.TimerTask"),
105-
TypeReference.of("org.apache.pulsar.client.impl.ConnectionHandler$Connection"),
106-
TypeReference.of("org.apache.pulsar.client.api.Producer"),
107-
TypeReference.of("org.springframework.aop.SpringProxy"),
108-
TypeReference.of("org.springframework.aop.framework.Advised"),
109-
TypeReference.of("org.springframework.core.DecoratingProxy"));
105+
hints.proxies()
106+
.registerJdkProxy(TypeReference.of("org.apache.pulsar.shade.io.netty.util.TimerTask"),
107+
TypeReference.of("org.apache.pulsar.client.impl.ConnectionHandler$Connection"),
108+
TypeReference.of("org.apache.pulsar.client.api.Producer"),
109+
TypeReference.of("org.springframework.aop.SpringProxy"),
110+
TypeReference.of("org.springframework.aop.framework.Advised"),
111+
TypeReference.of("org.springframework.core.DecoratingProxy"));
110112
}
111113

112114
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageHandler(
120120
MethodParameter[] methodParameters = handlerMethod.getInvokerHandlerMethod().getMethodParameters();
121121
MethodParameter messageParameter = null;
122122
Optional<MethodParameter> parameter = Arrays.stream(methodParameters)
123-
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
124-
|| !methodParameter1.getParameterType().equals(Acknowledgement.class)
125-
|| !methodParameter1.hasParameterAnnotation(Header.class))
126-
.findFirst();
123+
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
124+
|| !methodParameter1.getParameterType().equals(Acknowledgement.class)
125+
|| !methodParameter1.hasParameterAnnotation(Header.class))
126+
.findFirst();
127127
long count = Arrays.stream(methodParameters)
128-
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
129-
&& !methodParameter1.getParameterType().equals(Acknowledgement.class)
130-
&& !methodParameter1.hasParameterAnnotation(Header.class))
131-
.count();
128+
.filter(methodParameter1 -> !methodParameter1.getParameterType().equals(Consumer.class)
129+
&& !methodParameter1.getParameterType().equals(Acknowledgement.class)
130+
&& !methodParameter1.hasParameterAnnotation(Header.class))
131+
.count();
132132
Assert.isTrue(count == 1, "More than 1 expected payload types found");
133133
if (parameter.isPresent()) {
134134
messageParameter = parameter.get();
@@ -140,7 +140,7 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageHandler(
140140
SchemaType schemaType = pulsarContainerProperties.getSchemaType();
141141
ResolvableType messageType = resolvableType(messageParameter);
142142
schemaResolver.resolveSchema(schemaType, messageType)
143-
.ifResolved(schema -> pulsarContainerProperties.setSchema((Schema) schema));
143+
.ifResolved(schema -> pulsarContainerProperties.setSchema((Schema) schema));
144144

145145
// Make sure the schemaType is updated to match the current schema
146146
if (pulsarContainerProperties.getSchema() != null) {
@@ -154,7 +154,7 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageHandler(
154154
|| !ObjectUtils.isEmpty(pulsarContainerProperties.getTopics());
155155
if (!hasTopicInfo) {
156156
topicResolver.resolveTopic(null, messageType.getRawClass(), () -> null)
157-
.ifResolved((topic) -> pulsarContainerProperties.setTopics(Collections.singleton(topic)));
157+
.ifResolved((topic) -> pulsarContainerProperties.setTopics(Collections.singleton(topic)));
158158
}
159159

160160
ReactiveMessageConsumerBuilderCustomizer<V> customizer1 = b -> b.deadLetterPolicy(this.deadLetterPolicy);
@@ -189,7 +189,7 @@ private boolean isContainerType(Class<?> rawClass) {
189189

190190
protected HandlerAdapter configureListenerAdapter(AbstractPulsarMessageToSpringMessageAdapter<V> messageListener) {
191191
InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory
192-
.createInvocableHandlerMethod(getBean(), getMethod());
192+
.createInvocableHandlerMethod(getBean(), getMethod());
193193
return new HandlerAdapter(invocableHandlerMethod);
194194
}
195195

0 commit comments

Comments
 (0)