Skip to content

Commit 62c5692

Browse files
garyrussellartembilan
authored andcommitted
Json Type Determination Using Methods
1 parent 0692a8b commit 62c5692

File tree

6 files changed

+342
-108
lines changed

6 files changed

+342
-108
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

Lines changed: 90 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.Collections;
2222
import java.util.Map;
23+
import java.util.function.BiFunction;
2324

2425
import org.apache.kafka.common.errors.SerializationException;
2526
import org.apache.kafka.common.header.Headers;
@@ -92,20 +93,32 @@ public class JsonDeserializer<T> implements Deserializer<T> {
9293
*/
9394
public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";
9495

96+
/**
97+
* A method name to determine the {@link JavaType} to deserialize the key to.
98+
*/
99+
public static final String KEY_TYPE_METHOD = "spring.json.key.type.method";
100+
101+
/**
102+
* A method name to determine the {@link JavaType} to deserialize the key to.
103+
*/
104+
public static final String VALUE_TYPE_METHOD = "spring.json.value.type.method";
105+
95106
protected final ObjectMapper objectMapper; // NOSONAR
96107

97108
protected JavaType targetType; // NOSONAR
98109

99110
protected Jackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); // NOSONAR
100111

101-
private volatile ObjectReader reader;
112+
private ObjectReader reader;
102113

103114
private boolean typeMapperExplicitlySet = false;
104115

105116
private boolean removeTypeHeaders = true;
106117

107118
private boolean useTypeHeaders = true;
108119

120+
private BiFunction<byte[], Headers, JavaType> typeFunction;
121+
109122
/**
110123
* Construct an instance with a default {@link ObjectMapper}.
111124
*/
@@ -242,19 +255,6 @@ public JsonDeserializer(@Nullable JavaType targetType, ObjectMapper objectMapper
242255
initialize(targetType, useHeadersIfPresent);
243256
}
244257

245-
private void initialize(@Nullable JavaType type, boolean useHeadersIfPresent) {
246-
this.targetType = type;
247-
Assert.isTrue(this.targetType != null || useHeadersIfPresent,
248-
"'targetType' cannot be null if 'useHeadersIfPresent' is false");
249-
250-
if (this.targetType != null) {
251-
this.reader = this.objectMapper.readerFor(this.targetType);
252-
}
253-
254-
addTargetPackageToTrusted();
255-
this.typeMapper.setTypePrecedence(useHeadersIfPresent ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
256-
}
257-
258258
public Jackson2JavaTypeMapper getTypeMapper() {
259259
return this.typeMapper;
260260
}
@@ -307,6 +307,16 @@ public void setUseTypeHeaders(boolean useTypeHeaders) {
307307
}
308308
}
309309

310+
/**
311+
* Set a {@link BiFunction} that receives the data to be deserialized and the headers
312+
* and returns a JavaType.
313+
* @param typeFunction the function.
314+
* @since 2.5
315+
*/
316+
public void setTypeFunction(BiFunction<byte[], Headers, JavaType> typeFunction) {
317+
this.typeFunction = typeFunction;
318+
}
319+
310320
@Override
311321
public void configure(Map<String, ?> configs, boolean isKey) {
312322
setUseTypeMapperForKey(isKey);
@@ -325,6 +335,17 @@ public void configure(Map<String, ?> configs, boolean isKey) {
325335
if (configs.containsKey(REMOVE_TYPE_INFO_HEADERS)) {
326336
this.removeTypeHeaders = Boolean.parseBoolean(configs.get(REMOVE_TYPE_INFO_HEADERS).toString());
327337
}
338+
if (isKey && configs.containsKey(KEY_TYPE_METHOD)) {
339+
setUpTypeFuntion((String) configs.get(KEY_TYPE_METHOD));
340+
}
341+
else if (!isKey && configs.containsKey(VALUE_TYPE_METHOD)) {
342+
setUpTypeFuntion((String) configs.get(VALUE_TYPE_METHOD));
343+
}
344+
}
345+
346+
private void setUpTypeFuntion(String method) {
347+
this.typeFunction = SerializationUtils.propertyToMethodInvokingFunction(method, byte[].class,
348+
getClass().getClassLoader());
328349
}
329350

330351
private void setUpTypePrecedence(Map<String, ?> configs) {
@@ -355,6 +376,19 @@ else if (!isKey && configs.containsKey(VALUE_DEFAULT_TYPE)) {
355376
}
356377
}
357378

379+
private void initialize(@Nullable JavaType type, boolean useHeadersIfPresent) {
380+
this.targetType = type;
381+
Assert.isTrue(this.targetType != null || useHeadersIfPresent,
382+
"'targetType' cannot be null if 'useHeadersIfPresent' is false");
383+
384+
if (this.targetType != null) {
385+
this.reader = this.objectMapper.readerFor(this.targetType);
386+
}
387+
388+
addTargetPackageToTrusted();
389+
this.typeMapper.setTypePrecedence(useHeadersIfPresent ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
390+
}
391+
358392
private JavaType setupTargetType(Map<String, ?> configs, String key) throws ClassNotFoundException, LinkageError {
359393
if (configs.get(key) instanceof Class) {
360394
return TypeFactory.defaultInstance().constructType((Class<?>) configs.get(key));
@@ -401,11 +435,15 @@ public T deserialize(String topic, Headers headers, byte[] data) {
401435
return null;
402436
}
403437
ObjectReader deserReader = null;
404-
if (this.typeMapper.getTypePrecedence().equals(TypePrecedence.TYPE_ID)) {
405-
JavaType javaType = this.typeMapper.toJavaType(headers);
406-
if (javaType != null) {
407-
deserReader = this.objectMapper.readerFor(javaType);
408-
}
438+
JavaType javaType = null;
439+
if (this.typeFunction != null) {
440+
javaType = this.typeFunction.apply(data, headers);
441+
}
442+
if (javaType == null && this.typeMapper.getTypePrecedence().equals(TypePrecedence.TYPE_ID)) {
443+
javaType = this.typeMapper.toJavaType(headers);
444+
}
445+
if (javaType != null) {
446+
deserReader = this.objectMapper.readerFor(javaType);
409447
}
410448
if (this.removeTypeHeaders) {
411449
this.typeMapper.removeHeaders(headers);
@@ -428,9 +466,16 @@ public T deserialize(String topic, @Nullable byte[] data) {
428466
if (data == null) {
429467
return null;
430468
}
431-
Assert.state(this.reader != null, "No headers available and no default type provided");
469+
ObjectReader localReader = this.reader;
470+
if (this.typeFunction != null) {
471+
JavaType javaType = this.typeFunction.apply(data, null);
472+
if (javaType != null) {
473+
localReader = this.objectMapper.readerFor(javaType);
474+
}
475+
}
476+
Assert.state(localReader != null, "No headers available and no default type provided");
432477
try {
433-
return this.reader.readValue(data);
478+
return localReader.readValue(data);
434479
}
435480
catch (IOException e) {
436481
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
@@ -490,4 +535,28 @@ public JsonDeserializer<T> typeMapper(Jackson2JavaTypeMapper mapper) {
490535
return this;
491536
}
492537

538+
/**
539+
* Add trusted packages to the default type mapper.
540+
* @param packages the packages.
541+
* @return the deserializer.
542+
* @since 2,5
543+
*/
544+
public JsonDeserializer<T> trustedPackages(String... packages) {
545+
Assert.isTrue(!this.typeMapperExplicitlySet, "When using a custom type mapper, set the trusted packages there");
546+
this.typeMapper.addTrustedPackages(packages);
547+
return this;
548+
}
549+
550+
/**
551+
* Set a {@link BiFunction} that receives the data to be deserialized and the headers
552+
* and returns a JavaType.
553+
* @param typeFunction the function.
554+
* @return the deserializer.
555+
* @since 2.5
556+
*/
557+
public JsonDeserializer<T> typeFunction(BiFunction<byte[], Headers, JavaType> typeFunction) {
558+
setTypeFunction(typeFunction);
559+
return this;
560+
}
561+
493562
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java

Lines changed: 5 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19-
import java.lang.reflect.InvocationTargetException;
20-
import java.lang.reflect.Method;
2119
import java.nio.charset.Charset;
2220
import java.nio.charset.StandardCharsets;
2321
import java.util.Map;
@@ -28,7 +26,6 @@
2826
import org.apache.kafka.common.serialization.Deserializer;
2927

3028
import org.springframework.util.Assert;
31-
import org.springframework.util.ClassUtils;
3229

3330
/**
3431
* Generic {@link org.apache.kafka.common.serialization.Deserializer Deserializer} for deserialization of entity from
@@ -56,6 +53,7 @@ public class ParseStringDeserializer<T> implements Deserializer<T> {
5653
throw new IllegalStateException("A parser must be provided either via a constructor or consumer properties");
5754
};
5855

56+
@SuppressWarnings("unchecked")
5957
private BiFunction<String, Headers, T> parser = (BiFunction<String, Headers, T>) NO_PARSER;
6058

6159
private Charset charset = StandardCharsets.UTF_8;
@@ -84,64 +82,16 @@ public ParseStringDeserializer(BiFunction<String, Headers, T> parser) {
8482
this.parser = parser;
8583
}
8684

87-
@SuppressWarnings("unchecked")
8885
@Override
8986
public void configure(Map<String, ?> configs, boolean isKey) {
9087
if (NO_PARSER.equals(this.parser)) {
9188
String parserMethod = (String) configs.get(isKey ? KEY_PARSER : VALUE_PARSER);
9289
Assert.state(parserMethod != null,
9390
"A parser must be provided either via a constructor or consumer properties");
94-
int lastDotPosn = parserMethod.lastIndexOf(".");
95-
Assert.state(lastDotPosn > 1,
96-
"the parser method needs to be a class name followed by the method name, separated by '.'");
97-
Class<?> clazz;
98-
try {
99-
clazz = ClassUtils.forName(parserMethod.substring(0, lastDotPosn),
100-
getClass().getClassLoader());
101-
}
102-
catch (ClassNotFoundException | LinkageError e) {
103-
throw new IllegalStateException(e);
104-
}
105-
parserMethod = parserMethod.substring(lastDotPosn + 1);
106-
Method method;
107-
try {
108-
method = clazz.getDeclaredMethod(parserMethod, String.class, Headers.class);
109-
}
110-
catch (@SuppressWarnings("unused") NoSuchMethodException e) {
111-
try {
112-
method = clazz.getDeclaredMethod(parserMethod, String.class);
113-
}
114-
catch (NoSuchMethodException e1) {
115-
throw new IllegalStateException("the parser method must take '(String, Headers)' or '(String)'");
116-
}
117-
catch (SecurityException e1) {
118-
throw new IllegalStateException(e1);
119-
}
120-
}
121-
catch (SecurityException e) {
122-
throw new IllegalStateException(e);
123-
}
124-
Method parseMethod = method;
125-
if (method.getParameters().length > 1) {
126-
this.parser = (str, headers) -> {
127-
try {
128-
return (T) parseMethod.invoke(null, str, headers);
129-
}
130-
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
131-
throw new IllegalStateException(e);
132-
}
133-
};
134-
}
135-
else {
136-
this.parser = (str, headers) -> {
137-
try {
138-
return (T) parseMethod.invoke(null, str);
139-
}
140-
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
141-
throw new IllegalStateException(e);
142-
}
143-
};
144-
}
91+
BiFunction<String, Headers, T> parser;
92+
parser = SerializationUtils.propertyToMethodInvokingFunction(parserMethod, String.class,
93+
getClass().getClassLoader());
94+
this.parser = parser;
14595
}
14696
}
14797

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import java.lang.reflect.InvocationTargetException;
20+
import java.lang.reflect.Method;
21+
import java.util.function.BiFunction;
22+
23+
import org.apache.kafka.common.header.Headers;
24+
25+
import org.springframework.util.Assert;
26+
import org.springframework.util.ClassUtils;
27+
28+
/**
29+
* Utilities for serialization.
30+
*
31+
* @author Gary Russell
32+
* @since 2.5
33+
*
34+
*/
35+
public final class SerializationUtils {
36+
37+
private SerializationUtils() {
38+
}
39+
40+
/**
41+
* Convert a property value (FQCN.methodName) to a {@link BiFunction} that takes a
42+
* payload and headers and returns some value. The method must have parameters
43+
* {@code (P, Headers)} or {@code (P)} and be declared as static.
44+
* @param <P> The {@link BiFunction} first parameter type.
45+
* @param <T> The {@link BiFunction} return type.
46+
* @param methodProperty the method name property.
47+
* @param payloadType the {@link BiFunction} first parameter type.
48+
* @param classLoader the class loader.
49+
* @return the function.
50+
*/
51+
@SuppressWarnings("unchecked")
52+
public static <P, T> BiFunction<P, Headers, T> propertyToMethodInvokingFunction(String methodProperty,
53+
Class<P> payloadType, ClassLoader classLoader) {
54+
55+
int lastDotPosn = methodProperty.lastIndexOf(".");
56+
Assert.state(lastDotPosn > 1,
57+
"the method property needs to be a class name followed by the method name, separated by '.'");
58+
BiFunction<P, Headers, T> function;
59+
Class<?> clazz;
60+
try {
61+
clazz = ClassUtils.forName(methodProperty.substring(0, lastDotPosn), classLoader);
62+
}
63+
catch (ClassNotFoundException | LinkageError e) {
64+
throw new IllegalStateException(e);
65+
}
66+
String methodName = methodProperty.substring(lastDotPosn + 1);
67+
Method method;
68+
try {
69+
method = clazz.getDeclaredMethod(methodName, payloadType, Headers.class);
70+
}
71+
catch (@SuppressWarnings("unused") NoSuchMethodException e) {
72+
try {
73+
method = clazz.getDeclaredMethod(methodName, payloadType);
74+
}
75+
catch (@SuppressWarnings("unused") NoSuchMethodException e1) {
76+
throw new IllegalStateException("the parser method must take '(String, Headers)' or '(String)'");
77+
}
78+
catch (SecurityException e1) {
79+
throw new IllegalStateException(e1);
80+
}
81+
}
82+
catch (SecurityException e) {
83+
throw new IllegalStateException(e);
84+
}
85+
Method parseMethod = method;
86+
if (method.getParameters().length > 1) {
87+
function = (str, headers) -> {
88+
try {
89+
return (T) parseMethod.invoke(null, str, headers);
90+
}
91+
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
92+
throw new IllegalStateException(e);
93+
}
94+
};
95+
}
96+
else {
97+
function = (str, headers) -> {
98+
try {
99+
return (T) parseMethod.invoke(null, str);
100+
}
101+
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
102+
throw new IllegalStateException(e);
103+
}
104+
};
105+
}
106+
return function;
107+
}
108+
109+
}

0 commit comments

Comments
 (0)