Skip to content

Commit 020dfc9

Browse files
garyrussellartembilan
authored andcommitted
GH-1137: Fix Json Type Headers for KStream
Fixes #1137 - use `lastHeader()` to ensure we retrieve the latest header - clear the headers when serializing, if already present - also expose the existing deser `useTypeHeaders` Kafka property as a property directly **cherry-pick to 2.2.x**
1 parent 54a8588 commit 020dfc9

File tree

4 files changed

+75
-10
lines changed

4 files changed

+75
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/AbstractJavaTypeMapper.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.nio.charset.StandardCharsets;
2020
import java.util.Collections;
2121
import java.util.HashMap;
22-
import java.util.Iterator;
2322
import java.util.Map;
2423

2524
import org.apache.kafka.common.header.Header;
@@ -157,12 +156,11 @@ protected String retrieveHeader(Headers headers, String headerName) {
157156
}
158157

159158
protected String retrieveHeaderAsString(Headers headers, String headerName) {
160-
Iterator<Header> headerValues = headers.headers(headerName).iterator();
161-
if (headerValues.hasNext()) {
162-
Header headerValue = headerValues.next();
159+
Header header = headers.lastHeader(headerName);
160+
if (header != null) {
163161
String classId = null;
164-
if (headerValue.value() != null) {
165-
classId = new String(headerValue.value(), StandardCharsets.UTF_8);
162+
if (header.value() != null) {
163+
classId = new String(header.value(), StandardCharsets.UTF_8);
166164
}
167165
return classId;
168166
}

spring-kafka/src/main/java/org/springframework/kafka/support/converter/DefaultJackson2JavaTypeMapper.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,12 @@ private boolean isTrustedPackage(String requestedType) {
160160

161161
@Override
162162
public void fromJavaType(JavaType javaType, Headers headers) {
163-
addHeader(headers, getClassIdFieldName(), javaType.getRawClass());
163+
String classIdFieldName = getClassIdFieldName();
164+
if (headers.lastHeader(classIdFieldName) != null) {
165+
removeHeaders(headers);
166+
}
167+
168+
addHeader(headers, classIdFieldName, javaType.getRawClass());
164169

165170
if (javaType.isContainerType() && !javaType.isArrayType()) {
166171
addHeader(headers, getContentClassIdFieldName(), javaType.getContentType().getRawClass());

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.util.Arrays;
21+
import java.util.Collections;
2122
import java.util.Map;
2223
import java.util.function.Consumer;
2324

@@ -119,6 +120,8 @@ public class JsonDeserializer<T> implements ExtendedDeserializer<T> {
119120

120121
private boolean removeTypeHeaders = true;
121122

123+
private boolean useTypeHeaders = true;
124+
122125
/**
123126
* Construct an instance with a default {@link ObjectMapper}.
124127
*/
@@ -239,6 +242,21 @@ public void setRemoveTypeHeaders(boolean removeTypeHeaders) {
239242
this.removeTypeHeaders = removeTypeHeaders;
240243
}
241244

245+
/**
246+
* Set to false to ignore type information in headers and use the configured
247+
* target type instead.
248+
* Only applies if the preconfigured type mapper is used.
249+
* Default true.
250+
* @param useTypeHeaders false to ignore type headers.
251+
* @since 2.2.8
252+
*/
253+
public void setUseTypeHeaders(boolean useTypeHeaders) {
254+
if (!this.typeMapperExplicitlySet) {
255+
this.useTypeHeaders = useTypeHeaders;
256+
setUpTypePrecedence(Collections.emptyMap());
257+
}
258+
}
259+
242260
@Override
243261
public void configure(Map<String, ?> configs, boolean isKey) {
244262
setUseTypeMapperForKey(isKey);
@@ -261,11 +279,10 @@ public void configure(Map<String, ?> configs, boolean isKey) {
261279

262280
private void setUpTypePrecedence(Map<String, ?> configs) {
263281
if (!this.typeMapperExplicitlySet) {
264-
boolean useTypeHeaders = true;
265282
if (configs.containsKey(USE_TYPE_INFO_HEADERS)) {
266-
useTypeHeaders = Boolean.parseBoolean(configs.get(USE_TYPE_INFO_HEADERS).toString());
283+
this.useTypeHeaders = Boolean.parseBoolean(configs.get(USE_TYPE_INFO_HEADERS).toString());
267284
}
268-
this.typeMapper.setTypePrecedence(useTypeHeaders ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
285+
this.typeMapper.setTypePrecedence(this.useTypeHeaders ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
269286
}
270287
}
271288

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/JsonSerializationTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,9 @@ public void testDeserTypeHeadersConfig() {
195195
assertThat(KafkaTestUtils.getPropertyValue(this.jsonReader, "typeMapper.typePrecedence"))
196196
.isEqualTo(TypePrecedence.TYPE_ID);
197197
this.jsonReader.configure(Collections.singletonMap(JsonDeserializer.USE_TYPE_INFO_HEADERS, false), false);
198+
assertThat(KafkaTestUtils.getPropertyValue(this.jsonReader, "typeMapper.typePrecedence"))
199+
.isEqualTo(TypePrecedence.INFERRED);
200+
this.jsonReader.setUseTypeHeaders(true);
198201
this.jsonReader.configure(Collections.emptyMap(), false);
199202
assertThat(KafkaTestUtils.getPropertyValue(this.jsonReader, "typeMapper.typePrecedence"))
200203
.isEqualTo(TypePrecedence.TYPE_ID);
@@ -212,6 +215,36 @@ public void testDeserializerTypeInference() {
212215
assertThat(de.deserialize(topic, ser.serialize(topic, dummy))).isEqualTo(dummy);
213216
}
214217

218+
@Test
219+
public void testPreExistingHeaders() {
220+
JsonSerializer<? super Foo> ser = new JsonSerializer<>();
221+
Headers headers = new RecordHeaders();
222+
ser.serialize("", headers, new Foo());
223+
byte[] data = ser.serialize("", headers, new Bar());
224+
JsonDeserializer<? super Foo> deser = new JsonDeserializer<>();
225+
deser.setRemoveTypeHeaders(false);
226+
deser.addTrustedPackages(this.getClass().getPackage().getName());
227+
assertThat(deser.deserialize("", headers, data)).isInstanceOf(Bar.class);
228+
assertThat(headers.headers(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME)).hasSize(1);
229+
ser.close();
230+
deser.close();
231+
}
232+
233+
@Test
234+
public void testDontUseTypeHeaders() {
235+
JsonSerializer<? super Foo> ser = new JsonSerializer<>();
236+
Headers headers = new RecordHeaders();
237+
byte[] data = ser.serialize("", headers, new Bar());
238+
JsonDeserializer<? super Foo> deser = new JsonDeserializer<>(Foo.class);
239+
deser.setRemoveTypeHeaders(false);
240+
deser.setUseTypeHeaders(false);
241+
deser.addTrustedPackages(this.getClass().getPackage().getName());
242+
assertThat(deser.deserialize("", headers, data)).isExactlyInstanceOf(Foo.class);
243+
assertThat(headers.headers(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME)).hasSize(1);
244+
ser.close();
245+
deser.close();
246+
}
247+
215248
static class DummyEntityJsonDeserializer extends JsonDeserializer<DummyEntity> {
216249

217250
}
@@ -220,4 +253,16 @@ static class DummyEntityArrayJsonDeserializer extends JsonDeserializer<DummyEnti
220253

221254
}
222255

256+
public static class Foo {
257+
258+
public String foo = "foo";
259+
260+
}
261+
262+
public static class Bar extends Foo {
263+
264+
public String bar = "bar";
265+
266+
}
267+
223268
}

0 commit comments

Comments
 (0)