Skip to content

Commit 2aa4230

Browse files
authored
GH-1789: Refactoring
Resolves #1789 Move the `SmartMessageConverter` invocations to a more logical place, based on their expected inputs for comversion. * Apply suggestions from code review
1 parent d360b19 commit 2aa4230

File tree

4 files changed

+109
-52
lines changed

4 files changed

+109
-52
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4205,6 +4205,36 @@ Again, using `byte[]` or `Bytes` is more efficient because they avoid a `String`
42054205
For convenience, starting with version 2.3, the framework also provides a `StringOrBytesSerializer` which can serialize all three value types so it can be used with any of the message converters.
42064206
====
42074207

4208+
Starting with version 2.7.1, message payload conversion can be delegated to a `spring-messaging` `SmartMessageConverter`; this enables conversion, for example, to be based on the `MessageHeaders.CONTENT_TYPE` header.
4209+
4210+
IMPORTANT: The `KafkaMessageConverter.fromMessage()` method is called for outbound conversion to a `ProducerRecord` with the message payload in the `ProducerRecord.value()` property.
4211+
The `KafkaMessageConverter.toMessage()` method is called for inbound conversion from `ConsumerRecord` with the payload being the `ConsumerRecord.value()` property.
4212+
The `SmartMessageConverter.toMessage()` method is called to create a new outbound `Message<?>` from the `Message` passed to`fromMessage()` (usually by `KafkaTemplate.send(Message<?> msg)`).
4213+
Similarly, in the `KafkaMessageConverter.toMessage()` method, after the converter has created a new `Message<?>` from the `ConsumerRecord`, the `SmartMessageConverter.fromMessage()` method is called and then the final inbound message is created with the newly converted payload.
4214+
In either case, if the `SmartMessageConverter` returns `null`, the original message is used.
4215+
4216+
When the default converter is used in the `KafkaTemplate` and listener container factory, you configure the `SmartMessageConverter` by calling `setMessagingConverter()` on the template and via the `contentMessageConverter` property on `@KafkaListener` methods.
4217+
4218+
Examples:
4219+
4220+
====
4221+
[source, java]
4222+
----
4223+
template.setMessagingConverter(mySmartConverter);
4224+
----
4225+
====
4226+
4227+
====
4228+
[source, java]
4229+
----
4230+
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
4231+
contentTypeConverter = "mySmartConverter")
4232+
public void smart(Thing thing) {
4233+
...
4234+
}
4235+
----
4236+
====
4237+
42084238
[[data-projection]]
42094239
====== Using Spring Data Projection Interfaces
42104240

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,9 @@ See <<streams-config>> for more information.
8585
New methods `createOrModifyTopics` and `describeTopics` have been added.
8686
`KafkaAdmin.NewTopics` has been added to facilitate configuring multiple topics in a single bean.
8787
See <<configuring-topics>> for more information.
88+
89+
[[x27-conv]]
90+
==== `MessageConverter` Changes
91+
92+
It is now possible to add a `spring-messaging` `SmartMessageConverter` to the `MessagingMessageConverter`, allowing content negotiation based on the `contentType` header.
93+
See <<messaging-message-conversion>> for more information.

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 55 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.clients.consumer.Consumer;
2626
import org.apache.kafka.clients.consumer.ConsumerRecord;
2727
import org.apache.kafka.clients.producer.ProducerRecord;
28+
import org.apache.kafka.common.header.Header;
2829
import org.apache.kafka.common.header.Headers;
2930
import org.apache.kafka.common.header.internals.RecordHeaders;
3031

@@ -37,7 +38,6 @@
3738
import org.springframework.kafka.support.KafkaHeaders;
3839
import org.springframework.kafka.support.KafkaNull;
3940
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
40-
import org.springframework.lang.Nullable;
4141
import org.springframework.messaging.Message;
4242
import org.springframework.messaging.MessageHeaders;
4343
import org.springframework.messaging.converter.SmartMessageConverter;
@@ -124,8 +124,26 @@ protected org.springframework.messaging.converter.MessageConverter getMessagingC
124124

125125
/**
126126
* Set a spring-messaging {@link SmartMessageConverter} to convert the record value to
127-
* the desired type. This will also cause the {@link MessageHeaders#CONTENT_TYPE} to be
128-
* converted to String when mapped inbound.
127+
* the desired type. This will also cause the {@link MessageHeaders#CONTENT_TYPE} to
128+
* be converted to String when mapped inbound.
129+
* <p>
130+
* IMPORTANT: This converter's {@link #fromMessage(Message, String)} method is called
131+
* for outbound conversion to a {@link ProducerRecord} with the message payload in the
132+
* {@link ProducerRecord#value()} property.
133+
* {@link #toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)} is called for
134+
* inbound conversion from {@link ConsumerRecord} with the payload being the
135+
* {@link ConsumerRecord#value()} property.
136+
* <p>
137+
* The {@link SmartMessageConverter#toMessage(Object, MessageHeaders)} method is
138+
* called to create a new outbound {@link Message} from the {@link Message} passed to
139+
* {@link #fromMessage(Message, String)}. Similarly, in
140+
* {@link #toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)}, after this
141+
* converter has created a new {@link Message} from the {@link ConsumerRecord} the
142+
* {@link SmartMessageConverter#fromMessage(Message, Class)} method is called and then
143+
* the final inbound message is created with the newly converted payload.
144+
* <p>
145+
* In either case, if the {@link SmartMessageConverter} returns {@code null}, the
146+
* original message is used.
129147
* @param messagingConverter the converter.
130148
* @since 2.7.1
131149
*/
@@ -144,36 +162,51 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
144162
this.generateTimestamp);
145163

146164
Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
147-
boolean removeNative = true;
148-
if (this.headerMapper != null && record.headers() != null) {
149-
this.headerMapper.toHeaders(record.headers(), rawHeaders);
150-
}
151-
else {
152-
this.logger.debug(() ->
153-
"No header mapper is available; Jackson is required for the default mapper; "
154-
+ "headers (if present) are not mapped but provided raw in "
155-
+ KafkaHeaders.NATIVE_HEADERS);
156-
removeNative = false;
165+
if (record.headers() != null) {
166+
if (this.headerMapper != null) {
167+
this.headerMapper.toHeaders(record.headers(), rawHeaders);
168+
}
169+
else {
170+
this.logger.debug(() ->
171+
"No header mapper is available; Jackson is required for the default mapper; "
172+
+ "headers (if present) are not mapped but provided raw in "
173+
+ KafkaHeaders.NATIVE_HEADERS);
174+
rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, record.headers());
175+
Header contentType = record.headers().lastHeader(MessageHeaders.CONTENT_TYPE);
176+
if (contentType != null) {
177+
rawHeaders.put(MessageHeaders.CONTENT_TYPE,
178+
new String(contentType.value(), StandardCharsets.UTF_8));
179+
}
180+
}
157181
}
158-
rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, record.headers());
159182
String ttName = record.timestampType() != null ? record.timestampType().name() : null;
160183
commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(),
161184
record.offset(), ttName, record.timestamp());
162185
if (this.rawRecordHeader) {
163186
rawHeaders.put(KafkaHeaders.RAW_DATA, record);
164187
}
165-
Object value = this.messagingConverter == null
166-
? extractAndConvertValue(record, type)
167-
: extractAndConvertValue(record, type, kafkaMessageHeaders);
168-
if (removeNative) {
169-
rawHeaders.remove(KafkaHeaders.NATIVE_HEADERS);
188+
Message<?> message = MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders);
189+
if (this.messagingConverter != null && !message.getPayload().equals(KafkaNull.INSTANCE)) {
190+
Class<?> clazz = type instanceof Class ? (Class<?>) type : type instanceof ParameterizedType
191+
? (Class<?>) ((ParameterizedType) type).getRawType() : Object.class;
192+
Object payload = this.messagingConverter.fromMessage(message, clazz, type);
193+
if (payload != null) {
194+
message = new GenericMessage<>(payload, message.getHeaders());
195+
}
170196
}
171-
return MessageBuilder.createMessage(value, kafkaMessageHeaders);
197+
return message;
172198
}
173199

174200
@SuppressWarnings({ "unchecked", "rawtypes" })
175201
@Override
176-
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
202+
public ProducerRecord<?, ?> fromMessage(Message<?> messageArg, String defaultTopic) {
203+
Message<?> message = messageArg;
204+
if (this.messagingConverter != null) {
205+
Message<?> converted = this.messagingConverter.toMessage(message.getPayload(), message.getHeaders());
206+
if (converted != null) {
207+
message = converted;
208+
}
209+
}
177210
MessageHeaders headers = message.getHeaders();
178211
Object topicHeader = headers.get(KafkaHeaders.TOPIC);
179212
String topic = null;
@@ -223,12 +256,6 @@ protected Object convertPayload(Message<?> message) {
223256
return null;
224257
}
225258
else {
226-
if (this.messagingConverter != null) {
227-
Message<?> message2 = this.messagingConverter.toMessage(payload, message.getHeaders());
228-
if (message2 != null) {
229-
return message2.getPayload();
230-
}
231-
}
232259
return payload;
233260
}
234261
}
@@ -241,31 +268,7 @@ protected Object convertPayload(Message<?> message) {
241268
* @return the value.
242269
*/
243270
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
244-
return extractAndConvertValue(record, type, null);
245-
}
246-
247-
/**
248-
* Subclasses can convert the value; by default, it's returned as provided by Kafka
249-
* unless there is a {@link SmartMessageConverter} that can convert it.
250-
* @param record the record.
251-
* @param type the required type.
252-
* @param headers the mapped headers.
253-
* @return the value.
254-
*/
255-
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type, @Nullable MessageHeaders headers) {
256-
if (record.value() == null) {
257-
return KafkaNull.INSTANCE;
258-
}
259-
if (this.messagingConverter != null) {
260-
Class<?> clazz = type instanceof Class ? (Class<?>) type : type instanceof ParameterizedType
261-
? (Class<?>) ((ParameterizedType) type).getRawType() : Object.class;
262-
Object payload = this.messagingConverter
263-
.fromMessage(new GenericMessage<>(record.value(), headers), clazz, type);
264-
if (payload != null) {
265-
return payload;
266-
}
267-
}
268-
return record.value();
271+
return record.value() == null ? KafkaNull.INSTANCE : record.value();
269272
}
270273

271274
}

spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ void contentNegotiation() {
133133
assertThat(pr.value()).isEqualTo("qux");
134134
}
135135

136+
@Test
137+
void contentNegotiationNoHeaderMapper() {
138+
MessagingMessageConverter converter = new MessagingMessageConverter();
139+
converter.setHeaderMapper(null);
140+
Collection<MessageConverter> converters = Arrays.asList(new FooConverter(MimeType.valueOf("application/foo")),
141+
new BarConverter(MimeType.valueOf("application/bar")));
142+
converter.setMessagingConverter(new CompositeMessageConverter(converters));
143+
Headers headers = new RecordHeaders();
144+
headers.add(new RecordHeader(MessageHeaders.CONTENT_TYPE, "application/foo".getBytes()));
145+
ConsumerRecord<String, String> record =
146+
new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "qux", headers);
147+
Message<?> message = converter.toMessage(record, null, null, Foo.class);
148+
assertThat(message.getPayload()).isEqualTo(new Foo("bar"));
149+
headers.remove(MessageHeaders.CONTENT_TYPE);
150+
message = converter.toMessage(record, null, null, Foo.class);
151+
assertThat(message.getPayload()).isEqualTo(new Foo("bar")); // no contentType header
152+
}
153+
136154
static class FooConverter extends AbstractMessageConverter {
137155

138156
FooConverter(MimeType supportedMimeType) {

0 commit comments

Comments
 (0)