Skip to content

Commit 9ae454a

Browse files
garyrussellartembilan
authored andcommitted
GH-769: Add JSON Type Mapping Property
Resolves #769 Allow configuration of type mapping using Kafka properties. GH-768: Override JSON Type Headers Resolves #768 Polishing - PR Comments * Fix Copyrights * Some code improvements in `Json(De)Serializer`
1 parent 0474eb9 commit 9ae454a

File tree

7 files changed

+362
-44
lines changed

7 files changed

+362
-44
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/DefaultJackson2JavaTypeMapper.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,7 +50,7 @@ public class DefaultJackson2JavaTypeMapper extends AbstractJavaTypeMapper
5050
"java.lang"
5151
);
5252

53-
private final Set<String> trustedPackages = new LinkedHashSet<String>(TRUSTED_PACKAGES);
53+
private final Set<String> trustedPackages = new LinkedHashSet<>(TRUSTED_PACKAGES);
5454

5555
private volatile TypePrecedence typePrecedence = TypePrecedence.INFERRED;
5656

@@ -64,21 +64,7 @@ public TypePrecedence getTypePrecedence() {
6464
return this.typePrecedence;
6565
}
6666

67-
/**
68-
* Set the precedence for evaluating type information in message properties.
69-
* When using {@code @KafkaListener} at the method level, the framework attempts
70-
* to determine the target type for payload conversion from the method signature.
71-
* If so, this type is provided by the {@code MessagingMessageListenerAdapter}.
72-
* <p>
73-
* By default, if the type is concrete (not abstract, not an interface), this will
74-
* be used ahead of type information provided in the {@code __TypeId__} and
75-
* associated headers provided by the sender.
76-
* <p>
77-
* If you wish to force the use of the {@code __TypeId__} and associated headers
78-
* (such as when the actual type is a subclass of the method argument type),
79-
* set the precedence to {@link Jackson2JavaTypeMapper.TypePrecedence#TYPE_ID}.
80-
* @param typePrecedence the precedence.
81-
*/
67+
@Override
8268
public void setTypePrecedence(TypePrecedence typePrecedence) {
8369
Assert.notNull(typePrecedence, "'typePrecedence' cannot be null");
8470
this.typePrecedence = typePrecedence;

spring-kafka/src/main/java/org/springframework/kafka/support/converter/Jackson2JavaTypeMapper.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,24 @@ enum TypePrecedence {
4848

4949
TypePrecedence getTypePrecedence();
5050

51+
/**
52+
* Set the precedence for evaluating type information in message properties.
53+
* When using {@code @KafkaListener} at the method level, the framework attempts
54+
* to determine the target type for payload conversion from the method signature.
55+
* If so, this type is provided by the {@code MessagingMessageListenerAdapter}.
56+
* <p> By default, if the type is concrete (not abstract, not an interface), this will
57+
* be used ahead of type information provided in the {@code __TypeId__} and
58+
* associated headers provided by the sender.
59+
* <p> If you wish to force the use of the {@code __TypeId__} and associated headers
60+
* (such as when the actual type is a subclass of the method argument type),
61+
* set the precedence to {@link Jackson2JavaTypeMapper.TypePrecedence#TYPE_ID}.
62+
* @param typePrecedence the precedence.
63+
* @since 2.2
64+
*/
65+
default void setTypePrecedence(TypePrecedence typePrecedence) {
66+
throw new UnsupportedOperationException("This mapper does not support this method");
67+
}
68+
5169
void addTrustedPackages(String... packages);
5270

5371
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

Lines changed: 93 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.springframework.kafka.support.converter.AbstractJavaTypeMapper;
3030
import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper;
3131
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
32+
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence;
33+
import org.springframework.lang.Nullable;
3234
import org.springframework.util.Assert;
3335
import org.springframework.util.ClassUtils;
3436
import org.springframework.util.StringUtils;
@@ -81,6 +83,12 @@ public class JsonDeserializer<T> implements ExtendedDeserializer<T> {
8183
*/
8284
public static final String TRUSTED_PACKAGES = "spring.json.trusted.packages";
8385

86+
/**
87+
* Kafka config property to add type mappings to the type mapper:
88+
* 'foo=com.Foo,bar=com.Bar'.
89+
*/
90+
public static final String TYPE_MAPPINGS = JsonSerializer.TYPE_MAPPINGS;
91+
8492
protected final ObjectMapper objectMapper;
8593

8694
protected Class<T> targetType;
@@ -91,29 +99,79 @@ public class JsonDeserializer<T> implements ExtendedDeserializer<T> {
9199

92100
private boolean typeMapperExplicitlySet = false;
93101

102+
/**
103+
* Construct an instance with a default {@link ObjectMapper}.
104+
*/
94105
public JsonDeserializer() {
95-
this((Class<T>) null);
106+
this(null, true);
96107
}
97108

98-
protected JsonDeserializer(ObjectMapper objectMapper) {
99-
this(null, objectMapper);
109+
/**
110+
* Construct an instance with the provided {@link ObjectMapper}.
111+
* @param objectMapper a custom object mapper.
112+
*/
113+
public JsonDeserializer(ObjectMapper objectMapper) {
114+
this(null, objectMapper, true);
100115
}
101116

117+
/**
118+
* Construct an instance with the provided target type, and a default
119+
* {@link ObjectMapper}.
120+
* @param targetType the target type to use if no type info headers are present.
121+
*/
102122
public JsonDeserializer(Class<T> targetType) {
103-
this(targetType, new ObjectMapper());
123+
this(targetType, true);
124+
}
125+
126+
/**
127+
* Construct an instance with the provided target type, and
128+
* useHeadersIfPresent with a default {@link ObjectMapper}.
129+
* @param targetType the target type.
130+
* @param useHeadersIfPresent true to use headers if present and fall back to target
131+
* type if not.
132+
* @since 2.2
133+
*/
134+
public JsonDeserializer(Class<T> targetType, boolean useHeadersIfPresent) {
135+
this(targetType, new ObjectMapper(), useHeadersIfPresent);
104136
this.objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
105137
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
106138
}
107139

108-
@SuppressWarnings("unchecked")
140+
/**
141+
* Construct an instance with the provided target type, and {@link ObjectMapper}.
142+
* @param targetType the target type to use if no type info headers are present.
143+
* @param objectMapper the mapper. type if not.
144+
*/
109145
public JsonDeserializer(Class<T> targetType, ObjectMapper objectMapper) {
146+
this(targetType, objectMapper, true);
147+
}
148+
149+
/**
150+
* Construct an instance with the provided target type, {@link ObjectMapper} and
151+
* useHeadersIfPresent.
152+
* @param targetType the target type.
153+
* @param objectMapper the mapper.
154+
* @param useHeadersIfPresent true to use headers if present and fall back to target
155+
* type if not.
156+
* @since 2.2
157+
*/
158+
@SuppressWarnings("unchecked")
159+
public JsonDeserializer(@Nullable Class<T> targetType, ObjectMapper objectMapper, boolean useHeadersIfPresent) {
110160
Assert.notNull(objectMapper, "'objectMapper' must not be null.");
111161
this.objectMapper = objectMapper;
112-
if (targetType == null) {
113-
targetType = (Class<T>) ResolvableType.forClass(getClass()).getSuperType().resolveGeneric(0);
114-
}
115162
this.targetType = targetType;
163+
if (this.targetType == null) {
164+
this.targetType = (Class<T>) ResolvableType.forClass(getClass()).getSuperType().resolveGeneric(0);
165+
}
166+
Assert.isTrue(this.targetType != null || useHeadersIfPresent,
167+
"'targetType' cannot be null if 'useHeadersIfPresent' is false");
168+
169+
if (this.targetType != null) {
170+
this.reader = this.objectMapper.readerFor(this.targetType);
171+
}
172+
116173
addTargetPackageToTrusted();
174+
this.typeMapper.setTypePrecedence(useHeadersIfPresent ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
117175
}
118176

119177
public Jackson2JavaTypeMapper getTypeMapper() {
@@ -185,6 +243,10 @@ else if (configs.get(VALUE_DEFAULT_TYPE) instanceof String) {
185243
throw new IllegalStateException(VALUE_DEFAULT_TYPE + " must be Class or String");
186244
}
187245
}
246+
247+
if (this.targetType != null) {
248+
this.reader = this.objectMapper.readerFor(this.targetType);
249+
}
188250
addTargetPackageToTrusted();
189251
}
190252
catch (ClassNotFoundException | LinkageError e) {
@@ -196,6 +258,11 @@ else if (configs.get(VALUE_DEFAULT_TYPE) instanceof String) {
196258
StringUtils.commaDelimitedListToStringArray((String) configs.get(TRUSTED_PACKAGES)));
197259
}
198260
}
261+
if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet
262+
&& this.typeMapper instanceof AbstractJavaTypeMapper) {
263+
((AbstractJavaTypeMapper) this.typeMapper).setIdClassMapping(
264+
JsonSerializer.createMappings((String) configs.get(JsonSerializer.TYPE_MAPPINGS)));
265+
}
199266
}
200267

201268
/**
@@ -218,30 +285,32 @@ public T deserialize(String topic, Headers headers, byte[] data) {
218285
if (data == null) {
219286
return null;
220287
}
221-
JavaType javaType = this.typeMapper.toJavaType(headers);
222-
if (javaType == null) {
223-
Assert.state(this.targetType != null, "No type information in headers and no default type provided");
224-
return deserialize(topic, data);
225-
}
226-
else {
227-
try {
228-
return this.objectMapper.readerFor(javaType).readValue(data);
229-
}
230-
catch (IOException e) {
231-
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
232-
"] from topic [" + topic + "]", e);
288+
ObjectReader reader = null;
289+
if (this.typeMapper.getTypePrecedence().equals(TypePrecedence.TYPE_ID)) {
290+
JavaType javaType = this.typeMapper.toJavaType(headers);
291+
if (javaType != null) {
292+
reader = this.objectMapper.readerFor(javaType);
233293
}
234294
}
295+
if (reader == null) {
296+
reader = this.reader;
297+
}
298+
Assert.state(reader != null, "No type information in headers and no default type provided");
299+
try {
300+
return reader.readValue(data);
301+
}
302+
catch (IOException e) {
303+
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
304+
"] from topic [" + topic + "]", e);
305+
}
235306
}
236307

237308
@Override
238309
public T deserialize(String topic, byte[] data) {
239310
if (data == null) {
240311
return null;
241312
}
242-
if (this.reader == null) {
243-
this.reader = this.objectMapper.readerFor(this.targetType);
244-
}
313+
Assert.state(this.reader != null, "No headers available and no default type provided");
245314
try {
246315
T result = null;
247316
if (data != null) {
@@ -259,4 +328,5 @@ public T deserialize(String topic, byte[] data) {
259328
public void close() {
260329
// No-op
261330
}
331+
262332
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.serializer;
1818

1919
import java.io.IOException;
20+
import java.util.HashMap;
2021
import java.util.Map;
2122

2223
import org.apache.kafka.common.errors.SerializationException;
@@ -28,6 +29,8 @@
2829
import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper;
2930
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
3031
import org.springframework.util.Assert;
32+
import org.springframework.util.ClassUtils;
33+
import org.springframework.util.StringUtils;
3134

3235
import com.fasterxml.jackson.databind.DeserializationFeature;
3336
import com.fasterxml.jackson.databind.MapperFeature;
@@ -46,10 +49,16 @@
4649
public class JsonSerializer<T> implements ExtendedSerializer<T> {
4750

4851
/**
49-
* Kafka config property for to disable adding type headers.
52+
* Kafka config property for disabling adding type headers.
5053
*/
5154
public static final String ADD_TYPE_INFO_HEADERS = "spring.json.add.type.headers";
5255

56+
/**
57+
* Kafka config property to add type mappings to the type mapper:
58+
* 'foo:com.Foo,bar:com.Bar'.
59+
*/
60+
public static final String TYPE_MAPPINGS = "spring.json.type.mapping";
61+
5362
protected final ObjectMapper objectMapper;
5463

5564
protected boolean addTypeInfo = true;
@@ -126,6 +135,28 @@ else if (config instanceof String) {
126135
throw new IllegalStateException(ADD_TYPE_INFO_HEADERS + " must be Boolean or String");
127136
}
128137
}
138+
if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet
139+
&& this.typeMapper instanceof AbstractJavaTypeMapper) {
140+
((AbstractJavaTypeMapper) this.typeMapper)
141+
.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
142+
}
143+
}
144+
145+
protected static Map<String, Class<?>> createMappings(String mappings) {
146+
Map<String, Class<?>> mappingsMap = new HashMap<>();
147+
String[] array = StringUtils.commaDelimitedListToStringArray(mappings);
148+
for (String entry : array) {
149+
String[] split = entry.split(":");
150+
Assert.isTrue(split.length == 2, "Each comma-delimited mapping entry must have exactly one ':'");
151+
try {
152+
mappingsMap.put(split[0].trim(),
153+
ClassUtils.forName(split[1].trim(), JsonSerializer.class.getClassLoader()));
154+
}
155+
catch (ClassNotFoundException | LinkageError e) {
156+
throw new IllegalArgumentException(e);
157+
}
158+
}
159+
return mappingsMap;
129160
}
130161

131162
@Override
@@ -160,4 +191,5 @@ public byte[] serialize(String topic, T data) {
160191
public void close() {
161192
// No-op
162193
}
194+
163195
}

0 commit comments

Comments
 (0)