Skip to content

Commit 96b4369

Browse files
committed
Support keys in Delegating(De)Serializer
Previously, a single config and selector header was available making it impossible to use these (De)Serializers for both key and value. **I will backport after merge as needed due to conflicts** * Fix javadoc.
1 parent 9a6398d commit 96b4369

File tree

4 files changed

+168
-36
lines changed

4 files changed

+168
-36
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.HashMap;
2020
import java.util.Map;
2121

22+
import org.apache.kafka.common.header.Header;
2223
import org.apache.kafka.common.header.Headers;
2324
import org.apache.kafka.common.serialization.Deserializer;
2425

@@ -40,16 +41,23 @@ public class DelegatingDeserializer implements Deserializer<Object> {
4041
/**
4142
* Name of the configuration property containing the serialization selector map with
4243
* format {@code selector:class,...}.
44+
* @deprecated Use {@link DelegatingSerializer#VALUE_SERIALIZATION_SELECTOR} or
45+
* {@link DelegatingSerializer#KEY_SERIALIZATION_SELECTOR}.
4346
*/
47+
@Deprecated
4448
public static final String SERIALIZATION_SELECTOR_CONFIG = DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG;
4549

4650

4751
private final Map<String, Deserializer<?>> delegates = new HashMap<>();
4852

53+
54+
private boolean forKeys;
55+
4956
/**
5057
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
51-
* with a consumer property
52-
* {@link #SERIALIZATION_SELECTOR_CONFIG}.
58+
* with consumer properties
59+
* {@link DelegatingSerializer#KEY_SERIALIZATION_SELECTOR_CONFIG} and
60+
* {@link DelegatingSerializer#VALUE_SERIALIZATION_SELECTOR_CONFIG}.
5361
*/
5462
public DelegatingDeserializer() {
5563
}
@@ -67,7 +75,9 @@ public DelegatingDeserializer(Map<String, Deserializer<?>> delegates) {
6775
@SuppressWarnings("unchecked")
6876
@Override
6977
public void configure(Map<String, ?> configs, boolean isKey) {
70-
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
78+
this.forKeys = isKey;
79+
String configKey = configKey();
80+
Object value = configs.get(configKey);
7181
if (value == null) {
7282
return;
7383
}
@@ -83,7 +93,7 @@ else if (deser instanceof String) {
8393
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) deser);
8494
}
8595
else {
86-
throw new IllegalStateException(SERIALIZATION_SELECTOR_CONFIG
96+
throw new IllegalStateException(configKey
8797
+ " map entries must be Serializers or class names, not " + value.getClass());
8898
}
8999
});
@@ -92,11 +102,16 @@ else if (value instanceof String) {
92102
this.delegates.putAll(createDelegates((String) value, configs, isKey));
93103
}
94104
else {
95-
throw new IllegalStateException(
96-
SERIALIZATION_SELECTOR_CONFIG + " must be a map or String, not " + value.getClass());
105+
throw new IllegalStateException(configKey + " must be a map or String, not " + value.getClass());
97106
}
98107
}
99108

109+
private String configKey() {
110+
return this.forKeys
111+
? DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
112+
: DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG;
113+
}
114+
100115
protected static Map<String, Deserializer<?>> createDelegates(String mappings, Map<String, ?> configs,
101116
boolean isKey) {
102117

@@ -151,9 +166,14 @@ public Object deserialize(String topic, byte[] data) {
151166

152167
@Override
153168
public Object deserialize(String topic, Headers headers, byte[] data) {
154-
byte[] value = headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR).value();
169+
byte[] value = null;
170+
String selectorKey = selectorKey();
171+
Header header = headers.lastHeader(selectorKey);
172+
if (header != null) {
173+
value = header.value();
174+
}
155175
if (value == null) {
156-
throw new IllegalStateException("No '" + DelegatingSerializer.SERIALIZATION_SELECTOR + "' header present");
176+
throw new IllegalStateException("No '" + selectorKey + "' header present");
157177
}
158178
String selector = new String(value).replaceAll("\"", "");
159179
@SuppressWarnings("unchecked")
@@ -166,6 +186,12 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
166186
}
167187
}
168188

189+
private String selectorKey() {
190+
return this.forKeys
191+
? DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
192+
: DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR;
193+
}
194+
169195
@Override
170196
public void close() {
171197
this.delegates.values().forEach(deser -> deser.close());

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingSerializer.java

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import java.util.HashMap;
2020
import java.util.Map;
2121

22+
import org.apache.kafka.common.header.Header;
2223
import org.apache.kafka.common.header.Headers;
24+
import org.apache.kafka.common.serialization.Serdes;
2325
import org.apache.kafka.common.serialization.Serializer;
2426

2527
import org.springframework.lang.Nullable;
@@ -38,30 +40,59 @@
3840
public class DelegatingSerializer implements Serializer<Object> {
3941

4042
/**
41-
* Name of the header containing the serialization selector.
43+
* Synonym for {@link #VALUE_SERIALIZATION_SELECTOR}.
44+
* @deprecated in favor of {@link #VALUE_SERIALIZATION_SELECTOR}.
4245
*/
46+
@Deprecated
4347
public static final String SERIALIZATION_SELECTOR = "spring.kafka.serialization.selector";
4448

4549
/**
46-
* Name of the configuration property containing the serialization selector map with
47-
* format {@code selector:class,...}.
50+
* Name of the header containing the serialization selector for values.
4851
*/
52+
public static final String VALUE_SERIALIZATION_SELECTOR = "spring.kafka.serialization.selector";
53+
54+
/**
55+
* Name of the header containing the serialization selector for keys.
56+
*/
57+
public static final String KEY_SERIALIZATION_SELECTOR = "spring.kafka.key.serialization.selector";
58+
59+
/**
60+
* Synonym for {@link #VALUE_SERIALIZATION_SELECTOR_CONFIG}.
61+
* @deprecated in favor of {@link #VALUE_SERIALIZATION_SELECTOR_CONFIG}.
62+
*/
63+
@Deprecated
4964
public static final String SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.serialization.selector.config";
5065

66+
/**
67+
* Name of the configuration property containing the serialization selector map for
68+
* values with format {@code selector:class,...}.
69+
*/
70+
public static final String VALUE_SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.serialization.selector.config";
71+
72+
/**
73+
* Name of the configuration property containing the serialization selector map for
74+
* keys with format {@code selector:class,...}.
75+
*/
76+
public static final String KEY_SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.key.serialization.selector.config";
77+
5178
private final Map<String, Serializer<?>> delegates = new HashMap<>();
5279

80+
private boolean forKeys;
81+
5382
/**
5483
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
55-
* with a producer property
56-
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR_CONFIG}.
84+
* with producer properties {@link #VALUE_SERIALIZATION_SELECTOR_CONFIG} and
85+
* {@link #KEY_SERIALIZATION_SELECTOR_CONFIG}.
5786
*/
5887
public DelegatingSerializer() {
5988
}
6089

6190
/**
6291
* Construct an instance with the supplied mapping of selectors to delegate
6392
* serializers. The selector must be supplied in the
64-
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header.
93+
* {@link #KEY_SERIALIZATION_SELECTOR} and/or {@link #VALUE_SERIALIZATION_SELECTOR}
94+
* headers. It is not necessary to configure standard serializers supported by
95+
* {@link Serdes}.
6596
* @param delegates the map of delegates.
6697
*/
6798
public DelegatingSerializer(Map<String, Serializer<?>> delegates) {
@@ -71,7 +102,9 @@ public DelegatingSerializer(Map<String, Serializer<?>> delegates) {
71102
@SuppressWarnings("unchecked")
72103
@Override
73104
public void configure(Map<String, ?> configs, boolean isKey) {
74-
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
105+
this.forKeys = isKey;
106+
String configKey = configKey();
107+
Object value = configs.get(configKey);
75108
if (value == null) {
76109
return;
77110
}
@@ -87,7 +120,7 @@ else if (serializer instanceof String) {
87120
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) serializer);
88121
}
89122
else {
90-
throw new IllegalStateException(SERIALIZATION_SELECTOR_CONFIG
123+
throw new IllegalStateException(configKey
91124
+ " map entries must be Serializers or class names, not " + value.getClass());
92125
}
93126
});
@@ -97,10 +130,14 @@ else if (value instanceof String) {
97130
}
98131
else {
99132
throw new IllegalStateException(
100-
SERIALIZATION_SELECTOR_CONFIG + " must be a map or String, not " + value.getClass());
133+
configKey + " must be a map or String, not " + value.getClass());
101134
}
102135
}
103136

137+
private String configKey() {
138+
return this.forKeys ? KEY_SERIALIZATION_SELECTOR_CONFIG : VALUE_SERIALIZATION_SELECTOR_CONFIG;
139+
}
140+
104141
protected static Map<String, Serializer<?>> createDelegates(String mappings, Map<String, ?> configs,
105142
boolean isKey) {
106143

@@ -156,20 +193,29 @@ public byte[] serialize(String topic, Object data) {
156193

157194
@Override
158195
public byte[] serialize(String topic, Headers headers, Object data) {
159-
byte[] value = headers.lastHeader(SERIALIZATION_SELECTOR).value();
196+
byte[] value = null;
197+
String selectorKey = selectorKey();
198+
Header header = headers.lastHeader(selectorKey);
199+
if (header != null) {
200+
value = header.value();
201+
}
160202
if (value == null) {
161-
throw new IllegalStateException("No '" + SERIALIZATION_SELECTOR + "' header present");
203+
throw new IllegalStateException("No '" + selectorKey + "' header present");
162204
}
163205
String selector = new String(value).replaceAll("\"", "");
164206
@SuppressWarnings("unchecked")
165207
Serializer<Object> serializer = (Serializer<Object>) this.delegates.get(selector);
166208
if (serializer == null) {
167209
throw new IllegalStateException(
168-
"No serializer found for '" + SERIALIZATION_SELECTOR + "' header with value '" + selector + "'");
210+
"No serializer found for '" + selectorKey + "' header with value '" + selector + "'");
169211
}
170212
return serializer.serialize(topic, headers, data);
171213
}
172214

215+
private String selectorKey() {
216+
return this.forKeys ? KEY_SERIALIZATION_SELECTOR : VALUE_SERIALIZATION_SELECTOR;
217+
}
218+
173219
@Override
174220
public void close() {
175221
this.delegates.values().forEach(ser -> ser.close());

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingSerializationTests.java

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,14 +52,14 @@ void testWithMapConfig() {
5252
serializers.put("bytes", new BytesSerializer());
5353
serializers.put("int", IntegerSerializer.class);
5454
serializers.put("string", StringSerializer.class.getName());
55-
configs.put(DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG, serializers);
55+
configs.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG, serializers);
5656
serializer.configure(configs, false);
5757
DelegatingDeserializer deserializer = new DelegatingDeserializer();
5858
Map<String, Object> deserializers = new HashMap<>();
5959
deserializers.put("bytes", new BytesDeserializer());
6060
deserializers.put("int", IntegerDeserializer.class);
6161
deserializers.put("string", StringDeserializer.class.getName());
62-
configs.put(DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG, deserializers);
62+
configs.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG, deserializers);
6363
deserializer.configure(configs, false);
6464
doTest(serializer, deserializer);
6565
}
@@ -68,36 +68,96 @@ void testWithMapConfig() {
6868
void testWithPropertyConfig() {
6969
DelegatingSerializer serializer = new DelegatingSerializer();
7070
Map<String, Object> configs = new HashMap<>();
71-
configs.put(DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG, "bytes:" + BytesSerializer.class.getName()
71+
configs.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG, "bytes:" + BytesSerializer.class.getName()
7272
+ ", int:" + IntegerSerializer.class.getName() + ", string: " + StringSerializer.class.getName());
7373
serializer.configure(configs, false);
7474
DelegatingDeserializer deserializer = new DelegatingDeserializer();
75-
configs.put(DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG, "bytes:" + BytesDeserializer.class.getName()
75+
configs.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG, "bytes:" + BytesDeserializer.class.getName()
7676
+ ", int:" + IntegerDeserializer.class.getName() + ", string: " + StringDeserializer.class.getName());
7777
deserializer.configure(configs, false);
7878
doTest(serializer, deserializer);
7979
}
8080

81+
@Test
82+
void testWithMapConfigKeys() {
83+
DelegatingSerializer serializer = new DelegatingSerializer();
84+
Map<String, Object> configs = new HashMap<>();
85+
Map<String, Object> serializers = new HashMap<>();
86+
serializers.put("bytes", new BytesSerializer());
87+
serializers.put("int", IntegerSerializer.class);
88+
serializers.put("string", StringSerializer.class.getName());
89+
configs.put(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG, serializers);
90+
serializer.configure(configs, true);
91+
DelegatingDeserializer deserializer = new DelegatingDeserializer();
92+
Map<String, Object> deserializers = new HashMap<>();
93+
deserializers.put("bytes", new BytesDeserializer());
94+
deserializers.put("int", IntegerDeserializer.class);
95+
deserializers.put("string", StringDeserializer.class.getName());
96+
configs.put(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG, deserializers);
97+
deserializer.configure(configs, true);
98+
doTestKeys(serializer, deserializer);
99+
}
100+
101+
@Test
102+
void testWithPropertyConfigKeys() {
103+
DelegatingSerializer serializer = new DelegatingSerializer();
104+
Map<String, Object> configs = new HashMap<>();
105+
configs.put(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG, "bytes:" + BytesSerializer.class.getName()
106+
+ ", int:" + IntegerSerializer.class.getName() + ", string: " + StringSerializer.class.getName());
107+
serializer.configure(configs, true);
108+
DelegatingDeserializer deserializer = new DelegatingDeserializer();
109+
configs.put(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG, "bytes:" + BytesDeserializer.class.getName()
110+
+ ", int:" + IntegerDeserializer.class.getName() + ", string: " + StringDeserializer.class.getName());
111+
deserializer.configure(configs, true);
112+
doTestKeys(serializer, deserializer);
113+
}
114+
81115
private void doTest(DelegatingSerializer serializer, DelegatingDeserializer deserializer) {
82116
Headers headers = new RecordHeaders();
83-
headers.add(new RecordHeader(DelegatingSerializer.SERIALIZATION_SELECTOR, "bytes".getBytes()));
117+
headers.add(new RecordHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "bytes".getBytes()));
118+
byte[] bytes = new byte[] { 1, 2, 3, 4 };
119+
byte[] serialized = serializer.serialize("foo", headers, new Bytes(bytes));
120+
assertThat(serialized).isSameAs(bytes);
121+
headers.add(new RecordHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "int".getBytes()));
122+
serialized = serializer.serialize("foo", headers, 42);
123+
assertThat(serialized).isEqualTo(new byte[] { 0, 0, 0, 42 });
124+
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo(42);
125+
headers.add(new RecordHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "string".getBytes()));
126+
serialized = serializer.serialize("foo", headers, "bar");
127+
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
128+
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");
129+
130+
// The DKHM will jsonize the value; test that we ignore the quotes
131+
MessageHeaders messageHeaders = new MessageHeaders(
132+
Collections.singletonMap(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "string"));
133+
new DefaultKafkaHeaderMapper().fromHeaders(messageHeaders, headers);
134+
assertThat(headers.lastHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR).value())
135+
.isEqualTo(new byte[] { 's', 't', 'r', 'i', 'n', 'g' });
136+
serialized = serializer.serialize("foo", headers, "bar");
137+
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
138+
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");
139+
}
140+
141+
private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer deserializer) {
142+
Headers headers = new RecordHeaders();
143+
headers.add(new RecordHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "bytes".getBytes()));
84144
byte[] bytes = new byte[] { 1, 2, 3, 4 };
85145
byte[] serialized = serializer.serialize("foo", headers, new Bytes(bytes));
86146
assertThat(serialized).isSameAs(bytes);
87-
headers.add(new RecordHeader(DelegatingSerializer.SERIALIZATION_SELECTOR, "int".getBytes()));
147+
headers.add(new RecordHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "int".getBytes()));
88148
serialized = serializer.serialize("foo", headers, 42);
89149
assertThat(serialized).isEqualTo(new byte[] { 0, 0, 0, 42 });
90150
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo(42);
91-
headers.add(new RecordHeader(DelegatingSerializer.SERIALIZATION_SELECTOR, "string".getBytes()));
151+
headers.add(new RecordHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "string".getBytes()));
92152
serialized = serializer.serialize("foo", headers, "bar");
93153
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
94154
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");
95155

96156
// The DKHM will jsonize the value; test that we ignore the quotes
97157
MessageHeaders messageHeaders = new MessageHeaders(
98-
Collections.singletonMap(DelegatingSerializer.SERIALIZATION_SELECTOR, "string"));
158+
Collections.singletonMap(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "string"));
99159
new DefaultKafkaHeaderMapper().fromHeaders(messageHeaders, headers);
100-
assertThat(headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR).value())
160+
assertThat(headers.lastHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR).value())
101161
.isEqualTo(new byte[] { 's', 't', 'r', 'i', 'n', 'g' });
102162
serialized = serializer.serialize("foo", headers, "bar");
103163
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });

0 commit comments

Comments
 (0)