Skip to content

Commit 70e9e60

Browse files
authored
Allow custom object mapper for messages (#755)
This commit adds support for a user-provided Jackson ObjectMapper to be used when de/serializing JSON messages. Additionally, adds Gradle test fixtures to the spring-pulsar module and deprecates the UserRecord and UserPojo in spring-pulsar-test in favor of their equivalent in the test fixture. See #723
1 parent 3fcb22d commit 70e9e60

File tree

32 files changed

+864
-50
lines changed

32 files changed

+864
-50
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/nav.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*** xref:reference/reactive-pulsar/reactive-message-consumption.adoc[]
2222
*** xref:reference/tombstones-reactive.adoc[]
2323
** xref:reference/topic-resolution.adoc[]
24+
** xref:reference/custom-object-mapper.adoc[]
2425
** xref:reference/pulsar-admin.adoc[]
2526
** xref:reference/pulsar-function.adoc[]
2627
** xref:reference/observability.adoc[]
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
[[custom-object-mapper]]
2+
= Custom Object Mapper
3+
include::../attributes/attributes.adoc[]
4+
5+
Pulsar uses an internal Jackson `ObjectMapper` when de/serializing JSON messages.
6+
If you instead want to provide your own object mapper instance, you can register a `SchemaResolverCustomizer` and set your mapper on the `DefaultSchemaResolver` as follows:
7+
8+
[source,java,indent=0,subs="verbatim"]
9+
----
10+
@Bean
11+
SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
12+
return (DefaultSchemaResolver schemaResolver) -> {
13+
var myObjectMapper = obtainMyObjectMapper();
14+
schemaResolver.setObjectMapper(myObjectMapper);
15+
};
16+
}
17+
----
18+
19+
This results in your object mapper being used to de/serialize all JSON messages that go through the schema resolution process (i.e. in cases where you do not pass a schema in directly when producing/consuming messages).
20+
21+
Under the hood, the resolver creates a special JSON schema which leverages the custom mapper and is used as the schema for all resolved JSON messages.
22+
23+
If you need to pass schema instances directly you can use the `JSONSchemaUtil` to create schemas that respect the custom mapper.
24+
The following example shows how to do this when sending a message with the `PulsarTemplate` variant that takes a schema parameter:
25+
26+
[source,java,indent=0,subs="verbatim"]
27+
----
28+
void sendMessage(PulsarTemplate<MyPojo> template, MyPojo toSend) {
29+
var myObjectMapper = obtainMyObjectMapper();
30+
var schema = JSONSchemaUtil.schemaForTypeWithObjectMapper(MyPojo.class, myObjectMapper);
31+
template.send(toSend, schema);
32+
}
33+
----
34+
35+
36+
[CAUTION]
37+
====
38+
Pulsar configures its default object mapper in a particular way.
39+
Unless you have a specific reason to not do so, it is highly recommended that you configure your mapper with these same options as follows:
40+
[source,java,indent=0,subs="verbatim"]
41+
----
42+
myObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
43+
myObjectMapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, false);
44+
myObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
45+
----
46+
47+
====
48+
NOTE: A later version of the framework may instead provide a customizer that operates on the default mapper rather than requiring a separate instance.

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
= What's new?
22

3+
[[what-s-new-in-1-2-since-1-1]]
4+
== What's New in 1.2 Since 1.1
5+
:page-section-summary-toc: 1
6+
7+
This section covers the changes made from version 1.1 to version 1.2.
8+
9+
=== Custom Object Mapper
10+
You can provide your own Jackson `ObjectMapper` that Pulsar will use when producing and consuming JSON messages.
11+
See xref:./reference/custom-object-mapper.adoc[Custom Object Mapper] for more details.
12+
313
[[what-s-new-in-1-1-since-1-0]]
414
== What's New in 1.1 Since 1.0
515
:page-section-summary-toc: 1

spring-pulsar-reactive/spring-pulsar-reactive.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies {
3333
optional libs.json.path
3434

3535
testImplementation project(':spring-pulsar-test')
36+
testImplementation(testFixtures(project(":spring-pulsar")))
3637
testRuntimeOnly libs.logback.classic
3738
testImplementation 'io.projectreactor:reactor-test'
3839
testImplementation 'org.assertj:assertj-core'

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@
4747
import org.springframework.lang.Nullable;
4848
import org.springframework.pulsar.core.DefaultSchemaResolver;
4949
import org.springframework.pulsar.core.DefaultTopicResolver;
50+
import org.springframework.pulsar.core.JSONSchemaUtil;
51+
import org.springframework.pulsar.test.model.UserRecord;
52+
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
5053
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
51-
import org.springframework.pulsar.test.support.model.UserRecord;
5254
import org.springframework.util.function.ThrowingConsumer;
5355

5456
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -332,6 +334,25 @@ void withJsonSchema() throws Exception {
332334

333335
}
334336

337+
@Nested
338+
class CustomObjectMapperTests {
339+
340+
@Test
341+
void sendWithCustomJsonSchema() throws Exception {
342+
// Prepare the schema with custom object mapper
343+
var objectMapper = UserRecordObjectMapper.withSer();
344+
var schema = JSONSchemaUtil.schemaForTypeWithObjectMapper(UserRecord.class, objectMapper);
345+
var topic = "rptt-custom-object-mapper-topic";
346+
var user = new UserRecord("elFoo", 21);
347+
// serializer adds '-ser' to name and 10 to age
348+
var expectedUser = new UserRecord("elFoo-ser", 31);
349+
ThrowingConsumer<ReactivePulsarTemplate<UserRecord>> sendFunction = (
350+
template) -> template.send(topic, user, schema).subscribe();
351+
sendAndConsume(sendFunction, topic, schema, expectedUser, false);
352+
}
353+
354+
}
355+
335356
public static class Foo {
336357

337358
private String foo;

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import org.apache.pulsar.client.api.DeadLetterPolicy;
2728
import org.apache.pulsar.client.api.PulsarClient;
@@ -37,10 +38,13 @@
3738
import org.junit.jupiter.api.Test;
3839

3940
import org.springframework.core.log.LogAccessor;
41+
import org.springframework.pulsar.core.JSONSchemaUtil;
4042
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
4143
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory;
4244
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
4345
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
46+
import org.springframework.pulsar.test.model.UserRecord;
47+
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
4448
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
4549

4650
import reactor.core.publisher.Flux;
@@ -306,28 +310,69 @@ void deadLetterTopicCustomizer() throws Exception {
306310
}
307311
}
308312

313+
@Test
314+
void oneByOneMessageHandlerWithCustomObjectMapper() throws Exception {
315+
var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build();
316+
ReactivePulsarMessageListenerContainer<UserRecord> container = null;
317+
try {
318+
// Prepare the schema with custom object mapper
319+
var objectMapper = UserRecordObjectMapper.withDeser();
320+
var schema = JSONSchemaUtil.schemaForTypeWithObjectMapper(UserRecord.class, objectMapper);
321+
322+
var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
323+
var topic = topicNameForTest("com-topic");
324+
var consumerFactory = createAndPrepareConsumerFactory(topic, schema, reactivePulsarClient);
325+
var containerProperties = new ReactivePulsarContainerProperties<UserRecord>();
326+
containerProperties.setSchema(schema);
327+
var latch = new CountDownLatch(1);
328+
AtomicReference<UserRecord> consumedRecordRef = new AtomicReference<>();
329+
containerProperties.setMessageHandler((ReactivePulsarOneByOneMessageHandler<UserRecord>) (msg) -> {
330+
consumedRecordRef.set(msg.getValue());
331+
return Mono.fromRunnable(latch::countDown);
332+
});
333+
container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties);
334+
container.start();
335+
336+
var sentUserRecord = new UserRecord("person", 51);
337+
// deser adds '-deser' to name and 5 to age
338+
var expectedConsumedUser = new UserRecord("person-deser", 56);
339+
createPulsarTemplate(topic, reactivePulsarClient).send(sentUserRecord).subscribe();
340+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
341+
assertThat(consumedRecordRef).hasValue(expectedConsumedUser);
342+
}
343+
finally {
344+
safeStopContainer(container);
345+
pulsarClient.close();
346+
}
347+
}
348+
309349
private String topicNameForTest(String suffix) {
310350
return "drpmlct-" + suffix;
311351
}
312352

313353
private DefaultReactivePulsarConsumerFactory<String> createAndPrepareConsumerFactory(String topic,
314354
ReactivePulsarClient reactivePulsarClient) {
315-
ReactiveMessageConsumerBuilderCustomizer<String> defaultConfig = (builder) -> {
355+
return this.createAndPrepareConsumerFactory(topic, Schema.STRING, reactivePulsarClient);
356+
}
357+
358+
private <T> DefaultReactivePulsarConsumerFactory<T> createAndPrepareConsumerFactory(String topic, Schema<T> schema,
359+
ReactivePulsarClient reactivePulsarClient) {
360+
ReactiveMessageConsumerBuilderCustomizer<T> defaultConfig = (builder) -> {
316361
builder.topic(topic);
317362
builder.subscriptionName(topic + "-sub");
318363
};
319-
var consumerFactory = new DefaultReactivePulsarConsumerFactory<>(reactivePulsarClient, List.of(defaultConfig));
364+
var consumerFactory = new DefaultReactivePulsarConsumerFactory<T>(reactivePulsarClient, List.of(defaultConfig));
320365
// Ensure subscription is created
321-
consumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(5));
366+
consumerFactory.createConsumer(schema).consumeNothing().block(Duration.ofSeconds(5));
322367
return consumerFactory;
323368
}
324369

325-
private ReactivePulsarTemplate<String> createPulsarTemplate(String topic,
370+
private <T> ReactivePulsarTemplate<T> createPulsarTemplate(String topic,
326371
ReactivePulsarClient reactivePulsarClient) {
327-
var producerFactory = DefaultReactivePulsarSenderFactory.<String>builderFor(reactivePulsarClient)
372+
var producerFactory = DefaultReactivePulsarSenderFactory.<T>builderFor(reactivePulsarClient)
328373
.withDefaultTopic(topic)
329374
.build();
330-
return new ReactivePulsarTemplate<>(producerFactory);
375+
return new ReactivePulsarTemplate<T>(producerFactory);
331376
}
332377

333378
private void safeStopContainer(ReactivePulsarMessageListenerContainer<?> container) {

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerAutoConsumeSchemaTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
4949
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
5050
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerAutoConsumeSchemaTests.ReactivePulsarListenerAutoConsumeSchemaTestsConfig;
51-
import org.springframework.pulsar.test.support.model.UserPojo;
52-
import org.springframework.pulsar.test.support.model.UserRecord;
51+
import org.springframework.pulsar.test.model.UserPojo;
52+
import org.springframework.pulsar.test.model.UserRecord;
5353
import org.springframework.test.context.ContextConfiguration;
5454

5555
import reactor.core.publisher.Mono;

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@
7474
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
7575
import org.springframework.pulsar.reactive.support.MessageUtils;
7676
import org.springframework.pulsar.support.PulsarHeaders;
77-
import org.springframework.pulsar.test.support.model.UserPojo;
78-
import org.springframework.pulsar.test.support.model.UserRecord;
77+
import org.springframework.pulsar.test.model.UserPojo;
78+
import org.springframework.pulsar.test.model.UserRecord;
7979
import org.springframework.test.context.ContextConfiguration;
8080
import org.springframework.test.util.ReflectionTestUtils;
8181
import org.springframework.util.ObjectUtils;

spring-pulsar-sample-apps/sample-imperative-produce-consume/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ ext['pulsar.version'] = "${pulsarVersion}"
2121
dependencies {
2222
implementation 'org.springframework.boot:spring-boot-starter-pulsar'
2323
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
24-
testImplementation project(':spring-pulsar-test')
24+
// temporary until JsonSchemaUtil published
25+
implementation project(':spring-pulsar')
26+
implementation(testFixtures(project(":spring-pulsar")))
27+
implementation project(':spring-pulsar-test')
2528
testRuntimeOnly 'ch.qos.logback:logback-classic'
2629
testImplementation "org.springframework.boot:spring-boot-starter-test"
2730
testImplementation "org.springframework.boot:spring-boot-testcontainers"

spring-pulsar-sample-apps/sample-imperative-produce-consume/src/main/java/com/example/ImperativeProduceAndConsumeApp.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@
2727
import org.springframework.context.annotation.Bean;
2828
import org.springframework.context.annotation.Configuration;
2929
import org.springframework.pulsar.annotation.PulsarListener;
30+
import org.springframework.pulsar.core.DefaultSchemaResolver;
3031
import org.springframework.pulsar.core.PulsarTemplate;
3132
import org.springframework.pulsar.core.PulsarTopic;
33+
import org.springframework.pulsar.core.SchemaResolver;
34+
import org.springframework.pulsar.test.model.UserRecord;
35+
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
3236

3337
@SpringBootApplication
3438
public class ImperativeProduceAndConsumeApp {
@@ -55,7 +59,7 @@ ApplicationRunner sendPrimitiveMessagesToPulsarTopic(PulsarTemplate<String> temp
5559
};
5660
}
5761

58-
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub")
62+
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub")
5963
void consumePrimitiveMessagesFromPulsarTopic(String msg) {
6064
LOG.info("++++++CONSUME {}------", msg);
6165
}
@@ -79,7 +83,7 @@ ApplicationRunner sendComplexMessagesToPulsarTopic(PulsarTemplate<Foo> template)
7983
};
8084
}
8185

82-
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub")
86+
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub")
8387
void consumeComplexMessagesFromPulsarTopic(Foo msg) {
8488
LOG.info("++++++CONSUME {}------", msg);
8589
}
@@ -108,7 +112,7 @@ ApplicationRunner sendPartitionedMessagesToPulsarTopic(PulsarTemplate<String> te
108112
};
109113
}
110114

111-
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub")
115+
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub")
112116
void consumePartitionedMessagesFromPulsarTopic(String msg) {
113117
LOG.info("++++++CONSUME {}------", msg);
114118
}
@@ -132,7 +136,7 @@ ApplicationRunner sendBatchMessagesToPulsarTopic(PulsarTemplate<Foo> template) {
132136
};
133137
}
134138

135-
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub", batch = true)
139+
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub", batch = true)
136140
void consumeBatchMessagesFromPulsarTopic(List<Foo> messages) {
137141
messages.forEach((msg) -> LOG.info("++++++CONSUME {}------", msg));
138142
}
@@ -162,6 +166,38 @@ void consumeBarWithoutTopicOrSchema(Bar msg) {
162166

163167
}
164168

169+
@Configuration(proxyBeanMethods = false)
170+
static class ProduceConsumeCustomObjectMapper {
171+
172+
private static final String TOPIC = "produce-consume-custom-object-mapper";
173+
174+
@Bean
175+
SchemaResolver.SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
176+
return (DefaultSchemaResolver schemaResolver) -> {
177+
var objectMapper = UserRecordObjectMapper.withSerAndDeser();
178+
schemaResolver.setObjectMapper(objectMapper);
179+
};
180+
}
181+
182+
@Bean
183+
ApplicationRunner sendWithCustomObjectMapper(PulsarTemplate<UserRecord> template) {
184+
return (args) -> {
185+
for (int i = 0; i < 10; i++) {
186+
var user = new UserRecord("user-" + i, 30);
187+
template.send(TOPIC, user);
188+
LOG.info("++++++PRODUCE {}------", user);
189+
}
190+
};
191+
}
192+
193+
@PulsarListener(topics = TOPIC)
194+
void consumeWithCustomObjectMapper(UserRecord user) {
195+
LOG.info("++++++CONSUME {}------", user);
196+
}
197+
198+
}
199+
200+
165201
record Foo(String name, Integer value) {
166202
}
167203

0 commit comments

Comments
 (0)