1515 */
1616package com .ibm .eventstreams .connect .mqsource .builders ;
1717
18- import static java .nio .charset .StandardCharsets .UTF_8 ;
19-
20- import java .io .PrintWriter ;
21- import java .io .StringWriter ;
22- import java .util .Locale ;
2318import java .util .Map ;
2419import java .util .Optional ;
2520
26- import javax .jms .BytesMessage ;
2721import javax .jms .JMSContext ;
2822import javax .jms .JMSException ;
2923import javax .jms .Message ;
30- import javax .jms .TextMessage ;
3124
3225import org .apache .kafka .connect .data .Schema ;
3326import org .apache .kafka .connect .data .SchemaAndValue ;
34- import org .apache .kafka .connect .header .ConnectHeaders ;
35- import org .apache .kafka .connect .header .Headers ;
36- import org .apache .kafka .connect .runtime .ConnectorConfig ;
37- import org .apache .kafka .connect .runtime .errors .DeadLetterQueueReporter ;
38- import org .apache .kafka .connect .runtime .errors .ToleranceType ;
3927import org .apache .kafka .connect .source .SourceRecord ;
4028import org .slf4j .Logger ;
4129import org .slf4j .LoggerFactory ;
4230
4331import com .ibm .eventstreams .connect .mqsource .MQSourceConnector ;
4432import com .ibm .eventstreams .connect .mqsource .processor .JmsToKafkaHeaderConverter ;
33+ import com .ibm .eventstreams .connect .mqsource .util .ErrorHandler ;
4534
4635/**
4736 * Builds Kafka Connect SourceRecords from messages.
@@ -57,18 +46,7 @@ public enum KeyHeader {
5746
5847 private boolean copyJmsPropertiesFlag = Boolean .FALSE ;
5948 private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter ;
60- private boolean tolerateErrors ;
61- private boolean logErrors ;
62- private boolean logIncludeMessages ;
63- private String dlqTopic = "" ;
64- private String queueName = "" ;
65-
66- public static final String ERROR_HEADER_EXCEPTION_TIMESTAMP = DeadLetterQueueReporter .HEADER_PREFIX + "timestamp" ;
67- public static final String ERROR_HEADER_EXCEPTION_CAUSE_CLASS = DeadLetterQueueReporter .HEADER_PREFIX + "cause.class" ;
68- public static final String ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE = DeadLetterQueueReporter .HEADER_PREFIX + "cause.message" ;
69- public static final String ERROR_HEADER_JMS_MESSAGE_ID = DeadLetterQueueReporter .HEADER_PREFIX + "jms.message.id" ;
70- public static final String ERROR_HEADER_JMS_TIMESTAMP = DeadLetterQueueReporter .HEADER_PREFIX + "jms.timestamp" ;
71- public static final String ERROR_HEADER_QUEUE = DeadLetterQueueReporter .HEADER_PREFIX + "mq.queue" ;
49+ private ErrorHandler errorHandler = new ErrorHandler ();
7250
7351 /**
7452 * Configure this class.
@@ -82,8 +60,17 @@ public void configure(final Map<String, String> props) {
8260 log .trace ("[{}] Entry {}.configure, props={}" , Thread .currentThread ().getId (), this .getClass ().getName (),
8361 props );
8462
85- initializeErrorTolerance (props );
63+ configureKeyHeader (props );
64+ configureJmsProperties (props );
65+ configureErrorHandler (props );
66+
67+ log .trace ("[{}] Exit {}.configure" , Thread .currentThread ().getId (), this .getClass ().getName ());
68+ }
8669
70+ /**
71+ * Configure key header settings.
72+ */
73+ private void configureKeyHeader (final Map <String , String > props ) {
8774 final String kh = props .get (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER );
8875 if (kh != null ) {
8976 if (kh .equals (MQSourceConnector .CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID )) {
@@ -103,51 +90,22 @@ public void configure(final Map<String, String> props) {
10390 throw new RecordBuilderException ("Unsupported MQ record builder key header value" );
10491 }
10592 }
93+ }
10694
95+ /**
96+ * Configure JMS properties settings.
97+ */
98+ private void configureJmsProperties (final Map <String , String > props ) {
10799 final String str = props .get (MQSourceConnector .CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER );
108100 copyJmsPropertiesFlag = Boolean .parseBoolean (Optional .ofNullable (str ).orElse ("false" ));
109101 jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter ();
110-
111- log .trace ("[{}] Exit {}.configure" , Thread .currentThread ().getId (), this .getClass ().getName ());
112102 }
113103
114104 /**
115- * Initializes error tolerance configuration by reading directly from properties
116- * map
117- * instead of using AbstractConfig
105+ * Configure error handler.
118106 */
119- private void initializeErrorTolerance (final Map <String , String > props ) {
120- // Read tolerateErrors directly from props
121- final String errorToleranceValue = props .getOrDefault (
122- ConnectorConfig .ERRORS_TOLERANCE_CONFIG ,
123- ToleranceType .NONE .toString ()).toUpperCase (Locale .ROOT );
124-
125- tolerateErrors = ToleranceType .valueOf (errorToleranceValue ).equals (ToleranceType .ALL );
126-
127- // Read logErrors directly from props
128- if (tolerateErrors ) {
129- final String logErrorsValue = props .getOrDefault (
130- ConnectorConfig .ERRORS_LOG_ENABLE_CONFIG ,
131- String .valueOf (ConnectorConfig .ERRORS_LOG_ENABLE_DEFAULT ));
132- logErrors = Boolean .parseBoolean (logErrorsValue );
133- final String logIncludeMessagesValue = props .getOrDefault (
134- ConnectorConfig .ERRORS_LOG_INCLUDE_MESSAGES_CONFIG ,
135- String .valueOf (ConnectorConfig .ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT ));
136- logIncludeMessages = Boolean .parseBoolean (logIncludeMessagesValue );
137-
138- dlqTopic = props .get (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG );
139- if (dlqTopic != null && !dlqTopic .isEmpty ()) {
140- dlqTopic = dlqTopic .trim ();
141- }
142-
143- queueName = props .get (MQSourceConnector .CONFIG_NAME_MQ_QUEUE );
144- if (queueName != null && !queueName .isEmpty ()) {
145- queueName = queueName .trim ();
146- }
147- } else {
148- logErrors = false ;
149- logIncludeMessages = false ;
150- }
107+ public void configureErrorHandler (final Map <String , String > props ) {
108+ errorHandler .configure (props , copyJmsPropertiesFlag , jmsToKafkaHeaderConverter );
151109 }
152110
153111 /**
@@ -271,220 +229,16 @@ public SourceRecord toSourceRecord(final JMSContext context, final String topic,
271229 value .value ());
272230 }
273231 } catch (final Exception e ) {
274- // Log the error
275- logError (e , topic , message );
232+ // Log the error using error handler
233+ errorHandler . logError (e , topic , message );
276234
277235 // If errors are not tolerated, rethrow
278- if (!tolerateErrors ) {
236+ if (!errorHandler . shouldTolerateErrors () ) {
279237 throw e ;
280238 }
281239
282240 // Handle the error based on configured error tolerance
283- return handleBuildException (message , sourceQueuePartition , sourceOffset , topic , key , e );
284- }
285- }
286-
287- /**
288- * Logs error based on `errors.log.enable` and `errors.log.include.messages` configurations.
289- *
290- * @param exception The exception that needs to be logged.
291- * @param topic The Kafka topic associated with the message.
292- * @param message The JMS message that caused the error.
293- */
294- private void logError (final Exception exception , final String topic , final Message message ) {
295- if (logErrors ) {
296- if (logIncludeMessages ) {
297- log .error ("Failed to process message on topic '{}'. Message content: {}. \n Exception: {}" ,
298- topic , message , exception .toString (), exception );
299- } else {
300- log .error ("Failed to process message on topic '{}'. \n Exception: {}" , topic , exception .toString (), exception );
301- }
302- } else {
303- log .warn ("Error during message processing on topic '{}', but logging is suppressed. \n Reason: {}" ,
304- topic , extractReason (exception ));
305- }
306- }
307-
308- private String extractReason (final Exception exception ) {
309- if (exception == null ) {
310- return "Unknown error" ;
311- }
312-
313- final String message = exception .getMessage ();
314- if (message == null || message .trim ().isEmpty ()) {
315- return "Unknown error" ;
316- }
317-
318- // Clean up trailing punctuation or whitespace (e.g., "error:" → "error")
319- return message .replaceAll ("[:\\ s]+$" , "" );
320- }
321-
322-
323- /**
324- *
325- * Handles conversion errors based on configuration
326- *
327- * @param message The actual MQ message
328- * @param sourceQueuePartition The Source Record queue partition
329- * @param sourceOffset The Source Record offset
330- * @param originalTopic The original topic name
331- * @param key The SchemaAndValue to include in the source
332- * record key
333- * @param exception The exception that needs to be stored in the
334- * header
335- * @return SourceRecord
336- */
337- private SourceRecord handleBuildException (final Message message , final Map <String , String > sourceQueuePartition ,
338- final Map <String , Long > sourceOffset , final String topic , final SchemaAndValue key ,
339- final Exception exception ) {
340-
341- // If errors are tolerated but no DLQ is configured, skip the message
342- if (dlqTopic == null ) {
343- log .debug (
344- "Skipping message due to conversion error: error tolerance is enabled but DLQ is not configured. Message will not be processed further." );
345- return null ;
346- }
347-
348- // Create DLQ record
349- return createDlqRecord (message , sourceQueuePartition , sourceOffset , topic , key , exception );
350- }
351-
352- /**
353- *
354- * Creates a DLQ record with error information
355- *
356- * @param message The actual MQ message
357- * @param sourceQueuePartition The Source Record queue partition
358- * @param sourceOffset The Source Record offset
359- * @param originalTopic The original topic name
360- * @param key The SchemaAndValue to include in the source
361- * record key
362- * @param exception The exception that needs to be stored in the
363- * header
364- * @return SourceRecord
365- */
366- private SourceRecord createDlqRecord (final Message message , final Map <String , String > sourceQueuePartition ,
367- final Map <String , Long > sourceOffset , final String originalTopic ,
368- final SchemaAndValue key , final Exception exception ) {
369-
370- try {
371- // Extract payload or return null if extraction fails
372- final Optional <byte []> maybePayload = extractPayload (message );
373- if (!maybePayload .isPresent ()) {
374- log .error ("Skipping message due to payload extraction failure" );
375- return null ;
376- }
377-
378- final byte [] payload = maybePayload .get ();
379-
380- // Create headers with error information
381- final Headers headers = createErrorHeaders (message , originalTopic , exception );
382-
383- return new SourceRecord (
384- sourceQueuePartition ,
385- sourceOffset ,
386- dlqTopic ,
387- null ,
388- key .schema (),
389- key .value (),
390- Schema .OPTIONAL_BYTES_SCHEMA ,
391- payload ,
392- message .getJMSTimestamp (),
393- headers );
394- } catch (final Exception dlqException ) {
395- // If DLQ processing itself fails, log and skip
396- log .error ("Failed to create DLQ record: {}" , dlqException .getMessage (), dlqException );
397- return null ;
398- }
399- }
400-
401- /**
402- *
403- * Extracts payload from a JMS message with improved error handling
404- *
405- * @param message The actual message coming from mq
406- *
407- * @return Optional<byte[]>
408- */
409- private Optional <byte []> extractPayload (final Message message ) {
410- try {
411- if (message instanceof BytesMessage ) {
412- log .debug ("Extracting payload from BytesMessage for DLQ" );
413- return Optional .ofNullable (message .getBody (byte [].class ));
414- } else if (message instanceof TextMessage ) {
415- log .debug ("Extracting payload from TextMessage for DLQ" );
416- final String text = message .getBody (String .class );
417- return Optional .ofNullable (text != null ? text .getBytes (UTF_8 ) : null );
418- } else {
419- log .warn ("Unsupported JMS message type '{}' encountered while extracting payload for DLQ. Falling back to message.toString()." ,
420- message .getClass ().getName ());
421- return Optional .ofNullable (message .toString ().getBytes (UTF_8 ));
422- }
423- } catch (final JMSException e ) {
424- log .error ("JMSException while extracting payload from message type '{}': {} for DLQ. Falling back to message.toString()." ,
425- message .getClass ().getName (), e .getMessage (), e );
426- return Optional .ofNullable (message .toString ().getBytes (UTF_8 ));
427- }
428- }
429-
430-
431- /**
432- *
433- * Creates enhanced headers with error information for DLQ records
434- * @param message The orginal message
435- *
436- * @param originalTopic The original topic name
437- * @param exception The execption that needs to be included in the header
438- *
439- * @return Headers
440- */
441- private Headers createErrorHeaders (final Message message , final String originalTopic , final Exception exception ) {
442- Headers headers = new ConnectHeaders ();
443- if (copyJmsPropertiesFlag ) {
444- headers = jmsToKafkaHeaderConverter .convertJmsPropertiesToKafkaHeaders (message );
445- }
446-
447- // Basic error information
448- headers .addString (DeadLetterQueueReporter .ERROR_HEADER_ORIG_TOPIC , originalTopic );
449- headers .addString (DeadLetterQueueReporter .ERROR_HEADER_EXECUTING_CLASS , exception .getClass ().getName ());
450- headers .addString (DeadLetterQueueReporter .ERROR_HEADER_EXCEPTION_MESSAGE , exception .getMessage ());
451-
452- try {
453- headers .addString (ERROR_HEADER_JMS_MESSAGE_ID , message .getJMSMessageID ());
454- headers .addLong (ERROR_HEADER_JMS_TIMESTAMP , message .getJMSTimestamp ());
455- } catch (final JMSException jmsException ) {
456- jmsException .printStackTrace ();
457- }
458-
459- headers .addString (ERROR_HEADER_QUEUE , queueName );
460- headers .addLong (ERROR_HEADER_EXCEPTION_TIMESTAMP , System .currentTimeMillis ());
461-
462- // Add cause if available
463- if (exception .getCause () != null ) {
464- headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE , exception .getCause ().getMessage ());
465- headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_CLASS , exception .getCause ().getClass ().getName ());
466- }
467-
468- // Add first few lines of stack trace (full stack trace might be too large)
469- headers .addString (DeadLetterQueueReporter .ERROR_HEADER_EXCEPTION_STACK_TRACE , stacktrace (exception ));
470-
471- return headers ;
472- }
473-
474- private String stacktrace (final Exception exception ) {
475- try {
476- final StringWriter sw = new StringWriter ();
477- final PrintWriter pw = new PrintWriter (sw );
478- exception .printStackTrace (pw );
479- final String stackTrace = sw .toString ();
480-
481- // First 500 characters or less to avoid overly large headers
482- final String truncatedStackTrace = stackTrace .length () <= 500 ? stackTrace
483- : stackTrace .substring (0 , 500 ) + "... [truncated]" ;
484- return truncatedStackTrace ;
485- } catch (final Exception e ) {
486- log .warn ("Could not add stack trace to DLQ headers" , e );
241+ return errorHandler .handleBuildException (message , sourceQueuePartition , sourceOffset , topic , key , e );
487242 }
488- return null ;
489243 }
490244}
0 commit comments