Skip to content

Commit b68e82a

Browse files
garyrussellartembilan
authored andcommitted
Configure ParseStringDeserializer with Properties
1 parent b977e34 commit b68e82a

File tree

4 files changed

+154
-3
lines changed

4 files changed

+154
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.lang.reflect.InvocationTargetException;
20+
import java.lang.reflect.Method;
1921
import java.nio.charset.Charset;
2022
import java.nio.charset.StandardCharsets;
23+
import java.util.Map;
2124
import java.util.function.BiFunction;
2225
import java.util.function.Function;
2326

2427
import org.apache.kafka.common.header.Headers;
2528
import org.apache.kafka.common.serialization.Deserializer;
2629

2730
import org.springframework.util.Assert;
31+
import org.springframework.util.ClassUtils;
2832

2933
/**
3034
* Generic {@link org.apache.kafka.common.serialization.Deserializer Deserializer} for deserialization of entity from
@@ -38,10 +42,32 @@
3842
*/
3943
public class ParseStringDeserializer<T> implements Deserializer<T> {
4044

41-
private final BiFunction<String, Headers, T> parser;
45+
/**
46+
* Property for the key parser method.
47+
*/
48+
public static final String KEY_PARSER = "spring.message.key.parser";
49+
50+
/**
51+
* Property for the key parser method.
52+
*/
53+
public static final String VALUE_PARSER = "spring.message.value.parser";
54+
55+
private static final BiFunction<String, Headers, ?> NO_PARSER = (str, headers) -> {
56+
throw new IllegalStateException("A parser must be provided either via a constructor or consumer properties");
57+
};
58+
59+
private BiFunction<String, Headers, T> parser = (BiFunction<String, Headers, T>) NO_PARSER;
4260

4361
private Charset charset = StandardCharsets.UTF_8;
4462

63+
/**
64+
* Construct an instance with no parser function; a static method name must be
65+
* provided in the consumer config {@link #KEY_PARSER} or {@link #VALUE_PARSER}
66+
* properties.
67+
*/
68+
public ParseStringDeserializer() {
69+
}
70+
4571
/**
4672
* Construct an instance with the supplied parser function.
4773
* @param parser the function.
@@ -58,6 +84,67 @@ public ParseStringDeserializer(BiFunction<String, Headers, T> parser) {
5884
this.parser = parser;
5985
}
6086

87+
@SuppressWarnings("unchecked")
88+
@Override
89+
public void configure(Map<String, ?> configs, boolean isKey) {
90+
if (NO_PARSER.equals(this.parser)) {
91+
String parserMethod = (String) configs.get(isKey ? KEY_PARSER : VALUE_PARSER);
92+
Assert.state(parserMethod != null,
93+
"A parser must be provided either via a constructor or consumer properties");
94+
int lastDotPosn = parserMethod.lastIndexOf(".");
95+
Assert.state(lastDotPosn > 1,
96+
"the parser method needs to be a class name followed by the method name, separated by '.'");
97+
Class<?> clazz;
98+
try {
99+
clazz = ClassUtils.forName(parserMethod.substring(0, lastDotPosn),
100+
getClass().getClassLoader());
101+
}
102+
catch (ClassNotFoundException | LinkageError e) {
103+
throw new IllegalStateException(e);
104+
}
105+
parserMethod = parserMethod.substring(lastDotPosn + 1);
106+
Method method;
107+
try {
108+
method = clazz.getDeclaredMethod(parserMethod, String.class, Headers.class);
109+
}
110+
catch (@SuppressWarnings("unused") NoSuchMethodException e) {
111+
try {
112+
method = clazz.getDeclaredMethod(parserMethod, String.class);
113+
}
114+
catch (NoSuchMethodException e1) {
115+
throw new IllegalStateException("the parser method must take '(String, Headers)' or '(String)'");
116+
}
117+
catch (SecurityException e1) {
118+
throw new IllegalStateException(e1);
119+
}
120+
}
121+
catch (SecurityException e) {
122+
throw new IllegalStateException(e);
123+
}
124+
Method parseMethod = method;
125+
if (method.getParameters().length > 1) {
126+
this.parser = (str, headers) -> {
127+
try {
128+
return (T) parseMethod.invoke(null, str, headers);
129+
}
130+
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
131+
throw new IllegalStateException(e);
132+
}
133+
};
134+
}
135+
else {
136+
this.parser = (str, headers) -> {
137+
try {
138+
return (T) parseMethod.invoke(null, str);
139+
}
140+
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
141+
throw new IllegalStateException(e);
142+
}
143+
};
144+
}
145+
}
146+
}
147+
61148
@Override
62149
public T deserialize(String topic, byte[] data) {
63150
return deserialize(topic, null, data);

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ToFromStringSerde.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.util.Map;
20+
1921
import org.apache.kafka.common.serialization.Deserializer;
2022
import org.apache.kafka.common.serialization.Serde;
2123
import org.apache.kafka.common.serialization.Serializer;
@@ -39,8 +41,7 @@ public class ToFromStringSerde<T> implements Serde<T> {
3941
private final ParseStringDeserializer<T> fromStringDeserializer;
4042

4143
/**
42-
* Construct an instance with the provided properties which must be previously
43-
* configured ({@link #configure(java.util.Map, boolean)} is not called).
44+
* Construct an instance with the provided properties.
4445
* @param toStringSerializer the {@link ToStringSerializer}.
4546
* @param fromStringDeserializer the {@link ParseStringDeserializer}.
4647
*/
@@ -53,6 +54,12 @@ public ToFromStringSerde(ToStringSerializer<T> toStringSerializer,
5354
this.fromStringDeserializer = fromStringDeserializer;
5455
}
5556

57+
@Override
58+
public void configure(Map<String, ?> configs, boolean isKey) {
59+
this.toStringSerializer.configure(configs, isKey);
60+
this.fromStringDeserializer.configure(configs, isKey);
61+
}
62+
5663
@Override
5764
public Serializer<T> serializer() {
5865
return this.toStringSerializer;

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/ToStringSerializationTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import java.nio.charset.StandardCharsets;
2222
import java.util.Collections;
23+
import java.util.HashMap;
24+
import java.util.Map;
2325

2426
import org.apache.kafka.common.header.Headers;
2527
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -166,6 +168,27 @@ public void testSimpleDeserialization() {
166168
.hasFieldOrPropertyWithValue("third", true);
167169
}
168170

171+
@Test
172+
@DisplayName("Test simple deserialization without headers via config")
173+
public void testSimpleDeserializationViaConfig() {
174+
175+
Map<String, Object> configs = new HashMap<>();
176+
configs.put(ParseStringDeserializer.KEY_PARSER, TestEntity.class.getName() + ".parse");
177+
/* Given */
178+
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>();
179+
deserializer.configure(configs, true);
180+
byte[] data = "toto:123:true".getBytes();
181+
182+
/* When */
183+
Object entity = deserializer.deserialize("my-topic", data);
184+
185+
/* Then */
186+
assertThat(entity)
187+
.hasFieldOrPropertyWithValue("first", "toto")
188+
.hasFieldOrPropertyWithValue("second", 123)
189+
.hasFieldOrPropertyWithValue("third", true);
190+
}
191+
169192
@Test
170193
@DisplayName("Test deserialization using headers")
171194
public void testSerialization_usingHeaders() {
@@ -189,6 +212,32 @@ public void testSerialization_usingHeaders() {
189212
.hasFieldOrPropertyWithValue("intValue", 123);
190213
}
191214

215+
@Test
216+
@DisplayName("Test deserialization using headers via config")
217+
public void testSerialization_usingHeadersViaConfig() {
218+
219+
Map<String, Object> configs = new HashMap<>();
220+
configs.put(ParseStringDeserializer.VALUE_PARSER,
221+
"org.springframework.kafka.support.serializer.ToStringSerializationTests.parseWithHeaders");
222+
/* Given */
223+
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>();
224+
deserializer.configure(configs, false);
225+
226+
byte[] data = "toto:123:true".getBytes();
227+
Headers headers = new RecordHeaders();
228+
headers.add(ToStringSerializer.VALUE_TYPE, DummyEntity.class.getName().getBytes());
229+
230+
/* When */
231+
Object entity = deserializer.deserialize("my-topic", headers, data);
232+
233+
/* Then */
234+
assertThat(entity)
235+
.isNotNull()
236+
.isInstanceOf(DummyEntity.class)
237+
.hasFieldOrPropertyWithValue("stringValue", "toto")
238+
.hasFieldOrPropertyWithValue("intValue", 123);
239+
}
240+
192241
@Test
193242
@DisplayName("Test serialization/deserialization with provided charset")
194243
public void testSerializationDeserializationWithCharset() {

src/reference/asciidoc/kafka.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3392,6 +3392,14 @@ ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((st
33923392

33933393
You can configure the `Charset` used to convert `String` to/from `byte[]` with the default being `UTF-8`.
33943394

3395+
You can configure the deserializer with the name of the parser method using `ConsumerConfig` properties:
3396+
3397+
* `ParseStringDeserializer.KEY_PARSER`
3398+
* `ParseStringDeserializer.VALUE_PARSER`
3399+
3400+
The properties must contain the fully qualified name of the class followed by the method name, separated by a period `.`.
3401+
The method must be static and have a signature of either `(String, Headers)` or `(String)`.
3402+
33953403
A `ToFromStringSerde` is also provided, for use with Kafka Streams.
33963404

33973405
===== JSON

0 commit comments

Comments
 (0)