Skip to content

Commit fe07a7a

Browse files
onobcjitokim
andauthored
Allow custom object mapper for headers (#762)
This commit adds support for a user-provided Jackson ObjectMapper to be used when deserializing JSON header values. See #723 Co-authored-by: Jihoon Kim <[email protected]>
1 parent 5637df2 commit fe07a7a

File tree

14 files changed

+614
-84
lines changed

14 files changed

+614
-84
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ The `JsonPulsarHeaderMapper` has a property called `addToStringClasses()` that l
4343
During inbound mapping, they are mapped as `String`.
4444
By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way.
4545

46+
===== Custom ObjectMapper
47+
The JSON mapper uses a reasonable configured Jasckson `ObjectMapper` to handle serialization of header values.
48+
However, to provide a custom object mapper one must simply provide an `ObjectMapper` bean with the name `pulsarHeaderObjectMapper`.
49+
For example:
50+
[source, java]
51+
----
52+
@Configuration(proxyBeanMethods = false)
53+
static class PulsarHeadersCustomObjectMapperTestConfig {
54+
55+
@Bean(name = "pulsarHeaderObjectMapper")
56+
ObjectMapper customObjectMapper() {
57+
var objectMapper = new ObjectMapper();
58+
// do things with your special header object mapper here
59+
return objectMapper;
60+
}
61+
}
62+
----
63+
4664
=== Inbound/Outbound Patterns
4765
On the inbound side, by default, all Pulsar headers (message metadata plus user properties) are mapped to `MessageHeaders`.
4866
On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata.

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.springframework.core.MethodParameter;
3232
import org.springframework.core.ResolvableType;
3333
import org.springframework.core.log.LogAccessor;
34-
import org.springframework.expression.BeanResolver;
3534
import org.springframework.lang.Nullable;
3635
import org.springframework.messaging.converter.SmartMessageConverter;
3736
import org.springframework.messaging.handler.annotation.Header;
@@ -53,6 +52,7 @@
5352
import org.springframework.util.Assert;
5453
import org.springframework.util.ObjectUtils;
5554

55+
import com.fasterxml.jackson.databind.ObjectMapper;
5656
import reactor.core.publisher.Flux;
5757

5858
/**
@@ -62,6 +62,7 @@
6262
* @param <V> Message payload type
6363
* @author Christophe Bornet
6464
* @author Chris Bono
65+
* @author Jihoon Kim
6566
*/
6667
public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePulsarListenerEndpoint<V> {
6768

@@ -71,6 +72,8 @@ public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePul
7172

7273
private Method method;
7374

75+
private ObjectMapper objectMapper;
76+
7477
private MessageHandlerMethodFactory messageHandlerMethodFactory;
7578

7679
private SmartMessageConverter messagingConverter;
@@ -209,28 +212,33 @@ protected HandlerAdapter configureListenerAdapter(AbstractPulsarMessageToSpringM
209212
@SuppressWarnings({ "unchecked", "rawtypes" })
210213
protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageListenerInstance(
211214
@Nullable MessageConverter messageConverter) {
212-
213215
AbstractPulsarMessageToSpringMessageAdapter<V> listener;
214216
if (isFluxListener()) {
215217
listener = new PulsarReactiveStreamingMessagingMessageListenerAdapter<>(this.bean, this.method);
216218
}
217219
else {
218220
listener = new PulsarReactiveOneByOneMessagingMessageListenerAdapter<>(this.bean, this.method);
219221
}
220-
221-
if (messageConverter instanceof PulsarMessageConverter) {
222-
listener.setMessageConverter((PulsarMessageConverter) messageConverter);
222+
if (messageConverter instanceof PulsarMessageConverter pulsarMessageConverter) {
223+
listener.setMessageConverter(pulsarMessageConverter);
223224
}
224225
if (this.messagingConverter != null) {
225226
listener.setMessagingConverter(this.messagingConverter);
226227
}
227-
BeanResolver resolver = getBeanResolver();
228+
if (this.objectMapper != null) {
229+
listener.setObjectMapper(this.objectMapper);
230+
}
231+
var resolver = getBeanResolver();
228232
if (resolver != null) {
229233
listener.setBeanResolver(resolver);
230234
}
231235
return listener;
232236
}
233237

238+
public void setObjectMapper(ObjectMapper objectMapper) {
239+
this.objectMapper = objectMapper;
240+
}
241+
234242
public void setMessagingConverter(SmartMessageConverter messagingConverter) {
235243
this.messagingConverter = messagingConverter;
236244
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.format.FormatterRegistry;
4747
import org.springframework.lang.Nullable;
4848
import org.springframework.pulsar.annotation.AbstractPulsarAnnotationsBeanPostProcessor;
49+
import org.springframework.pulsar.annotation.PulsarHeaderObjectMapperUtils;
4950
import org.springframework.pulsar.annotation.PulsarListenerConfigurer;
5051
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
5152
import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar;
@@ -79,6 +80,7 @@
7980
* @param <V> the payload type.
8081
* @author Christophe Bornet
8182
* @author Soby Chacko
83+
* @author Jihoon Kim
8284
* @see ReactivePulsarListener
8385
* @see EnableReactivePulsar
8486
* @see PulsarListenerConfigurer
@@ -281,6 +283,9 @@ private void resolveDeadLetterPolicy(MethodReactivePulsarListenerEndpoint<?> end
281283

282284
@SuppressWarnings("unchecked")
283285
protected void postProcessEndpointsBeforeRegistration() {
286+
PulsarHeaderObjectMapperUtils.customMapper(this.beanFactory)
287+
.ifPresent((objectMapper) -> this.processedEndpoints
288+
.forEach((endpoint) -> endpoint.setObjectMapper(objectMapper)));
284289
if (this.processedEndpoints.size() == 1) {
285290
MethodReactivePulsarListenerEndpoint<?> endpoint = this.processedEndpoints.get(0);
286291
if (endpoint.getConsumerCustomizer() != null) {

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java

Lines changed: 122 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.beans.factory.annotation.Autowired;
5151
import org.springframework.context.annotation.Bean;
5252
import org.springframework.context.annotation.Configuration;
53+
import org.springframework.messaging.MessageHeaders;
5354
import org.springframework.messaging.handler.annotation.Header;
5455
import org.springframework.pulsar.annotation.EnablePulsar;
5556
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
@@ -68,18 +69,23 @@
6869
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
6970
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
7071
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.BasicListenersTestCases.BasicListenersTestCasesConfig;
72+
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;
7173
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
7274
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
7375
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig;
7476
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
7577
import org.springframework.pulsar.reactive.support.MessageUtils;
7678
import org.springframework.pulsar.support.PulsarHeaders;
79+
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
7780
import org.springframework.pulsar.test.model.UserPojo;
7881
import org.springframework.pulsar.test.model.UserRecord;
82+
import org.springframework.pulsar.test.model.json.UserRecordDeserializer;
7983
import org.springframework.test.context.ContextConfiguration;
8084
import org.springframework.test.util.ReflectionTestUtils;
8185
import org.springframework.util.ObjectUtils;
8286

87+
import com.fasterxml.jackson.databind.ObjectMapper;
88+
import com.fasterxml.jackson.databind.module.SimpleModule;
8389
import reactor.core.publisher.Flux;
8490
import reactor.core.publisher.Mono;
8591

@@ -541,40 +547,67 @@ Mono<Void> listenString(String ignored) {
541547
class PulsarHeadersTest {
542548

543549
static CountDownLatch simpleListenerLatch = new CountDownLatch(1);
550+
static CountDownLatch simpleListenerPojoLatch = new CountDownLatch(1);
544551
static CountDownLatch pulsarMessageListenerLatch = new CountDownLatch(1);
545552
static CountDownLatch springMessagingMessageListenerLatch = new CountDownLatch(1);
546553

547554
static AtomicReference<String> capturedData = new AtomicReference<>();
548555
static AtomicReference<MessageId> messageId = new AtomicReference<>();
549556
static AtomicReference<String> topicName = new AtomicReference<>();
550557
static AtomicReference<String> fooValue = new AtomicReference<>();
558+
static AtomicReference<Object> pojoValue = new AtomicReference<>();
551559
static AtomicReference<byte[]> rawData = new AtomicReference<>();
552560

553561
@Test
554562
void simpleListenerWithHeaders() throws Exception {
555-
MessageId messageId = pulsarTemplate.newMessage("hello-simple-listener")
563+
var topic = "rplt-simpleListenerWithHeaders";
564+
var msg = "hello-%s".formatted(topic);
565+
MessageId messageId = pulsarTemplate.newMessage(msg)
556566
.withMessageCustomizer(messageBuilder -> messageBuilder.property("foo", "simpleListenerWithHeaders"))
557-
.withTopic("simpleListenerWithHeaders")
567+
.withTopic(topic)
558568
.send();
559569
assertThat(simpleListenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
560-
assertThat(capturedData.get()).isEqualTo("hello-simple-listener");
561-
assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId);
562-
assertThat(topicName.get()).isEqualTo("persistent://public/default/simpleListenerWithHeaders");
563-
assertThat(fooValue.get()).isEqualTo("simpleListenerWithHeaders");
564-
assertThat(rawData.get()).isEqualTo("hello-simple-listener".getBytes(StandardCharsets.UTF_8));
570+
assertThat(PulsarHeadersTest.messageId).hasValue(messageId);
571+
assertThat(topicName).hasValue("persistent://public/default/%s".formatted(topic));
572+
assertThat(capturedData).hasValue(msg);
573+
assertThat(rawData).hasValue(msg.getBytes(StandardCharsets.UTF_8));
574+
assertThat(fooValue).hasValue("simpleListenerWithHeaders");
575+
}
576+
577+
@Test
578+
void simpleListenerWithPojoHeader() throws Exception {
579+
var topic = "rplt-simpleListenerWithPojoHeader";
580+
var msg = "hello-%s".formatted(topic);
581+
// In order to send complex headers (pojo) must manually map and set each
582+
// header as follows
583+
var user = new UserRecord("that", 100);
584+
var headers = new HashMap<String, Object>();
585+
headers.put("user", user);
586+
var headerMapper = JsonPulsarHeaderMapper.builder().build();
587+
var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers));
588+
MessageId messageId = pulsarTemplate.newMessage(msg)
589+
.withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property))
590+
.withTopic(topic)
591+
.send();
592+
assertThat(simpleListenerPojoLatch.await(10, TimeUnit.SECONDS)).isTrue();
593+
assertThat(PulsarHeadersTest.messageId).hasValue(messageId);
594+
assertThat(topicName).hasValue("persistent://public/default/%s".formatted(topic));
595+
assertThat(pojoValue).hasValue(user);
596+
assertThat(capturedData).hasValue(msg);
597+
assertThat(rawData).hasValue(msg.getBytes(StandardCharsets.UTF_8));
565598
}
566599

567600
@Test
568601
void pulsarMessageListenerWithHeaders() throws Exception {
569602
MessageId messageId = pulsarTemplate.newMessage("hello-pulsar-message-listener")
570603
.withMessageCustomizer(
571604
messageBuilder -> messageBuilder.property("foo", "pulsarMessageListenerWithHeaders"))
572-
.withTopic("pulsarMessageListenerWithHeaders")
605+
.withTopic("rplt-pulsarMessageListenerWithHeaders")
573606
.send();
574607
assertThat(pulsarMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
575608
assertThat(capturedData.get()).isEqualTo("hello-pulsar-message-listener");
576609
assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId);
577-
assertThat(topicName.get()).isEqualTo("persistent://public/default/pulsarMessageListenerWithHeaders");
610+
assertThat(topicName.get()).isEqualTo("persistent://public/default/rplt-pulsarMessageListenerWithHeaders");
578611
assertThat(fooValue.get()).isEqualTo("pulsarMessageListenerWithHeaders");
579612
assertThat(rawData.get()).isEqualTo("hello-pulsar-message-listener".getBytes(StandardCharsets.UTF_8));
580613
}
@@ -584,13 +617,13 @@ void springMessagingMessageListenerWithHeaders() throws Exception {
584617
MessageId messageId = pulsarTemplate.newMessage("hello-spring-messaging-message-listener")
585618
.withMessageCustomizer(
586619
messageBuilder -> messageBuilder.property("foo", "springMessagingMessageListenerWithHeaders"))
587-
.withTopic("springMessagingMessageListenerWithHeaders")
620+
.withTopic("rplt-springMessagingMessageListenerWithHeaders")
588621
.send();
589622
assertThat(springMessagingMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
590623
assertThat(capturedData.get()).isEqualTo("hello-spring-messaging-message-listener");
591624
assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId);
592625
assertThat(topicName.get())
593-
.isEqualTo("persistent://public/default/springMessagingMessageListenerWithHeaders");
626+
.isEqualTo("persistent://public/default/rplt-springMessagingMessageListenerWithHeaders");
594627
assertThat(fooValue.get()).isEqualTo("springMessagingMessageListenerWithHeaders");
595628
assertThat(rawData.get())
596629
.isEqualTo("hello-spring-messaging-message-listener".getBytes(StandardCharsets.UTF_8));
@@ -600,8 +633,9 @@ void springMessagingMessageListenerWithHeaders() throws Exception {
600633
@Configuration
601634
static class PulsarListenerWithHeadersConfig {
602635

603-
@ReactivePulsarListener(subscriptionName = "simple-listener-with-headers-sub",
604-
topics = "simpleListenerWithHeaders", consumerCustomizer = "subscriptionInitialPositionEarliest")
636+
@ReactivePulsarListener(topics = "rplt-simpleListenerWithHeaders",
637+
subscriptionName = "rplt-simple-listener-with-headers-sub",
638+
consumerCustomizer = "subscriptionInitialPositionEarliest")
605639
Mono<Void> simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
606640
@Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
607641
@Header("foo") String foo) {
@@ -614,8 +648,23 @@ Mono<Void> simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_
614648
return Mono.empty();
615649
}
616650

617-
@ReactivePulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub",
618-
topics = "pulsarMessageListenerWithHeaders",
651+
@ReactivePulsarListener(topics = "rplt-simpleListenerWithPojoHeader",
652+
subscriptionName = "simpleListenerWithPojoHeader-sub",
653+
consumerCustomizer = "subscriptionInitialPositionEarliest")
654+
Mono<Void> simpleListenerWithPojoHeader(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
655+
@Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
656+
@Header("user") UserRecord user) {
657+
capturedData.set(data);
658+
PulsarHeadersTest.messageId.set(messageId);
659+
PulsarHeadersTest.topicName.set(topicName);
660+
pojoValue.set(user);
661+
PulsarHeadersTest.rawData.set(rawData);
662+
simpleListenerPojoLatch.countDown();
663+
return Mono.empty();
664+
}
665+
666+
@ReactivePulsarListener(subscriptionName = "rplt-pulsar-message-listener-with-headers-sub",
667+
topics = "rplt-pulsarMessageListenerWithHeaders",
619668
consumerCustomizer = "subscriptionInitialPositionEarliest")
620669
Mono<Void> pulsarMessageListenerWithHeaders(Message<String> data,
621670
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@@ -630,8 +679,8 @@ Mono<Void> pulsarMessageListenerWithHeaders(Message<String> data,
630679
return Mono.empty();
631680
}
632681

633-
@ReactivePulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub",
634-
topics = "springMessagingMessageListenerWithHeaders",
682+
@ReactivePulsarListener(subscriptionName = "rplt-pulsar-message-listener-with-headers-sub",
683+
topics = "rplt-springMessagingMessageListenerWithHeaders",
635684
consumerCustomizer = "subscriptionInitialPositionEarliest")
636685
Mono<Void> springMessagingMessageListenerWithHeaders(org.springframework.messaging.Message<String> data,
637686
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@@ -650,6 +699,62 @@ Mono<Void> springMessagingMessageListenerWithHeaders(org.springframework.messagi
650699

651700
}
652701

702+
@Nested
703+
@ContextConfiguration(classes = PulsarHeadersCustomObjectMapperTestConfig.class)
704+
class PulsarHeadersCustomObjectMapperTest {
705+
706+
private static final String TOPIC = "rplt-listenerWithPojoHeader-custom";
707+
708+
private static final CountDownLatch listenerLatch = new CountDownLatch(1);
709+
710+
private static UserRecord userPassedIntoListener;
711+
712+
@Test
713+
void whenPulsarHeaderObjectMapperIsDefinedThenItIsUsedToDeserializeHeaders() throws Exception {
714+
var msg = "hello-%s".formatted(TOPIC);
715+
// In order to send complex headers (pojo) must manually map and set each
716+
// header as follows
717+
var user = new UserRecord("that", 100);
718+
var headers = new HashMap<String, Object>();
719+
headers.put("user", user);
720+
var headerMapper = JsonPulsarHeaderMapper.builder().build();
721+
var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers));
722+
pulsarTemplate.newMessage(msg)
723+
.withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property))
724+
.withTopic(TOPIC)
725+
.send();
726+
// Custom deser adds suffix to name and bumps age + 5
727+
var expectedUser = new UserRecord(user.name() + "-deser", user.age() + 5);
728+
assertThat(listenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
729+
assertThat(userPassedIntoListener).isEqualTo(expectedUser);
730+
}
731+
732+
@Configuration(proxyBeanMethods = false)
733+
static class PulsarHeadersCustomObjectMapperTestConfig {
734+
735+
@Bean(name = "pulsarHeaderObjectMapper")
736+
ObjectMapper customObjectMapper() {
737+
var objectMapper = new ObjectMapper();
738+
var module = new SimpleModule();
739+
module.addDeserializer(UserRecord.class, new UserRecordDeserializer());
740+
objectMapper.registerModule(module);
741+
return objectMapper;
742+
}
743+
744+
@ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub",
745+
consumerCustomizer = "subscriptionInitialPositionEarliest")
746+
Mono<Void> listenerWithPojoHeader(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
747+
@Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
748+
@Header("user") UserRecord user) {
749+
userPassedIntoListener = user;
750+
listenerLatch.countDown();
751+
return Mono.empty();
752+
}
753+
754+
}
755+
756+
}
757+
653758
@Nested
654759
@ContextConfiguration(classes = PulsarListenerConcurrencyTestCases.TestPulsarListenersForConcurrency.class)
655760
class PulsarListenerConcurrencyTestCases {

0 commit comments

Comments
 (0)