Skip to content

Commit 3a87c5e

Browse files
authored
GH-2000: Doc RetryTopic Publish with Deser. Errors
Resolves #2000 Backport `DelegatingByTypeSerializer`. This is for 2.7.x; cherry pick forward to main. * Doc polishing.
1 parent 3da65a8 commit 3a87c5e

File tree

3 files changed

+129
-0
lines changed

3 files changed

+129
-0
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4444,6 +4444,27 @@ consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.cl
44444444
----
44454445
====
44464446

4447+
IMPORTANT: If the consumer is configured with an `ErrorHandlingDeserializer` it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions.
4448+
The generic value type of the template should be `Object`.
4449+
One technique is to use the `DelegatingByTypeSerializer`; an example follows:
4450+
4451+
====
4452+
[source, java]
4453+
----
4454+
@Bean
4455+
public ProducerFactory<String, Object> producerFactory() {
4456+
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
4457+
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
4458+
MyNormalObject.class, new JsonSerializer<Object>())));
4459+
}
4460+
4461+
@Bean
4462+
public KafkaTemplate<String, Object> kafkaTemplate() {
4463+
return new KafkaTemplate<>(producerFactory());
4464+
}
4465+
----
4466+
====
4467+
44474468
[[payload-conversion-with-batch]]
44484469
===== Payload Conversion with Batch Listeners
44494470

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,27 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPo
127127

128128
NOTE: The retry topics' and dlt's consumers will be assigned to a consumer group with a group id that is the combination of the one with you provide in the `groupId` parameter of the `@KafkaListener` annotation with the topic's suffix. If you don't provide any they'll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
129129

130+
IMPORTANT: If the consumer is configured with an <<error-handling-deserializer,`ErrorHandlingDeserializer`>>, to handle deserilialization exceptions, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions.
131+
The generic value type of the template should be `Object`.
132+
One technique is to use the `DelegatingByTypeSerializer`; an example follows:
133+
134+
====
135+
[source, java]
136+
----
137+
@Bean
138+
public ProducerFactory<String, Object> producerFactory() {
139+
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
140+
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
141+
MyNormalObject.class, new JsonSerializer<Object>())));
142+
}
143+
144+
@Bean
145+
public KafkaTemplate<String, Object> kafkaTemplate() {
146+
return new KafkaTemplate<>(producerFactory());
147+
}
148+
----
149+
====
150+
130151
==== Features
131152

132153
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.stream.Collectors;
22+
23+
import org.apache.kafka.common.errors.SerializationException;
24+
import org.apache.kafka.common.header.Headers;
25+
import org.apache.kafka.common.serialization.Serializer;
26+
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* Delegates to a serializer based on type.
31+
*
32+
* @author Gary Russell
33+
* @since 2.7.9
34+
*
35+
*/
36+
public class DelegatingByTypeSerializer implements Serializer<Object> {
37+
38+
private static final String RAWTYPES = "rawtypes";
39+
40+
@SuppressWarnings(RAWTYPES)
41+
private final Map<Class<?>, Serializer> delegates = new HashMap<>();
42+
43+
/**
44+
* Construct an instance with the map of delegates.
45+
* @param delegates the delegates.
46+
*/
47+
@SuppressWarnings(RAWTYPES)
48+
public DelegatingByTypeSerializer(Map<Class<?>, Serializer> delegates) {
49+
Assert.notNull(delegates, "'delegates' cannot be null");
50+
Assert.noNullElements(delegates.values(), "Serializers in delegates map cannot be null");
51+
this.delegates.putAll(delegates);
52+
}
53+
54+
@SuppressWarnings("unchecked")
55+
@Override
56+
public void configure(Map<String, ?> configs, boolean isKey) {
57+
this.delegates.values().forEach(del -> del.configure(configs, isKey));
58+
}
59+
60+
@SuppressWarnings({ RAWTYPES, "unchecked" })
61+
@Override
62+
public byte[] serialize(String topic, Object data) {
63+
Serializer delegate = findDelegate(data);
64+
return delegate.serialize(topic, data);
65+
}
66+
67+
@SuppressWarnings({ "unchecked", RAWTYPES })
68+
@Override
69+
public byte[] serialize(String topic, Headers headers, Object data) {
70+
Serializer delegate = findDelegate(data);
71+
return delegate.serialize(topic, headers, data);
72+
}
73+
74+
@SuppressWarnings(RAWTYPES)
75+
private Serializer findDelegate(Object data) {
76+
Serializer delegate = this.delegates.get(data.getClass());
77+
if (delegate == null) {
78+
throw new SerializationException("No matching delegate for type: " + data.getClass().getName()
79+
+ "; supported types: " + this.delegates.keySet().stream()
80+
.map(clazz -> clazz.getName())
81+
.collect(Collectors.toList()));
82+
}
83+
return delegate;
84+
}
85+
86+
87+
}

0 commit comments

Comments
 (0)