@@ -763,7 +763,7 @@ public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll()
763763 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
764764 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
765765 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
766- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
766+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
767767 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
768768 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
769769 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -808,7 +808,7 @@ public void shouldFailWhenErrorToleranceIsNone() throws Exception {
808808 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
809809 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
810810 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "none" );
811- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
811+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
812812 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
813813 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
814814 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -835,16 +835,18 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
835835 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
836836 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
837837 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
838- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
838+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
839839 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
840840 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
841841 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
842842
843843 connectTask .start (connectorConfigProps );
844844
845845 // An invalid message is received
846+ final TextMessage message = getJmsContext ().createTextMessage ("Invalid JSON message" );
847+ message .setJMSMessageID ("message_id" );
846848 putAllMessagesToQueue (DEFAULT_SOURCE_QUEUE ,
847- Collections .singletonList (getJmsContext (). createTextMessage ( "Invalid JSON message" ) ));
849+ Collections .singletonList (message ));
848850
849851 // The message should be routed to DLQ with error headers
850852 final List <SourceRecord > processedRecords = connectTask .poll ();
@@ -870,7 +872,9 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
870872 .isEqualTo ("org.apache.kafka.common.errors.SerializationException" );
871873 assertThat (headers .lastWithName ("__connect.errors.exception.stacktrace" ).value ()
872874 .toString ().contains ("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord" )).isTrue ();
873-
875+ assertEquals (headers .lastWithName ("__connect.errors.jms.message.id" ).value (), message .getJMSMessageID ());
876+ assertEquals (headers .lastWithName ("__connect.errors.jms.timestamp" ).value (), message .getJMSTimestamp ());
877+ assertEquals (headers .lastWithName ("__connect.errors.mq.queue" ).value (), DEFAULT_SOURCE_QUEUE );
874878 connectTask .commitRecord (dlqRecord );
875879 }
876880
@@ -881,7 +885,7 @@ public void shouldHandleDifferentMessageTypesToDlq() throws Exception {
881885 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
882886 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
883887 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
884- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
888+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
885889 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
886890 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
887891 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -916,7 +920,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
916920 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
917921 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
918922 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
919- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
923+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
920924 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
921925 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER , "true" );
922926 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
@@ -947,7 +951,7 @@ public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throw
947951 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
948952 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
949953 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
950- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
954+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
951955 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
952956 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
953957 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -992,7 +996,7 @@ public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception
992996 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
993997 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
994998 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
995- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
999+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
9961000 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
9971001 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
9981002 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -1026,7 +1030,7 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
10261030 final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
10271031 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
10281032 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
1029- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
1033+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
10301034 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
10311035 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
10321036 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -1115,7 +1119,7 @@ public void verifyLoggingWarningWithErrorTolerance() throws Exception {
11151119 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_ENABLE_CONFIG , "false" ); // default; Do not log errors
11161120 // default; Do not log errors with message
11171121 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_INCLUDE_MESSAGES_CONFIG , "false" );
1118- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
1122+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
11191123 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
11201124 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
11211125 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -1150,7 +1154,7 @@ public void verifyLoggingErrorsWithErrorTolerance() throws Exception {
11501154 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
11511155 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_ENABLE_CONFIG , "true" ); // Log errors enabled
11521156 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_INCLUDE_MESSAGES_CONFIG , "false" );
1153- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
1157+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
11541158 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
11551159 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
11561160 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -1185,7 +1189,7 @@ public void verifyLoggingErrorsWithMessage() throws Exception {
11851189 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
11861190 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_ENABLE_CONFIG , "true" ); // Log errors
11871191 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_INCLUDE_MESSAGES_CONFIG , "true" ); // Log errors with message
1188- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
1192+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
11891193 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
11901194 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
11911195 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -1220,7 +1224,7 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
12201224 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
12211225 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_ENABLE_CONFIG , "true" ); // Log errors
12221226 connectorConfigProps .put (ConnectorConfig .ERRORS_LOG_INCLUDE_MESSAGES_CONFIG , "true" ); // Log errors with message
1223- connectorConfigProps .put (MQSourceConnector .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
1227+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
12241228 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
12251229 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
12261230 "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
0 commit comments