@@ -836,6 +836,7 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
836836 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
837837 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
838838 connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
839+ connectorConfigProps .put (MQSourceConnector .DLQ_CONTEXT_HEADERS_ENABLE_CONFIG , "true" );
839840 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
840841 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
841842 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -921,6 +922,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
921922 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
922923 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
923924 connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
925+ connectorConfigProps .put (MQSourceConnector .DLQ_CONTEXT_HEADERS_ENABLE_CONFIG , "true" );
924926 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
925927 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER , "true" );
926928 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
@@ -1031,6 +1033,7 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
10311033 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
10321034 connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
10331035 connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
1036+ connectorConfigProps .put (MQSourceConnector .DLQ_CONTEXT_HEADERS_ENABLE_CONFIG , "true" );
10341037 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
10351038 connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
10361039 "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
@@ -1109,6 +1112,56 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
11091112 .toString ().contains ("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord" )).isTrue ();
11101113 }
11111114
1115+ @ Test
1116+ public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled () throws Exception {
1117+ connectTask = getSourceTaskWithEmptyKafkaOffset ();
1118+
1119+ final Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
1120+ connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_TOPIC , "mytopic" );
1121+ connectorConfigProps .put (ConnectorConfig .ERRORS_TOLERANCE_CONFIG , "all" );
1122+ connectorConfigProps .put (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG , "__dlq.mq.source" );
1123+ connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_MESSAGE_BODY_JMS , "true" );
1124+ connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_RECORD_BUILDER ,
1125+ "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
1126+ connectorConfigProps .put (MQSourceConnector .CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER , "true" );
1127+
1128+ connectTask .start (connectorConfigProps );
1129+
1130+ final TextMessage message = getJmsContext ().createTextMessage ("Invalid JSON message" );
1131+ message .setStringProperty ("teststring" , "myvalue" );
1132+ message .setIntProperty ("volume" , 11 );
1133+ message .setDoubleProperty ("decimalmeaning" , 42.0 );
1134+
1135+ // Both invalid and valid messages are received
1136+ final List <Message > testMessages = Arrays .asList (
1137+ message , // Poison message
1138+ getJmsContext ().createTextMessage ("{ \" i\" : 0 }" ) // Valid message
1139+ );
1140+ putAllMessagesToQueue (DEFAULT_SOURCE_QUEUE , testMessages );
1141+
1142+ final List <SourceRecord > processedRecords = connectTask .poll ();
1143+
1144+ assertThat (processedRecords ).hasSize (2 );
1145+
1146+ final SourceRecord dlqRecord = processedRecords .get (0 );
1147+ assertThat (dlqRecord .topic ()).isEqualTo ("__dlq.mq.source" );
1148+
1149+ final Headers headers = dlqRecord .headers ();
1150+
1151+ // Actual headers
1152+ assertThat (headers .lastWithName ("teststring" ).value ()).isEqualTo ("myvalue" );
1153+ assertThat (headers .lastWithName ("volume" ).value ()).isEqualTo ("11" );
1154+ assertThat (headers .lastWithName ("decimalmeaning" ).value ()).isEqualTo ("42.0" );
1155+
1156+ assertThat (headers .lastWithName ("__connect.errors.topic" )).isNull ();
1157+ assertThat (headers .lastWithName ("__connect.errors.class.name" )).isNull ();
1158+ assertThat (headers .lastWithName ("__connect.errors.exception.message" )).isNull ();
1159+ assertThat (headers .lastWithName ("__connect.errors.timestamp" )).isNull ();
1160+ assertThat (headers .lastWithName ("__connect.errors.cause.message" )).isNull ();
1161+ assertThat (headers .lastWithName ("__connect.errors.cause.class" )).isNull ();
1162+ assertThat (headers .lastWithName ("__connect.errors.exception.stacktrace" )).isNull ();
1163+ }
1164+
11121165 @ Test
11131166 public void verifyLoggingWarningWithErrorTolerance () throws Exception {
11141167 connectTask = getSourceTaskWithEmptyKafkaOffset ();
0 commit comments