Skip to content

Commit d9870e9

Browse files
committed
ConversionException Improvements
- add consumer record or `Message<?>` if available.
1 parent 137b8a1 commit d9870e9

File tree

6 files changed

+72
-11
lines changed

6 files changed

+72
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,7 +47,7 @@ protected Object convertPayload(Message<?> message) {
4747
return getObjectMapper().writeValueAsBytes(message.getPayload());
4848
}
4949
catch (JsonProcessingException e) {
50-
throw new ConversionException("Failed to convert to JSON", e);
50+
throw new ConversionException("Failed to convert to JSON", message, e);
5151
}
5252
}
5353

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,7 +49,7 @@ protected Object convertPayload(Message<?> message) {
4949
return Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload()));
5050
}
5151
catch (JsonProcessingException e) {
52-
throw new ConversionException("Failed to convert to JSON", e);
52+
throw new ConversionException("Failed to convert to JSON", message, e);
5353
}
5454
}
5555

spring-kafka/src/main/java/org/springframework/kafka/support/converter/ConversionException.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,11 @@
1616

1717
package org.springframework.kafka.support.converter;
1818

19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
1921
import org.springframework.kafka.KafkaException;
22+
import org.springframework.lang.Nullable;
23+
import org.springframework.messaging.Message;
2024

2125
/**
2226
* Exception for conversions.
@@ -27,8 +31,65 @@
2731
@SuppressWarnings("serial")
2832
public class ConversionException extends KafkaException {
2933

34+
private final ConsumerRecord<?, ?> record;
35+
36+
private final Message<?> message;
37+
38+
/**
39+
* Construct an instance with the provided properties.
40+
* @param message A text message describing the reason.
41+
* @param cause the cause.
42+
*/
3043
public ConversionException(String message, Throwable cause) {
3144
super(message, cause);
45+
this.record = null;
46+
this.message = null;
47+
}
48+
49+
/**
50+
* Construct an instance with the provided properties.
51+
* @param message A text message describing the reason.
52+
* @param record the consumer record.
53+
* @param cause the cause.
54+
* @since 2.7.2
55+
*/
56+
public ConversionException(String message, ConsumerRecord<?, ?> record, Throwable cause) {
57+
super(message, cause);
58+
this.record = record;
59+
this.message = null;
60+
}
61+
62+
/**
63+
* Construct an instance with the provided properties.
64+
* @param message A text message describing the reason.
65+
* @param msg a {@link Message} converted from a consumer record.
66+
* @param cause the cause.
67+
* @since 2.7.2
68+
*/
69+
public ConversionException(String message, Message<?> msg, Throwable cause) {
70+
super(message, cause);
71+
this.record = null;
72+
this.message = msg;
73+
}
74+
75+
/**
76+
* Return the consumer record, if available.
77+
* @return the record.
78+
* @since 2.7.2
79+
*/
80+
@Nullable
81+
public ConsumerRecord<?, ?> getRecord() {
82+
return this.record;
83+
}
84+
85+
/**
86+
* Return the {@link Message}, if available.
87+
* @return the message.
88+
* @since 2.7.2
89+
*/
90+
@Nullable
91+
public Message<?> getMsg() {
92+
return this.message;
3293
}
3394

3495
}

spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,15 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
111111
return this.objectMapper.readValue((String) value, javaType);
112112
}
113113
catch (IOException e) {
114-
throw new ConversionException("Failed to convert from JSON", e);
114+
throw new ConversionException("Failed to convert from JSON", record, e);
115115
}
116116
}
117117
else if (value instanceof byte[]) {
118118
try {
119119
return this.objectMapper.readValue((byte[]) value, javaType);
120120
}
121121
catch (IOException e) {
122-
throw new ConversionException("Failed to convert from JSON", e);
122+
throw new ConversionException("Failed to convert from JSON", record, e);
123123
}
124124
}
125125
else {

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MappingJacksonParameterizedConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,15 @@ protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @
9393
return getObjectMapper().readValue((String) value, javaType);
9494
}
9595
catch (IOException e) {
96-
throw new ConversionException("Failed to convert from JSON", e);
96+
throw new ConversionException("Failed to convert from JSON", message, e);
9797
}
9898
}
9999
else if (value instanceof byte[]) {
100100
try {
101101
return getObjectMapper().readValue((byte[]) value, javaType);
102102
}
103103
catch (IOException e) {
104-
throw new ConversionException("Failed to convert from JSON", e);
104+
throw new ConversionException("Failed to convert from JSON", message, e);
105105
}
106106
}
107107
else {

spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@ protected Object convertPayload(Message<?> message) {
4848
.writeValueAsString(message.getPayload());
4949
}
5050
catch (JsonProcessingException e) {
51-
throw new ConversionException("Failed to convert to JSON", e);
51+
throw new ConversionException("Failed to convert to JSON", message, e);
5252
}
5353
}
5454

0 commit comments

Comments
 (0)