Skip to content

Commit 9758e46

Browse files
authored
GH-10264: Add Jackson 3 support to Kafka module
Fixes: #10264 * Update all Kafka components to conditionally use Jackson 3 classes * Migrate test configurations and XML beans to use `JsonKafkaHeaderMapper` * Suppress removal warning in `KafkaProducerMessageHandler` * Replace FQCN with simple reference for `JacksonMessagingUtils` Signed-off-by: Jooyoung Pyoung <[email protected]>
1 parent f1c4e61 commit 9758e46

File tree

10 files changed

+81
-26
lines changed

10 files changed

+81
-26
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.integration.dispatcher.MessageDispatcher;
2626
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2727
import org.springframework.integration.dispatcher.UnicastingDispatcher;
28+
import org.springframework.integration.support.json.JacksonMessagingUtils;
2829
import org.springframework.integration.support.management.ManageableSmartLifecycle;
2930
import org.springframework.kafka.config.KafkaListenerContainerFactory;
3031
import org.springframework.kafka.core.KafkaOperations;
@@ -34,6 +35,7 @@
3435
import org.springframework.kafka.support.Acknowledgment;
3536
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
3637
import org.springframework.kafka.support.JacksonPresent;
38+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
3739
import org.springframework.kafka.support.converter.MessagingMessageConverter;
3840
import org.springframework.kafka.support.converter.RecordMessageConverter;
3941
import org.springframework.messaging.MessageHandler;
@@ -45,6 +47,7 @@
4547
*
4648
* @author Gary Russell
4749
* @author Artem Bilan
50+
* @author Jooyoung Pyoung
4851
*
4952
* @since 5.4
5053
*
@@ -84,7 +87,16 @@ public SubscribableKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerCon
8487
Assert.notNull(factory, "'factory' cannot be null");
8588
this.factory = factory;
8689

87-
if (JacksonPresent.isJackson2Present()) {
90+
if (JacksonPresent.isJackson3Present()) {
91+
var messageConverter = new MessagingMessageConverter();
92+
var headerMapper = new JsonKafkaHeaderMapper();
93+
headerMapper.addTrustedPackages(
94+
JacksonMessagingUtils.DEFAULT_TRUSTED_PACKAGES
95+
.toArray(new String[0]));
96+
messageConverter.setHeaderMapper(headerMapper);
97+
this.recordListener.setMessageConverter(messageConverter);
98+
}
99+
else if (JacksonPresent.isJackson2Present()) {
88100
var messageConverter = new MessagingMessageConverter();
89101
var headerMapper = new DefaultKafkaHeaderMapper();
90102
headerMapper.addTrustedPackages(

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3838
import org.springframework.integration.support.ErrorMessageUtils;
3939
import org.springframework.integration.support.MessageBuilder;
40+
import org.springframework.integration.support.json.JacksonMessagingUtils;
4041
import org.springframework.kafka.core.KafkaTemplate;
4142
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
4243
import org.springframework.kafka.listener.ConsumerSeekAware;
@@ -45,6 +46,7 @@
4546
import org.springframework.kafka.support.Acknowledgment;
4647
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
4748
import org.springframework.kafka.support.JacksonPresent;
49+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
4850
import org.springframework.kafka.support.KafkaHeaders;
4951
import org.springframework.kafka.support.converter.ConversionException;
5052
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
@@ -68,6 +70,7 @@
6870
* @author Gary Russell
6971
* @author Artem Bilan
7072
* @author Urs Keller
73+
* @author Jooyoung Pyoung
7174
*
7275
* @since 5.4
7376
*
@@ -108,7 +111,16 @@ public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> messageListene
108111
this.messageListenerContainer.setAutoStartup(false);
109112
this.kafkaTemplate = kafkaTemplate;
110113
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
111-
if (JacksonPresent.isJackson2Present()) {
114+
if (JacksonPresent.isJackson3Present()) {
115+
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
116+
JsonKafkaHeaderMapper headerMapper = new JsonKafkaHeaderMapper();
117+
headerMapper.addTrustedPackages(
118+
JacksonMessagingUtils.DEFAULT_TRUSTED_PACKAGES
119+
.toArray(new String[0]));
120+
messageConverter.setHeaderMapper(headerMapper);
121+
this.listener.setMessageConverter(messageConverter);
122+
}
123+
else if (JacksonPresent.isJackson2Present()) {
112124
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
113125
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
114126
headerMapper.addTrustedPackages(

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
4040
import org.springframework.integration.support.ErrorMessageUtils;
4141
import org.springframework.integration.support.MessageBuilder;
42+
import org.springframework.integration.support.json.JacksonMessagingUtils;
4243
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
4344
import org.springframework.kafka.listener.BatchMessageListener;
4445
import org.springframework.kafka.listener.ConsumerSeekAware;
@@ -52,6 +53,7 @@
5253
import org.springframework.kafka.support.Acknowledgment;
5354
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5455
import org.springframework.kafka.support.JacksonPresent;
56+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
5557
import org.springframework.kafka.support.KafkaHeaders;
5658
import org.springframework.kafka.support.converter.BatchMessageConverter;
5759
import org.springframework.kafka.support.converter.ConversionException;
@@ -76,6 +78,7 @@
7678
* @author Gary Russell
7779
* @author Artem Bilan
7880
* @author Urs Keller
81+
* @author Jooyoung Pyoung
7982
*
8083
* @since 5.4
8184
*/
@@ -131,11 +134,21 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> m
131134
this.mode = mode;
132135
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
133136

134-
if (JacksonPresent.isJackson2Present()) {
135-
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
136-
// For consistency with the rest of Spring Integration channel adapters
137-
messageConverter.setGenerateMessageId(true);
138-
messageConverter.setGenerateTimestamp(true);
137+
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
138+
// For consistency with the rest of Spring Integration channel adapters
139+
messageConverter.setGenerateMessageId(true);
140+
messageConverter.setGenerateTimestamp(true);
141+
142+
if (JacksonPresent.isJackson3Present()) {
143+
JsonKafkaHeaderMapper headerMapper = new JsonKafkaHeaderMapper();
144+
headerMapper.addTrustedPackages(
145+
JacksonMessagingUtils.DEFAULT_TRUSTED_PACKAGES
146+
.toArray(new String[0]));
147+
messageConverter.setHeaderMapper(headerMapper);
148+
this.recordListener.setMessageConverter(messageConverter);
149+
this.batchListener.setMessageConverter(messageConverter);
150+
}
151+
else if (JacksonPresent.isJackson2Present()) {
139152
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
140153
headerMapper.addTrustedPackages(
141154
org.springframework.integration.support.json.JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.springframework.integration.core.Pausable;
5959
import org.springframework.integration.endpoint.AbstractMessageSource;
6060
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
61+
import org.springframework.integration.support.json.JacksonMessagingUtils;
6162
import org.springframework.kafka.core.ConsumerFactory;
6263
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
6364
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
@@ -67,6 +68,7 @@
6768
import org.springframework.kafka.support.Acknowledgment;
6869
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
6970
import org.springframework.kafka.support.JacksonPresent;
71+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
7072
import org.springframework.kafka.support.KafkaHeaders;
7173
import org.springframework.kafka.support.KafkaUtils;
7274
import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -105,6 +107,7 @@
105107
* @author Anshul Mehra
106108
* @author Christian Tzolov
107109
* @author Ngoc Nhan
110+
* @author Jooyoung Pyoung
108111
*
109112
* @since 5.4
110113
*
@@ -262,7 +265,14 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
262265
messagingMessageConverter.setGenerateMessageId(true);
263266
messagingMessageConverter.setGenerateTimestamp(true);
264267

265-
if (JacksonPresent.isJackson2Present()) {
268+
if (JacksonPresent.isJackson3Present()) {
269+
JsonKafkaHeaderMapper headerMapper = new JsonKafkaHeaderMapper();
270+
headerMapper.addTrustedPackages(
271+
JacksonMessagingUtils.DEFAULT_TRUSTED_PACKAGES
272+
.toArray(new String[0]));
273+
messagingMessageConverter.setHeaderMapper(headerMapper);
274+
}
275+
else if (JacksonPresent.isJackson2Present()) {
266276
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
267277
headerMapper.addTrustedPackages(
268278
org.springframework.integration.support.json.JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.springframework.kafka.requestreply.RequestReplyFuture;
5454
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5555
import org.springframework.kafka.support.JacksonPresent;
56+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
5657
import org.springframework.kafka.support.KafkaHeaderMapper;
5758
import org.springframework.kafka.support.KafkaHeaders;
5859
import org.springframework.kafka.support.KafkaNull;
@@ -91,6 +92,7 @@
9192
* @author Biju Kunjummen
9293
* @author Tom van den Berge
9394
* @author Ryan Riley
95+
* @author Jooyoung Pyoung
9496
*
9597
* @since 5.4
9698
*/
@@ -169,7 +171,7 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
169171

170172
private Duration assignmentDuration = DEFAULT_ASSIGNMENT_TIMEOUT;
171173

172-
@SuppressWarnings("this-escape")
174+
@SuppressWarnings({"this-escape", "removal"})
173175
public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
174176
Assert.notNull(kafkaTemplate, "kafkaTemplate cannot be null");
175177
this.kafkaTemplate = kafkaTemplate;
@@ -179,7 +181,10 @@ public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
179181
updateNotPropagatedHeaders(
180182
new String[] {KafkaHeaders.TOPIC, KafkaHeaders.PARTITION, KafkaHeaders.KEY}, false);
181183
}
182-
if (JacksonPresent.isJackson2Present()) {
184+
if (JacksonPresent.isJackson3Present()) {
185+
this.headerMapper = new JsonKafkaHeaderMapper();
186+
}
187+
else if (JacksonPresent.isJackson2Present()) {
183188
this.headerMapper = new DefaultKafkaHeaderMapper();
184189
}
185190
else {

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,6 @@
5656

5757
<int:channel id="successes" />
5858

59-
<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper" />
59+
<bean id="customHeaderMapper" class="org.springframework.kafka.support.JsonKafkaHeaderMapper" />
6060

6161
</beans>

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,6 @@
3838

3939
<bean id="ems" class="org.springframework.integration.kafka.config.xml.KafkaOutboundGatewayParserTests$EMS"/>
4040

41-
<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper"/>
41+
<bean id="customHeaderMapper" class="org.springframework.kafka.support.JsonKafkaHeaderMapper"/>
4242

4343
</beans>

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
7070
import org.springframework.kafka.listener.MessageListenerContainer;
7171
import org.springframework.kafka.support.Acknowledgment;
72-
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
72+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
7373
import org.springframework.kafka.support.KafkaHeaders;
7474
import org.springframework.kafka.test.context.EmbeddedKafka;
7575
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -93,6 +93,7 @@
9393
* @author Biju Kunjummen
9494
* @author Gary Russell
9595
* @author Anshul Mehra
96+
* @author Jooyoung Pyoung
9697
*
9798
* @since 5.4
9899
*/
@@ -152,7 +153,7 @@ public class KafkaDslTests {
152153
private KafkaTemplate<?, ?> kafkaTemplateTopic2;
153154

154155
@Autowired
155-
private DefaultKafkaHeaderMapper mapper;
156+
private JsonKafkaHeaderMapper mapper;
156157

157158
@Autowired
158159
private ContextConfiguration config;
@@ -385,8 +386,8 @@ public IntegrationFlow sendToKafkaFlow(
385386
}
386387

387388
@Bean
388-
public DefaultKafkaHeaderMapper mapper() {
389-
return new DefaultKafkaHeaderMapper();
389+
public JsonKafkaHeaderMapper mapper() {
390+
return new JsonKafkaHeaderMapper();
390391
}
391392

392393
@Bean

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import org.springframework.kafka.listener.DefaultErrorHandler;
6565
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
6666
import org.springframework.kafka.support.Acknowledgment;
67-
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
67+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
6868
import org.springframework.kafka.support.KafkaHeaders;
6969
import org.springframework.kafka.support.KafkaNull;
7070
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -73,7 +73,7 @@
7373
import org.springframework.kafka.support.converter.ConversionException;
7474
import org.springframework.kafka.support.converter.MessagingMessageConverter;
7575
import org.springframework.kafka.support.converter.RecordMessageConverter;
76-
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
76+
import org.springframework.kafka.support.converter.StringJacksonJsonMessageConverter;
7777
import org.springframework.kafka.test.EmbeddedKafkaBroker;
7878
import org.springframework.kafka.test.context.EmbeddedKafka;
7979
import org.springframework.kafka.test.utils.ContainerTestUtils;
@@ -108,6 +108,7 @@
108108
* @author Biju Kunjummen
109109
* @author Cameron Mayfield
110110
* @author Urs Keller
111+
* @author Jooyoung Pyoung
111112
*
112113
* @since 5.4
113114
*
@@ -500,7 +501,7 @@ void testInboundJson(EmbeddedKafkaBroker embeddedKafka) {
500501
KafkaMessageListenerContainer<Integer, String> container =
501502
new KafkaMessageListenerContainer<>(cf, containerProps);
502503
KafkaMessageDrivenChannelAdapter<Integer, String> adapter = new KafkaMessageDrivenChannelAdapter<>(container);
503-
adapter.setRecordMessageConverter(new StringJsonMessageConverter());
504+
adapter.setRecordMessageConverter(new StringJacksonJsonMessageConverter());
504505
QueueChannel out = new QueueChannel();
505506
adapter.setOutputChannel(out);
506507
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
@@ -514,7 +515,7 @@ void testInboundJson(EmbeddedKafkaBroker embeddedKafka) {
514515
template.setDefaultTopic(topic3);
515516
Headers kHeaders = new RecordHeaders();
516517
MessageHeaders siHeaders = new MessageHeaders(Collections.singletonMap("foo", "bar"));
517-
new DefaultKafkaHeaderMapper().fromHeaders(siHeaders, kHeaders);
518+
new JsonKafkaHeaderMapper().fromHeaders(siHeaders, kHeaders);
518519
template.send(new ProducerRecord<>(topic3, 0, 1487694048607L, 1, "{\"bar\":\"baz\"}", kHeaders));
519520

520521
Message<?> received = out.receive(10000);
@@ -546,7 +547,7 @@ void testInboundJsonWithPayload(EmbeddedKafkaBroker embeddedKafka) {
546547

547548
KafkaMessageDrivenChannelAdapter<Integer, Foo> adapter = Kafka
548549
.messageDrivenChannelAdapter(container, ListenerMode.record)
549-
.recordMessageConverter(new StringJsonMessageConverter())
550+
.recordMessageConverter(new StringJacksonJsonMessageConverter())
550551
.payloadType(Foo.class)
551552
.getObject();
552553
QueueChannel out = new QueueChannel();
@@ -562,7 +563,7 @@ void testInboundJsonWithPayload(EmbeddedKafkaBroker embeddedKafka) {
562563
template.setDefaultTopic(topic6);
563564
Headers kHeaders = new RecordHeaders();
564565
MessageHeaders siHeaders = new MessageHeaders(Collections.singletonMap("foo", "bar"));
565-
new DefaultKafkaHeaderMapper().fromHeaders(siHeaders, kHeaders);
566+
new JsonKafkaHeaderMapper().fromHeaders(siHeaders, kHeaders);
566567

567568
template.sendDefault(1, "{\"bar\":\"baz\"}");
568569

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
7373
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
7474
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
75-
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
75+
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
7676
import org.springframework.kafka.support.KafkaHeaders;
7777
import org.springframework.kafka.support.KafkaNull;
7878
import org.springframework.kafka.support.SendResult;
@@ -118,6 +118,7 @@
118118
* @author Artem Bilan
119119
* @author Tom van den Berge
120120
* @author Ryan Riley
121+
* @author Jooyoung Pyoung
121122
*
122123
* @since 5.4
123124
*/
@@ -239,7 +240,7 @@ void testOutboundWithTimestamp() {
239240
assertThat(record).has(value("foo"));
240241
assertThat(record).has(timestamp(1487694048607L));
241242
Map<String, Object> headers = new HashMap<>();
242-
new DefaultKafkaHeaderMapper().toHeaders(record.headers(), headers);
243+
new JsonKafkaHeaderMapper().toHeaders(record.headers(), headers);
243244
assertThat(headers.size()).isEqualTo(1);
244245
assertThat(headers.get("baz")).isEqualTo("qux");
245246

@@ -383,7 +384,7 @@ void testOutboundWithCustomHeaderMapper() {
383384
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory);
384385
KafkaProducerMessageHandler<Integer, String> handler = new KafkaProducerMessageHandler<>(template);
385386
handler.setBeanFactory(TEST_INTEGRATION_CONTEXT);
386-
handler.setHeaderMapper(new DefaultKafkaHeaderMapper("!*"));
387+
handler.setHeaderMapper(new JsonKafkaHeaderMapper("!*"));
387388
handler.afterPropertiesSet();
388389

389390
Message<?> message = MessageBuilder.withPayload("foo")
@@ -459,7 +460,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
459460
assertThat(record).has(partition(1));
460461
assertThat(record).has(value("foo"));
461462
Map<String, Object> headers = new HashMap<>();
462-
new DefaultKafkaHeaderMapper().toHeaders(record.headers(), headers);
463+
new JsonKafkaHeaderMapper().toHeaders(record.headers(), headers);
463464
assertThat(headers.get(KafkaHeaders.REPLY_TOPIC)).isEqualTo(topic6.getBytes());
464465
ProducerRecord<Integer, String> pr = new ProducerRecord<>(topic6, 0, 1, "FOO", record.headers());
465466
template.send(pr);

0 commit comments

Comments
 (0)