1717import org .apache .kafka .connect .header .ConnectHeaders ;
1818import org .apache .kafka .connect .header .Headers ;
1919import org .apache .kafka .connect .runtime .ConnectorConfig ;
20- import org .apache .kafka .connect .runtime .errors .DeadLetterQueueReporter ;
2120import org .apache .kafka .connect .runtime .errors .ToleranceType ;
2221import org .apache .kafka .connect .source .SourceRecord ;
2322import org .slf4j .Logger ;
@@ -35,12 +34,6 @@ public class ErrorHandler {
3534
3635 public static final String HEADER_PREFIX = "__connect.errors." ;
3736 public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic" ;
38- public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition" ;
39- public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset" ;
40- public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name" ;
41- public static final String ERROR_HEADER_TASK_ID = HEADER_PREFIX + "task.id" ;
42- public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage" ;
43- public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name" ;
4437 public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name" ;
4538 public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message" ;
4639 public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace" ;
@@ -113,7 +106,6 @@ private void initializeErrorTolerance(final Map<String, String> props) {
113106 dlqTopic = props .get (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG );
114107 if (dlqTopic != null && !dlqTopic .isEmpty ()) {
115108 dlqTopic = dlqTopic .trim ();
116- // TODO: Check if DLQ topic exists
117109 }
118110
119111 queueName = props .get (MQSourceConnector .CONFIG_NAME_MQ_QUEUE );
@@ -282,9 +274,9 @@ private Headers createErrorHeaders(final Message message, final String originalT
282274 }
283275
284276 // Basic error information
285- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_ORIG_TOPIC , originalTopic );
286- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXECUTING_CLASS , exception .getClass ().getName ());
287- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXCEPTION_MESSAGE , exception .getMessage ());
277+ headers .addString (ERROR_HEADER_ORIG_TOPIC , originalTopic );
278+ headers .addString (ERROR_HEADER_EXCEPTION , exception .getClass ().getName ());
279+ headers .addString (ERROR_HEADER_EXCEPTION_MESSAGE , exception .getMessage ());
288280
289281 try {
290282 headers .addString (ERROR_HEADER_JMS_MESSAGE_ID , message .getJMSMessageID ());
@@ -298,14 +290,14 @@ private Headers createErrorHeaders(final Message message, final String originalT
298290
299291 // Add cause if available
300292 if (exception .getCause () != null ) {
301- headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE , exception .getCause ().getMessage ());
302293 headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_CLASS , exception .getCause ().getClass ().getName ());
294+ headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE , exception .getCause ().getMessage ());
303295 }
304296
305297 // Add first few lines of stack trace (full stack trace might be too large)
306298 final String stackTrace = getStackTrace (exception );
307299 if (stackTrace != null ) {
308- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXCEPTION_STACK_TRACE , stackTrace );
300+ headers .addString (ERROR_HEADER_EXCEPTION_STACK_TRACE , stackTrace );
309301 }
310302
311303 return headers ;
0 commit comments