Skip to content

Commit a9b9d44

Browse files
authored
Support keys in Delegating(De)Serializer
Previously, a single config and selector header was available making it impossible to use these (De)Serializers for both key and value. **I will backport after merge as needed due to conflicts** * Fix javadoc.
1 parent ae03a53 commit a9b9d44

File tree

4 files changed

+166
-44
lines changed

4 files changed

+166
-44
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.kafka.common.serialization.Serde;
2727
import org.apache.kafka.common.serialization.Serdes;
2828

29-
import org.springframework.core.log.LogAccessor;
3029
import org.springframework.lang.Nullable;
3130
import org.springframework.util.Assert;
3231
import org.springframework.util.ClassUtils;
@@ -43,12 +42,13 @@
4342
*/
4443
public class DelegatingDeserializer implements Deserializer<Object> {
4544

46-
private static final LogAccessor LOGGER = new LogAccessor(DelegatingDeserializer.class);
47-
4845
/**
4946
* Name of the configuration property containing the serialization selector map with
5047
* format {@code selector:class,...}.
48+
* @deprecated Use {@link DelegatingSerializer#VALUE_SERIALIZATION_SELECTOR} or
49+
* {@link DelegatingSerializer#KEY_SERIALIZATION_SELECTOR}.
5150
*/
51+
@Deprecated
5252
public static final String SERIALIZATION_SELECTOR_CONFIG = DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG;
5353

5454

@@ -60,8 +60,9 @@ public class DelegatingDeserializer implements Deserializer<Object> {
6060

6161
/**
6262
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
63-
* with a consumer property
64-
* {@link #SERIALIZATION_SELECTOR_CONFIG}.
63+
* with consumer properties
64+
* {@link DelegatingSerializer#KEY_SERIALIZATION_SELECTOR_CONFIG} and
65+
* {@link DelegatingSerializer#VALUE_SERIALIZATION_SELECTOR_CONFIG}.
6566
*/
6667
public DelegatingDeserializer() {
6768
}
@@ -82,7 +83,8 @@ public DelegatingDeserializer(Map<String, Deserializer<?>> delegates) {
8283
public void configure(Map<String, ?> configs, boolean isKey) {
8384
this.autoConfigs.putAll(configs);
8485
this.forKeys = isKey;
85-
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
86+
String configKey = configKey();
87+
Object value = configs.get(configKey);
8688
if (value == null) {
8789
return;
8890
}
@@ -98,7 +100,7 @@ else if (deser instanceof String) {
98100
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) deser);
99101
}
100102
else {
101-
throw new IllegalStateException(SERIALIZATION_SELECTOR_CONFIG
103+
throw new IllegalStateException(configKey
102104
+ " map entries must be Serializers or class names, not " + value.getClass());
103105
}
104106
});
@@ -107,11 +109,16 @@ else if (value instanceof String) {
107109
this.delegates.putAll(createDelegates((String) value, configs, isKey));
108110
}
109111
else {
110-
throw new IllegalStateException(
111-
SERIALIZATION_SELECTOR_CONFIG + " must be a map or String, not " + value.getClass());
112+
throw new IllegalStateException(configKey + " must be a map or String, not " + value.getClass());
112113
}
113114
}
114115

116+
private String configKey() {
117+
return this.forKeys
118+
? DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
119+
: DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG;
120+
}
121+
115122
protected static Map<String, Deserializer<?>> createDelegates(String mappings, Map<String, ?> configs,
116123
boolean isKey) {
117124

@@ -167,12 +174,13 @@ public Object deserialize(String topic, byte[] data) {
167174
@Override
168175
public Object deserialize(String topic, Headers headers, byte[] data) {
169176
byte[] value = null;
170-
Header header = headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR);
177+
String selectorKey = selectorKey();
178+
Header header = headers.lastHeader(selectorKey);
171179
if (header != null) {
172180
value = header.value();
173181
}
174182
if (value == null) {
175-
throw new IllegalStateException("No '" + DelegatingSerializer.SERIALIZATION_SELECTOR + "' header present");
183+
throw new IllegalStateException("No '" + selectorKey + "' header present");
176184
}
177185
String selector = new String(value).replaceAll("\"", "");
178186
Deserializer<? extends Object> deserializer = this.delegates.get(selector);
@@ -187,6 +195,12 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
187195
}
188196
}
189197

198+
private String selectorKey() {
199+
return this.forKeys
200+
? DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
201+
: DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR;
202+
}
203+
190204
/*
191205
* Package for testing.
192206
*/

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingSerializer.java

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,41 @@ public class DelegatingSerializer implements Serializer<Object> {
4747
private static final LogAccessor LOGGER = new LogAccessor(DelegatingDeserializer.class);
4848

4949
/**
50-
* Name of the header containing the serialization selector.
50+
* Synonym for {@link #VALUE_SERIALIZATION_SELECTOR}.
51+
* @deprecated in favor of {@link #VALUE_SERIALIZATION_SELECTOR}.
5152
*/
53+
@Deprecated
5254
public static final String SERIALIZATION_SELECTOR = "spring.kafka.serialization.selector";
5355

5456
/**
55-
* Name of the configuration property containing the serialization selector map with
56-
* format {@code selector:class,...}.
57+
* Name of the header containing the serialization selector for values.
5758
*/
59+
public static final String VALUE_SERIALIZATION_SELECTOR = "spring.kafka.serialization.selector";
60+
61+
/**
62+
* Name of the header containing the serialization selector for keys.
63+
*/
64+
public static final String KEY_SERIALIZATION_SELECTOR = "spring.kafka.key.serialization.selector";
65+
66+
/**
67+
* Synonym for {@link #VALUE_SERIALIZATION_SELECTOR_CONFIG}.
68+
* @deprecated in favor of {@link #VALUE_SERIALIZATION_SELECTOR_CONFIG}.
69+
*/
70+
@Deprecated
5871
public static final String SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.serialization.selector.config";
5972

73+
/**
74+
* Name of the configuration property containing the serialization selector map for
75+
* values with format {@code selector:class,...}.
76+
*/
77+
public static final String VALUE_SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.serialization.selector.config";
78+
79+
/**
80+
* Name of the configuration property containing the serialization selector map for
81+
* keys with format {@code selector:class,...}.
82+
*/
83+
public static final String KEY_SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.key.serialization.selector.config";
84+
6085
private final Map<String, Serializer<?>> delegates = new ConcurrentHashMap<>();
6186

6287
private final Map<String, Object> autoConfigs = new HashMap<>();
@@ -65,17 +90,18 @@ public class DelegatingSerializer implements Serializer<Object> {
6590

6691
/**
6792
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
68-
* with a producer property
69-
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR_CONFIG}.
93+
* with producer properties {@link #VALUE_SERIALIZATION_SELECTOR_CONFIG} and
94+
* {@link #KEY_SERIALIZATION_SELECTOR_CONFIG}.
7095
*/
7196
public DelegatingSerializer() {
7297
}
7398

7499
/**
75100
* Construct an instance with the supplied mapping of selectors to delegate
76101
* serializers. The selector must be supplied in the
77-
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header. It is not necessary to
78-
* configure standard serializers supported by {@link Serdes}.
102+
* {@link #KEY_SERIALIZATION_SELECTOR} and/or {@link #VALUE_SERIALIZATION_SELECTOR}
103+
* headers. It is not necessary to configure standard serializers supported by
104+
* {@link Serdes}.
79105
* @param delegates the map of delegates.
80106
*/
81107
public DelegatingSerializer(Map<String, Serializer<?>> delegates) {
@@ -87,7 +113,8 @@ public DelegatingSerializer(Map<String, Serializer<?>> delegates) {
87113
public void configure(Map<String, ?> configs, boolean isKey) {
88114
this.autoConfigs.putAll(configs);
89115
this.forKeys = isKey;
90-
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
116+
String configKey = configKey();
117+
Object value = configs.get(configKey);
91118
if (value == null) {
92119
return;
93120
}
@@ -103,7 +130,7 @@ else if (serializer instanceof String) {
103130
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) serializer);
104131
}
105132
else {
106-
throw new IllegalStateException(SERIALIZATION_SELECTOR_CONFIG
133+
throw new IllegalStateException(configKey
107134
+ " map entries must be Serializers or class names, not " + value.getClass());
108135
}
109136
});
@@ -113,10 +140,14 @@ else if (value instanceof String) {
113140
}
114141
else {
115142
throw new IllegalStateException(
116-
SERIALIZATION_SELECTOR_CONFIG + " must be a map or String, not " + value.getClass());
143+
configKey + " must be a map or String, not " + value.getClass());
117144
}
118145
}
119146

147+
private String configKey() {
148+
return this.forKeys ? KEY_SERIALIZATION_SELECTOR_CONFIG : VALUE_SERIALIZATION_SELECTOR_CONFIG;
149+
}
150+
120151
protected static Map<String, Serializer<?>> createDelegates(String mappings, Map<String, ?> configs,
121152
boolean isKey) {
122153

@@ -173,19 +204,20 @@ public byte[] serialize(String topic, Object data) {
173204
@Override
174205
public byte[] serialize(String topic, Headers headers, Object data) {
175206
byte[] value = null;
176-
Header header = headers.lastHeader(SERIALIZATION_SELECTOR);
207+
String selectorKey = selectorKey();
208+
Header header = headers.lastHeader(selectorKey);
177209
if (header != null) {
178210
value = header.value();
179211
}
180212
if (value == null) {
181213
value = trySerdes(data);
182214
if (value == null) {
183-
throw new IllegalStateException("No '" + SERIALIZATION_SELECTOR
215+
throw new IllegalStateException("No '" + selectorKey
184216
+ "' header present and type (" + data.getClass().getName()
185217
+ ") is not supported by Serdes");
186218
}
187219
try {
188-
headers.add(new RecordHeader(SERIALIZATION_SELECTOR, value));
220+
headers.add(new RecordHeader(selectorKey, value));
189221
}
190222
catch (IllegalStateException e) {
191223
LOGGER.debug(e, () -> "Could not set header for type " + data.getClass());
@@ -196,11 +228,15 @@ public byte[] serialize(String topic, Headers headers, Object data) {
196228
Serializer<Object> serializer = (Serializer<Object>) this.delegates.get(selector);
197229
if (serializer == null) {
198230
throw new IllegalStateException(
199-
"No serializer found for '" + SERIALIZATION_SELECTOR + "' header with value '" + selector + "'");
231+
"No serializer found for '" + selectorKey + "' header with value '" + selector + "'");
200232
}
201233
return serializer.serialize(topic, headers, data);
202234
}
203235

236+
private String selectorKey() {
237+
return this.forKeys ? KEY_SERIALIZATION_SELECTOR : VALUE_SERIALIZATION_SELECTOR;
238+
}
239+
204240
/*
205241
* Package for testing.
206242
*/

0 commit comments

Comments
 (0)