Skip to content

Commit 473642e

Browse files
authored
Log a warning when Lambda customizer used (#879)
This adjusts the `PulsarTemplate` to log a warning when a user provides a `ProducerBuilderCustomizer` that is implemented as a Java Lambda. This is important as the customizer is used as part of the producer cache key and if not implemented properly will effectively disable producer caching and write performance will degrade. See #809
1 parent 3ad7235 commit 473642e

File tree

11 files changed

+710
-6
lines changed

11 files changed

+710
-6
lines changed

integration-tests/integration-tests.gradle

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@ dependencies {
2222
intTestImplementation 'org.testcontainers:rabbitmq'
2323
intTestImplementation libs.spring.boot.starter.test
2424
intTestImplementation libs.spring.boot.starter.amqp
25-
intTestImplementation libs.spring.boot.starter.pulsar
26-
intTestImplementation libs.spring.boot.starter.pulsar.reactive
25+
// Exclude spring-pulsar from boot in order to use current changes in project
26+
intTestImplementation(libs.spring.boot.starter.pulsar) {
27+
exclude group: "org.springframework.pulsar", module: "spring-pulsar"
28+
}
29+
intTestImplementation(libs.spring.boot.starter.pulsar.reactive) {
30+
exclude group: "org.springframework.pulsar", module: "spring-pulsar-reactive"
31+
}
2732
intTestImplementation libs.spring.boot.testcontainers
2833
intTestRuntimeOnly 'org.junit.platform:junit-platform-launcher'
2934
intTestRuntimeOnly libs.logback.classic
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.logging;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.regex.Pattern;
22+
23+
import org.junit.jupiter.api.Nested;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.extension.ExtendWith;
26+
import org.testcontainers.containers.PulsarContainer;
27+
import org.testcontainers.junit.jupiter.Container;
28+
import org.testcontainers.junit.jupiter.Testcontainers;
29+
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.boot.SpringBootConfiguration;
32+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
33+
import org.springframework.boot.test.context.SpringBootTest;
34+
import org.springframework.boot.test.context.TestConfiguration;
35+
import org.springframework.boot.test.system.CapturedOutput;
36+
import org.springframework.boot.test.system.OutputCaptureExtension;
37+
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
40+
import org.springframework.pulsar.core.PulsarTemplate;
41+
import org.springframework.pulsar.core.PulsarTemplateCustomizer;
42+
import org.springframework.pulsar.inttest.logging.PulsarTemplateLambdaWarnLoggerTests.WithWarnLoggerDisabled.WithWarnLoggerDisabledConfig;
43+
import org.springframework.pulsar.inttest.logging.PulsarTemplateLambdaWarnLoggerTests.WithWarnLoggerIncreasedFrequency.WithWarnLoggerIncreasedFrequencyConfig;
44+
import org.springframework.pulsar.support.internal.logging.LambdaCustomizerWarnLogger;
45+
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
46+
47+
/**
48+
* Integration tests that covers {@link LambdaCustomizerWarnLogger} and its usage in
49+
* {@link PulsarTemplate} of the following cases: <pre>
50+
* - user customizes the template to disable the warn logger
51+
* - user customizes the template to adjust the warn logger frequency
52+
* - template logs warning when a lambda customizer is used as producer customizer
53+
* - template does not log warning when a non-lambda customizer is used
54+
* </pre> The nature of the feature (logging and template customization) lends itself well
55+
* to an integration test w/ help of {@link CapturedOutput} and
56+
* {@link PulsarTemplateCustomizer}.
57+
*
58+
* @author Chris Bono
59+
*/
60+
@Testcontainers(disabledWithoutDocker = true)
61+
@ExtendWith(OutputCaptureExtension.class)
62+
class PulsarTemplateLambdaWarnLoggerTests {
63+
64+
@SuppressWarnings("unused")
65+
@Container
66+
@ServiceConnection
67+
static PulsarContainer PULSAR_CONTAINER = new PulsarContainer(PulsarTestContainerSupport.getPulsarImage());
68+
69+
@Nested
70+
@SpringBootTest(classes = TestAppConfig.class)
71+
@ExtendWith(OutputCaptureExtension.class)
72+
class WithWarnLoggerEnabledByDefault {
73+
74+
@Test
75+
void whenLambdaCustomizerIsUsedThenWarningIsLogged(CapturedOutput output,
76+
@Autowired PulsarTemplate<String> template) {
77+
TestUtils.sendRequestsWithCustomizer("lcwlt-default", 1001, template);
78+
TestUtils.assertThatWarningIsLoggedNumTimes(2, output);
79+
}
80+
81+
}
82+
83+
@Nested
84+
@SpringBootTest(classes = { TestAppConfig.class, WithWarnLoggerIncreasedFrequencyConfig.class })
85+
@ExtendWith(OutputCaptureExtension.class)
86+
class WithWarnLoggerIncreasedFrequency {
87+
88+
@Test
89+
void whenLambdaCustomizerIsUsedThenWarningIsLoggedMoreFrequently(CapturedOutput output,
90+
@Autowired PulsarTemplate<String> template) {
91+
TestUtils.sendRequestsWithCustomizer("lcwlt-adjusted", 1001, template);
92+
TestUtils.assertThatWarningIsLoggedNumTimes(11, output);
93+
}
94+
95+
@TestConfiguration(proxyBeanMethods = false)
96+
static class WithWarnLoggerIncreasedFrequencyConfig {
97+
98+
@Bean
99+
PulsarTemplateCustomizer<?> templateCustomizer() {
100+
return (template) -> template.logWarningForLambdaCustomizer(100);
101+
}
102+
103+
}
104+
105+
}
106+
107+
@Nested
108+
@SpringBootTest(classes = { TestAppConfig.class, WithWarnLoggerDisabledConfig.class })
109+
@ExtendWith(OutputCaptureExtension.class)
110+
class WithWarnLoggerDisabled {
111+
112+
@Test
113+
void whenLambdaCustomizerIsUsedThenWarningIsNotLogged(CapturedOutput output,
114+
@Autowired PulsarTemplate<String> template) {
115+
TestUtils.sendRequestsWithCustomizer("lcwlt-disabled", 1001, template);
116+
TestUtils.assertThatWarningIsLoggedNumTimes(0, output);
117+
}
118+
119+
@TestConfiguration(proxyBeanMethods = false)
120+
static class WithWarnLoggerDisabledConfig {
121+
122+
@Bean
123+
PulsarTemplateCustomizer<?> templateCustomizer() {
124+
return (template) -> template.logWarningForLambdaCustomizer(0);
125+
}
126+
127+
}
128+
129+
}
130+
131+
@Nested
132+
@SpringBootTest(classes = TestAppConfig.class, properties = "spring.pulsar.producer.cache.enabled=false")
133+
@ExtendWith(OutputCaptureExtension.class)
134+
class WithNonCachingProducerFactory {
135+
136+
@Test
137+
void whenLambdaCustomizerIsUsedThenWarningIsNotLogged(CapturedOutput output,
138+
@Autowired PulsarTemplate<String> template) {
139+
TestUtils.sendRequestsWithCustomizer("lcwlt-non-caching", 1001, template);
140+
TestUtils.assertThatWarningIsLoggedNumTimes(0, output);
141+
}
142+
143+
}
144+
145+
@SpringBootConfiguration
146+
@EnableAutoConfiguration
147+
static class TestAppConfig {
148+
149+
}
150+
151+
private static class TestUtils {
152+
153+
private static void sendRequestsWithCustomizer(String testPrefix, int numberOfSends,
154+
PulsarTemplate<String> template) {
155+
sendRequests(testPrefix, numberOfSends, (pb) -> {
156+
}, template);
157+
}
158+
159+
private static void sendRequestsWithoutCustomizer(String testPrefix, int numberOfSends,
160+
PulsarTemplate<String> template) {
161+
sendRequests(testPrefix, numberOfSends, null, template);
162+
}
163+
164+
private static void sendRequests(String testPrefix, int numberOfSends,
165+
ProducerBuilderCustomizer<String> customizer, PulsarTemplate<String> template) {
166+
for (int i = 0; i < numberOfSends; i++) {
167+
var msg = "LambdaCustomizerWarningLog-i:" + i;
168+
var builder = template.newMessage(msg).withTopic("%s-topic".formatted(testPrefix));
169+
if (customizer != null) {
170+
builder.withProducerCustomizer(customizer);
171+
}
172+
builder.send();
173+
}
174+
}
175+
176+
private static void assertThatWarningIsLoggedNumTimes(int expectedNumberOfTimes, CapturedOutput output) {
177+
// pause to make sure log is flushed to console before checking (sanity)
178+
try {
179+
Thread.sleep(1000);
180+
}
181+
catch (InterruptedException e) {
182+
throw new RuntimeException(e);
183+
}
184+
var pattern = Pattern.compile("(Producer customizer \\[.+?\\] is implemented as a Lambda)");
185+
assertThat(output.getAll())
186+
.satisfies((outputStr) -> assertThat(pattern.matcher(outputStr).results().count())
187+
.isEqualTo(expectedNumberOfTimes));
188+
}
189+
190+
}
191+
192+
}

integration-tests/src/intTest/resources/logback-test.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
<logger name="org.springframework.pulsar.function" level="INFO"/>
1313
<logger name="org.springframework.pulsar.inttest.app" level="INFO"/>
1414
<logger name="org.springframework.pulsar.inttest.config" level="INFO"/>
15+
<logger name="org.springframework.pulsar.inttest.logging" level="INFO"/>
1516
</configuration>

spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.springframework.pulsar.observation.PulsarMessageSenderContext;
4646
import org.springframework.pulsar.observation.PulsarTemplateObservation;
4747
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;
48+
import org.springframework.pulsar.support.internal.logging.LambdaCustomizerWarnLogger;
4849
import org.springframework.pulsar.transaction.PulsarTransactionUtils;
4950
import org.springframework.util.Assert;
5051
import org.springframework.util.CollectionUtils;
@@ -77,6 +78,8 @@ public class PulsarTemplate<T>
7778

7879
private final Map<Thread, Transaction> threadBoundTransactions = new HashMap<>();
7980

81+
private final boolean isProducerFactoryCaching;
82+
8083
/**
8184
* Whether to record observations.
8285
*/
@@ -99,6 +102,12 @@ public class PulsarTemplate<T>
99102

100103
private String beanName = "";
101104

105+
/**
106+
* Logs warning when Lambda is used for producer builder customizer.
107+
*/
108+
@Nullable
109+
private LambdaCustomizerWarnLogger lambdaLogger;
110+
102111
/**
103112
* Transaction settings.
104113
*/
@@ -143,12 +152,18 @@ public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInt
143152
else {
144153
this.interceptorsCustomizers = null;
145154
}
155+
this.isProducerFactoryCaching = (this.producerFactory instanceof CachingPulsarProducerFactory<?>);
156+
this.lambdaLogger = newLambdaWarnLogger(1000);
146157
}
147158

148159
private ProducerBuilderCustomizer<T> adaptInterceptorToCustomizer(ProducerInterceptor interceptor) {
149160
return b -> b.intercept(interceptor);
150161
}
151162

163+
private LambdaCustomizerWarnLogger newLambdaWarnLogger(long frequency) {
164+
return new LambdaCustomizerWarnLogger(this.logger, frequency);
165+
}
166+
152167
@Override
153168
public void setApplicationContext(ApplicationContext applicationContext) {
154169
this.applicationContext = applicationContext;
@@ -163,6 +178,15 @@ public TransactionProperties transactions() {
163178
return this.transactionProps;
164179
}
165180

181+
/**
182+
* How often to log a warning when a Lambda producer builder customizer is used.
183+
* @param frequency how often to log warning (every Nth occurrence) or non-positive to
184+
* not log warning.
185+
*/
186+
public void logWarningForLambdaCustomizer(long frequency) {
187+
this.lambdaLogger = (frequency > 0) ? newLambdaWarnLogger(frequency) : null;
188+
}
189+
166190
/**
167191
* If observations are enabled, attempt to obtain the Observation registry and
168192
* convention.
@@ -359,11 +383,18 @@ private Producer<T> prepareProducerForSend(@Nullable String topic, @Nullable T m
359383
customizers.addAll(this.interceptorsCustomizers);
360384
}
361385
if (producerCustomizer != null) {
386+
possiblyLogWarningOnUsingLambdaCustomizers(producerCustomizer);
362387
customizers.add(producerCustomizer);
363388
}
364389
return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers);
365390
}
366391

392+
private void possiblyLogWarningOnUsingLambdaCustomizers(ProducerBuilderCustomizer<T> producerCustomizer) {
393+
if (this.lambdaLogger != null && this.isProducerFactoryCaching) {
394+
this.lambdaLogger.maybeLog(producerCustomizer);
395+
}
396+
}
397+
367398
/**
368399
* Execute some arbitrary operation(s) on the template and return the result. The
369400
* template is invoked within a local transaction and do not participate in a global

spring-pulsar/src/main/java/org/springframework/pulsar/support/JavaUtils.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
* the singleton {@link #INSTANCE} and then chain calls to the utility methods.
2626
*
2727
* @author Soby Chacko
28+
* @author Chris Bono
2829
*/
2930
public final class JavaUtils {
3031

@@ -50,4 +51,13 @@ public <T> JavaUtils acceptIfNotNull(@Nullable T value, Consumer<T> consumer) {
5051
return this;
5152
}
5253

54+
/**
55+
* Determine if the specified class is a Lambda.
56+
* @param clazz the class to check
57+
* @return whether the specified class is a Lambda
58+
*/
59+
public boolean isLambda(Class<?> clazz) {
60+
return clazz.isSynthetic() && clazz.getName().contains("$$Lambda") && !clazz.isAnonymousClass();
61+
}
62+
5363
}

0 commit comments

Comments
 (0)