From 3a7b5560229377f7b24db6231c08840300220710 Mon Sep 17 00:00:00 2001 From: Joel Hanson Date: Wed, 12 Nov 2025 21:31:04 +0530 Subject: [PATCH 1/3] feat: support non jms message body for JSONRecordBuilder Signed-off-by: Joel Hanson --- .../connect/mqsource/MQSourceTaskIT.java | 75 ++++++++++++++- .../connect/mqsource/MQSourceConnector.java | 19 ++-- .../mqsource/builders/JsonRecordBuilder.java | 91 ++++++++++++++++++- 3 files changed, 174 insertions(+), 11 deletions(-) diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index 50d2651..df4ca4f 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -411,6 +411,35 @@ public void verifyCorrelationIdBytesAsKey() throws Exception { connectTask.commitRecord(kafkaMessage, null); } + + @Test + public void verifyCorrelationIdBytesAsKey_WithJMSDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "false"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationIDAsBytes"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("testmessagewithcorrelbytes"); + message.setJMSCorrelationID("verifycorrelbytes"); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List kafkaMessages = connectTask.poll(); + assertEquals(1, kafkaMessages.size()); + + final SourceRecord kafkaMessage = kafkaMessages.get(0); + assertArrayEquals("verifycorrelbytes".getBytes(), (byte[]) kafkaMessage.key()); + 
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.keySchema()); + assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema()); + + assertThat(new String((byte[]) kafkaMessage.value(), StandardCharsets.UTF_8).endsWith("testmessagewithcorrelbytes")).isTrue(); + connectTask.commitRecord(kafkaMessage, null); + } + @Test public void verifyDestinationAsKey() throws Exception { connectTask = getSourceTaskWithEmptyKafkaOffset(); @@ -1727,4 +1756,48 @@ public void shouldHandleMultipleJmsPropertiesWithDifferentTypesJsonBuilder() thr assertThat(headers.lastWithName("threshold").value()).isEqualTo("0.95"); assertThat(headers.lastWithName("enabled").value()).isEqualTo("true"); } -} + + @Test + public void shouldSetJmsPropertiesWithJsonRecordBuilderWhenJMSIsDisabled_InvalidJSON() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext() + .createTextMessage("{ \"id\": 123, \"name\": \"test\", \"active\": true }"); + message.setStringProperty("source", "system-a"); + message.setIntProperty("version", 2); + message.setLongProperty("timestamp", 1234567890L); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List processedRecords = connectTask.poll(); + SourceRecord record = processedRecords.get(0); + assertThat(processedRecords).hasSize(1); + assertThat(record).isNotNull(); + assertThat(record.topic()).isEqualTo("mytopic"); + 
assertThat(record.value()).isInstanceOf(Map.class); + assertNull(record.valueSchema()); // JSON with no schema + + // Verify JSON data + @SuppressWarnings("unchecked") + Map value = (Map) record.value(); + assertEquals(123L, value.get("id")); + assertEquals("test", value.get("name")); + assertEquals(true, value.get("active")); + + final Headers headers = record.headers(); + + // Verify JMS properties are copied even when JMS body processing is disabled + assertThat(headers.lastWithName("source").value()).isEqualTo("system-a"); + assertThat(headers.lastWithName("version").value()).isEqualTo("2"); + assertThat(headers.lastWithName("timestamp").value()).isEqualTo("1234567890"); + } +} \ No newline at end of file diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 0337954..feeaa7c 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -17,7 +17,8 @@ import java.io.File; import java.net.MalformedURLException; -import java.net.URL; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -244,7 +245,7 @@ public class MQSourceConnector extends SourceConnector { * @param props configuration settings */ @Override public void start(final Map props) { - log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); + log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().threadId(), this.getClass().getName(), props); configProps = props; for (final Entry entry : props.entrySet()) { @@ -257,7 +258,7 @@ public class MQSourceConnector extends SourceConnector { log.debug("Connector props entry {} : {}", entry.getKey(), value); } - log.trace("[{}] Exit {}.start", Thread.currentThread().getId(), 
this.getClass().getName()); + log.trace("[{}] Exit {}.start", Thread.currentThread().threadId(), this.getClass().getName()); } /** @@ -277,7 +278,7 @@ public Class taskClass() { */ @Override public List> taskConfigs(final int maxTasks) { - log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().getId(), this.getClass().getName(), + log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().threadId(), this.getClass().getName(), maxTasks); final List> taskConfigs = new ArrayList<>(); @@ -285,7 +286,7 @@ public List> taskConfigs(final int maxTasks) { taskConfigs.add(configProps); } - log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().getId(), this.getClass().getName(), + log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().threadId(), this.getClass().getName(), taskConfigs); return taskConfigs; } @@ -295,8 +296,8 @@ public List> taskConfigs(final int maxTasks) { */ @Override public void stop() { - log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName()); - log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName()); + log.trace("[{}] Entry {}.stop", Thread.currentThread().threadId(), this.getClass().getName()); + log.trace("[{}] Exit {}.stop", Thread.currentThread().threadId(), this.getClass().getName()); } /** @@ -709,8 +710,8 @@ public void ensureValid(final String name, final Object value) { } try { - new URL(strValue); - } catch (final MalformedURLException exc) { + new URI(strValue).toURL(); + } catch (final MalformedURLException | URISyntaxException exc) { throw new ConfigException(name, value, "Value must be a valid URL"); } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java index cf25d03..a54cf8b 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java 
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java @@ -17,6 +17,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.HashMap; import javax.jms.BytesMessage; import javax.jms.JMSContext; @@ -32,12 +34,21 @@ /** * Builds Kafka Connect SourceRecords from messages. It parses the bytes of the payload of JMS * BytesMessage and TextMessage as JSON and creates a SourceRecord with a null schema. + * + * When messageBodyJms is false, this builder can handle MQ messages with RFH2 headers by + * automatically detecting and skipping them to extract the JSON payload. */ public class JsonRecordBuilder extends BaseRecordBuilder { private static final Logger log = LoggerFactory.getLogger(JsonRecordBuilder.class); private JsonConverter converter; + // RFH2 header constants + private static final String RFH2_STRUC_ID = "RFH "; + private static final int RFH2_STRUCT_ID_LENGTH = 4; + private static final int RFH2_STRUC_LENGTH_OFFSET = 8; + private static final int RFH2_MIN_HEADER_SIZE = 36; + public JsonRecordBuilder() { log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); converter = new JsonConverter(); @@ -65,7 +76,7 @@ public JsonRecordBuilder() { @Override public SchemaAndValue getValue(final JMSContext context, final String topic, final boolean messageBodyJms, final Message message) throws JMSException { - final byte[] payload; + byte[] payload; if (message instanceof BytesMessage) { payload = message.getBody(byte[].class); @@ -77,6 +88,84 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin throw new RecordBuilderException("Unsupported JMS message type"); } + // When messageBodyJms is false, the message may contain RFH2 headers that need to be skipped + if (!messageBodyJms) { + payload = stripRFH2Header(payload); + } return converter.toConnectData(topic, payload); } + + /** + * Skips 
RFH2 (Rules and Formatting Header version 2) if present in the payload. + * RFH2 headers are used by IBM MQ to carry additional message properties and metadata. + * + * When messageBodyJms is false (WMQ_MESSAGE_BODY_MQ), JMS does not automatically + * strip RFH2 headers, so we need to parse and skip them manually. + * + * RFH2 structure: + * - StrucId (4 bytes): "RFH " (with trailing space) + * - Version (4 bytes): Version number (typically 2) + * - StrucLength (4 bytes): Total length of RFH2 header including all folders + * - Encoding (4 bytes): Numeric encoding + * - CodedCharSetId (4 bytes): Character set identifier + * - Format (8 bytes): Format name of data following the header + * - Flags (4 bytes): Flags + * - NameValueCCSID (4 bytes): CCSID of name-value data + * - Variable length folders containing name-value pairs + * + * Inspired from https://github.com/CommunityHiQ/Frends.Community.IBMMQ/blob/master/Frends.Community.IBMMQ/Helpers/IBMMQHelpers.cs + * + * @param payload the original message payload + * @return the payload with RFH2 header removed if present, otherwise the original payload + */ + private byte[] stripRFH2Header(final byte[] payload) { + if (payload == null || payload.length < RFH2_MIN_HEADER_SIZE) { + return payload; + } + + // Check if the message starts with RFH2 structure ID + final String strucId = new String(payload, 0, RFH2_STRUCT_ID_LENGTH, UTF_8); + if (!RFH2_STRUC_ID.equals(strucId)) { + log.debug("No RFH2 header detected"); + return payload; + } + + try { + // Read version to detect endianness + final ByteBuffer buffer = ByteBuffer.wrap(payload); + buffer.order(ByteOrder.LITTLE_ENDIAN); // Default to little-endian + buffer.position(RFH2_STRUCT_ID_LENGTH); // Skip StrucId (4 bytes) + int version = buffer.getInt(); + + // Detect endianness: if version is not 1 or 2, it's likely big-endian + if (version > 2 || version < 1) { + version = Integer.reverseBytes(version); + buffer.order(ByteOrder.BIG_ENDIAN); + log.debug("Detected 
big-endian RFH2 header"); + } else { + log.debug("Detected little-endian RFH2 header"); + } + + // Read the RFH2 structure length, Skip StrucId (4 bytes) and Version (4 bytes) + buffer.position(RFH2_STRUC_LENGTH_OFFSET); + final int strucLength = buffer.getInt(); + + if (strucLength < RFH2_MIN_HEADER_SIZE || strucLength > payload.length) { + log.warn("Invalid RFH2 structure length: {}. Treating entire payload as message.", strucLength); + return payload; + } + + log.debug("RFH2 header detected (version: {}, length: {} bytes). Stripping header.", version, strucLength); + + // Extract the actual message payload after the RFH2 header + final byte[] actualPayload = new byte[payload.length - strucLength]; + System.arraycopy(payload, strucLength, actualPayload, 0, actualPayload.length); + + return actualPayload; + + } catch (final Exception e) { + log.error("Error parsing RFH2 header: {}. Returning original payload.", e.getMessage()); + return payload; + } + } } \ No newline at end of file From 9f3d8fd708fbcc5c09daf68fc35477f79325d22f Mon Sep 17 00:00:00 2001 From: Joel Hanson Date: Thu, 13 Nov 2025 15:14:36 +0530 Subject: [PATCH 2/3] feat: handle rfh header and test dql with jms body disabled Signed-off-by: Joel Hanson --- .../connect/mqsource/MQSourceDLQIT.java | 922 ++++++++++++++++++ .../connect/mqsource/MQSourceTaskIT.java | 582 +---------- .../connect/mqsource/MQSourceConnector.java | 12 +- .../mqsource/builders/JsonRecordBuilder.java | 4 +- .../connect/mqsource/util/ErrorHandler.java | 3 + 5 files changed, 935 insertions(+), 588 deletions(-) create mode 100644 src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java new file mode 100644 index 0000000..6d68324 --- /dev/null +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java @@ -0,0 +1,922 @@ 
+/** + * Copyright 2022, 2023, 2024 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.eventstreams.connect.mqsource; + +import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskObjectMother.getSourceTaskWithEmptyKafkaOffset; +import static com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil.putAllMessagesToQueue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil; +import com.ibm.eventstreams.connect.mqsource.utils.SourceTaskStopper; + +public class MQSourceDLQIT extends AbstractJMSContextIT { + + private MQSourceTask connectTask = null; + 
+ @After + public void after() throws InterruptedException { + final SourceTaskStopper stopper = new SourceTaskStopper(connectTask); + stopper.run(); + } + + @Before + public void before() throws JMSException { + MQTestUtil.removeAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE); + MQTestUtil.removeAllMessagesFromQueue(DEFAULT_STATE_QUEUE); + } + + private Map createDefaultConnectorProperties() { + final Map props = new HashMap<>(); + props.put("mq.queue.manager", QMGR_NAME); + props.put("mq.connection.mode", "client"); + props.put("mq.connection.name.list", DEFAULT_CONNECTION_NAME); + props.put("mq.channel.name", CHANNEL_NAME); + props.put("mq.queue", DEFAULT_SOURCE_QUEUE); + props.put("mq.user.authentication.mqcsp", "false"); + props.put("topic", "mytopic"); + props.put("mq.message.receive.timeout", "5000"); + props.put("mq.receive.subsequent.timeout.ms", "2000"); + props.put("mq.reconnect.delay.min.ms", "100"); + props.put("mq.reconnect.delay.max.ms", "10000"); + return props; + } + + @Test + public void verifyErrorToleranceMessages() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid 
message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + // All messages are polled; the poison message is dropped (no DLQ configured) and appears as a null record + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(4); + assertThat(processedRecords.stream().filter(record -> record != null)).hasSize(3); + + final List nonNullProcessedRecords = processedRecords.stream().filter(record -> record != null) + .collect(Collectors.toList()); + + for (int i = 0; i < 3; i++) { + final SourceRecord validRecord = nonNullProcessedRecords.get(i); + assertThat(validRecord.topic()).isEqualTo("mytopic"); + assertThat(validRecord.valueSchema()).isNull(); + + final Map value = (Map) validRecord.value(); + assertThat(value.get("i")).isEqualTo(Long.valueOf(i)); + + connectTask.commitRecord(validRecord, null); + } + } + + @Test + public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + 
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(4); + + // Verify poison message goes to DLQ + final SourceRecord poisonRecord = processedRecords.get(0); + assertThat(poisonRecord.topic()).isEqualTo("__dlq.mq.source"); + assertThat(poisonRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + assertThat(poisonRecord.value()).isEqualTo("Invalid JSON message".getBytes(StandardCharsets.UTF_8)); + + // Verify valid messages are processed correctly + for (int i = 1; i < 4; i++) { + final SourceRecord validRecord = processedRecords.get(i); + assertThat(validRecord.topic()).isEqualTo("mytopic"); + assertThat(validRecord.valueSchema()).isNull(); + + final Map value = (Map) validRecord.value(); + assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1)); + + connectTask.commitRecord(validRecord, null); + } + } + + + @Test + public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll_ByteMessage() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Both invalid and valid messages are received + final List testMessages = new ArrayList<>(); + + final BytesMessage message1 = getJmsContext().createBytesMessage(); + message1.writeBytes("Invalid JSON message".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message1); + + final 
BytesMessage message2 = getJmsContext().createBytesMessage(); + message2.writeBytes("{ \"i\": 0 }".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message2); + + final BytesMessage message3 = getJmsContext().createBytesMessage(); + message3.writeBytes("{ \"i\": 1 }".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message3); + + final BytesMessage message4 = getJmsContext().createBytesMessage(); + message4.writeBytes("{ \"i\": 2 }".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message4); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(4); + + // Verify poison message goes to DLQ + final SourceRecord poisonRecord = processedRecords.get(0); + assertThat(poisonRecord.topic()).isEqualTo("__dlq.mq.source"); + assertThat(poisonRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + assertThat(poisonRecord.value()).isEqualTo("Invalid JSON message".getBytes(StandardCharsets.UTF_8)); + + // Verify valid messages are processed correctly + for (int i = 1; i < 4; i++) { + final SourceRecord validRecord = processedRecords.get(i); + assertThat(validRecord.topic()).isEqualTo("mytopic"); + assertThat(validRecord.valueSchema()).isNull(); + + final Map value = (Map) validRecord.value(); + assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1)); + + connectTask.commitRecord(validRecord, null); + } + } + + + @Test + public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll_JMSDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(4); + + // Verify poison message goes to DLQ + final SourceRecord poisonRecord = processedRecords.get(0); + assertThat(poisonRecord.topic()).isEqualTo("__dlq.mq.source"); + assertThat(poisonRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + final String dlqValue = new String((byte[]) poisonRecord.value(), StandardCharsets.UTF_8); + assertThat(dlqValue.endsWith("Invalid JSON message")).isTrue(); + + // Verify valid messages are processed correctly + for (int i = 1; i < 4; i++) { + final SourceRecord validRecord = processedRecords.get(i); + assertThat(validRecord.topic()).isEqualTo("mytopic"); + assertThat(validRecord.valueSchema()).isNull(); + + final Map value = (Map) validRecord.value(); + assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1)); + + connectTask.commitRecord(validRecord, null); + } + } + + + @Test + public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll_ByteMessage_JMSDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + 
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Both invalid and valid messages are received + final List testMessages = new ArrayList<>(); + + final BytesMessage message1 = getJmsContext().createBytesMessage(); + message1.writeBytes("Invalid JSON message".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message1); + + final BytesMessage message2 = getJmsContext().createBytesMessage(); + message2.writeBytes("{ \"i\": 0 }".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message2); + + final BytesMessage message3 = getJmsContext().createBytesMessage(); + message3.writeBytes("{ \"i\": 1 }".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message3); + + final BytesMessage message4 = getJmsContext().createBytesMessage(); + message4.writeBytes("{ \"i\": 2 }".getBytes(StandardCharsets.UTF_8)); + testMessages.add(message4); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(4); + + // Verify poison message goes to DLQ + final SourceRecord poisonRecord = processedRecords.get(0); + assertThat(poisonRecord.topic()).isEqualTo("__dlq.mq.source"); + assertThat(poisonRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + final String dlqValue = new String((byte[]) poisonRecord.value(), StandardCharsets.UTF_8); + assertThat(dlqValue.endsWith("Invalid JSON message")).isTrue(); + + // Verify valid messages are processed correctly + for (int i = 1; i < 4; i++) { + final SourceRecord validRecord = processedRecords.get(i); + 
assertThat(validRecord.topic()).isEqualTo("mytopic"); + assertThat(validRecord.valueSchema()).isNull(); + + final Map value = (Map) validRecord.value(); + assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1)); + + connectTask.commitRecord(validRecord, null); + } + } + + @Test + public void shouldFailWhenErrorToleranceIsNone() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "none"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + // Processing should fail on the poison message + assertThatThrownBy(() -> connectTask.poll()) + .isInstanceOfAny(ConnectException.class, RuntimeException.class) + .hasMessageContaining("Converting byte[] to Kafka Connect data failed due to serialization error:"); + } + + @Test + public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + 
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // An invalid message is received + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); + message.setJMSMessageID("message_id"); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, + Collections.singletonList(message)); + + // The message should be routed to DLQ with error headers + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + + final SourceRecord dlqRecord = processedRecords.get(0); + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + // Verify error headers are present + final Headers headers = dlqRecord.headers(); + assertThat(headers.lastWithName("__connect.errors.topic").value()) + .isEqualTo("mytopic"); + assertThat(headers.lastWithName("__connect.errors.exception.class.name").value()) + .isEqualTo("org.apache.kafka.connect.errors.DataException"); + assertThat(headers.lastWithName("__connect.errors.exception.message").value()) + .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); + assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString() + .isEmpty()).isFalse(); + assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString()) + .contains( + "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"); + assertThat(headers.lastWithName("__connect.errors.cause.class").value()) + 
.isEqualTo("org.apache.kafka.common.errors.SerializationException"); + assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value() + .toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue(); + assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID()); + assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp()); + assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE); + connectTask.commitRecord(dlqRecord, null); + } + + @Test + public void shouldHandleDifferentMessageTypesToDlq() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When different types of invalid messages are received + final List testMessages = new ArrayList<>(); + // Text message with invalid JSON + testMessages.add(getJmsContext().createTextMessage("Invalid JSON message")); + // BytesMessage with invalid content + final BytesMessage bytesMsg = getJmsContext().createBytesMessage(); + bytesMsg.writeBytes("Invalid binary data".getBytes(StandardCharsets.UTF_8)); + testMessages.add(bytesMsg); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(2); + + for (final SourceRecord dlqRecord : 
processedRecords) { + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + assertThat(dlqRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + connectTask.commitRecord(dlqRecord, null); + } + } + + @Test + public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage invalidMessage = getJmsContext().createTextMessage("Invalid JSON message"); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Collections.singletonList(invalidMessage)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + + final SourceRecord dlqRecord = processedRecords.get(0); + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = dlqRecord.headers(); + assertThat(headers.lastWithName("__connect.errors.exception.message").value()) + .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); + + connectTask.commitRecord(dlqRecord, null); + } + + @Test + public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map 
connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Mix of valid and invalid messages received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Invalid JSON + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid JSON + getJmsContext().createTextMessage("{ malformed json"), // Malformed JSON + getJmsContext().createTextMessage("{ \"i\": 1, \"text\": \"valid\" }"), // Valid JSON + getJmsContext().createTextMessage("{}") // Valid but empty JSON + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(5); + + int validCount = 0; + int dlqCount = 0; + + for (final SourceRecord record : processedRecords) { + if (record.topic().equals("__dlq.mq.source")) { + dlqCount++; + assertThat(record.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + } else { + validCount++; + assertThat(record.topic()).isEqualTo("mytopic"); + } + connectTask.commitRecord(record, null); + } + + assertThat(validCount).isEqualTo(3); + assertThat(dlqCount).isEqualTo(2); + } + + @Test + public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + 
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Multiple messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("First invalid message"), // Invalid message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("Second invalid message"), // Invalid message + getJmsContext().createTextMessage("{ \"i\": 1 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + // Then: Processing should continue despite DLQ failure + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(4); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(2).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); + } + + @Test + public void verifyHeadersWithErrorTolerance() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); + message.setStringProperty("teststring", "myvalue"); + message.setIntProperty("volume", 11); + message.setDoubleProperty("decimalmeaning", 42.0); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + message, // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(2); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = processedRecords.get(0).headers(); + + // Actual headers + assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); + assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); + + // Expected DLQ Headers + /** + * ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic, value=mytopic, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.class.name, value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[] to Kafka Connect data failed due to serialization error: , schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.timestamp, value=1749036171558, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.cause.message, value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object 
or token 'null', 'true' or 'false') + * at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 9], schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.cause.class, value=org.apache.kafka.common.errors.SerializationException, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.exception.stacktrace, value=org.apache.kafka.connect.errors.DataException: + * Converting byte[] to Kafka Connect data failed due to serialization error: + * at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333) + * at com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81) + * at com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238) + * at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork... [truncated], schema=Schema{STRING})]) + */ + assertThat(headers.lastWithName("__connect.errors.topic").value()) + .isEqualTo("mytopic"); + assertThat(headers.lastWithName("__connect.errors.exception.class.name").value()) + .isEqualTo("org.apache.kafka.connect.errors.DataException"); + assertThat(headers.lastWithName("__connect.errors.exception.message").value()) + .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); + assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString() + .isEmpty()).isFalse(); + assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString()) + .contains( + "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"); + assertThat(headers.lastWithName("__connect.errors.cause.class").value()) + .isEqualTo("org.apache.kafka.common.errors.SerializationException"); + assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value() + 
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue(); + } + + @Test + public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); + message.setStringProperty("teststring", "myvalue"); + message.setIntProperty("volume", 11); + message.setDoubleProperty("decimalmeaning", 42.0); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + message, // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(2); + + final SourceRecord dlqRecord = processedRecords.get(0); + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = dlqRecord.headers(); + + // Actual headers + assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); + assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); + + 
assertThat(headers.lastWithName("__connect.errors.topic")).isNull(); + assertThat(headers.lastWithName("__connect.errors.class.name")).isNull(); + assertThat(headers.lastWithName("__connect.errors.exception.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.timestamp")).isNull(); + assertThat(headers.lastWithName("__connect.errors.cause.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.cause.class")).isNull(); + assertThat(headers.lastWithName("__connect.errors.exception.stacktrace")).isNull(); + } + + @Test + public void verifyLoggingWarningWithErrorTolerance() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors + // default; Do not log errors with message + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List 
processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(4); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); + } + + @Test + public void verifyLoggingErrorsWithErrorTolerance() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(4); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + 
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); + } + + @Test + public void verifyLoggingErrorsWithMessage() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(4); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); + } + + @Test + public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Exception 
{ + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createObjectMessage("Invalid message"), // Poison message + getJmsContext().createTextMessage("Text") // Valid + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(2); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + } + + @Test + public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled_JMSBodyDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + 
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); + message.setStringProperty("teststring", "myvalue"); + message.setIntProperty("volume", 11); + message.setDoubleProperty("decimalmeaning", 42.0); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + message, // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(2); + + final SourceRecord dlqRecord = processedRecords.get(0); + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = dlqRecord.headers(); + final String dlqValue = new String((byte[]) dlqRecord.value(), StandardCharsets.UTF_8); + assertThat(dlqValue.endsWith("Invalid JSON message")).isTrue(); + + // Actual headers + assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); + assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); + + assertThat(headers.lastWithName("__connect.errors.topic")).isNull(); + assertThat(headers.lastWithName("__connect.errors.class.name")).isNull(); + assertThat(headers.lastWithName("__connect.errors.exception.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.timestamp")).isNull(); + 
assertThat(headers.lastWithName("__connect.errors.cause.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.cause.class")).isNull(); + assertThat(headers.lastWithName("__connect.errors.exception.stacktrace")).isNull(); + } + + @Test + public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled_JMSBodyDisabled_ByteMessage() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + + connectTask.start(connectorConfigProps); + + final BytesMessage message1 = getJmsContext().createBytesMessage(); + final BytesMessage message2 = getJmsContext().createBytesMessage(); + message1.writeBytes("Invalid JSON message".getBytes(StandardCharsets.UTF_8)); + message2.writeBytes("{ \"i\": 0 }".getBytes(StandardCharsets.UTF_8)); + + message1.setStringProperty("teststring", "myvalue"); + message1.setIntProperty("volume", 11); + message1.setDoubleProperty("decimalmeaning", 42.0); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + message1, // Poison message + message2 // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(2); + + final SourceRecord dlqRecord = processedRecords.get(0); + 
assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = dlqRecord.headers(); + final String dlqValue = new String((byte[]) dlqRecord.value(), StandardCharsets.UTF_8); + assertThat(dlqValue.endsWith("Invalid JSON message")).isTrue(); + + // Actual headers + assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); + assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); + + assertThat(headers.lastWithName("__connect.errors.topic")).isNull(); + assertThat(headers.lastWithName("__connect.errors.class.name")).isNull(); + assertThat(headers.lastWithName("__connect.errors.exception.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.timestamp")).isNull(); + assertThat(headers.lastWithName("__connect.errors.cause.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.cause.class")).isNull(); + assertThat(headers.lastWithName("__connect.errors.exception.stacktrace")).isNull(); + } +} diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index df4ca4f..d55bf15 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -22,7 +22,6 @@ import static com.ibm.eventstreams.connect.mqsource.utils.MessagesObjectMother.createAListOfMessages; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -38,21 +37,15 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import 
java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; - -import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.Headers; -import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; @@ -405,13 +398,13 @@ public void verifyCorrelationIdBytesAsKey() throws Exception { final SourceRecord kafkaMessage = kafkaMessages.get(0); assertArrayEquals("verifycorrelbytes".getBytes(), (byte[]) kafkaMessage.key()); assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.keySchema()); + assertThat(kafkaMessage.valueSchema()).isNull(); assertEquals("testmessagewithcorrelbytes", kafkaMessage.value()); connectTask.commitRecord(kafkaMessage, null); } - @Test public void verifyCorrelationIdBytesAsKey_WithJMSDisabled() throws Exception { connectTask = getSourceTaskWithEmptyKafkaOffset(); @@ -743,579 +736,6 @@ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() t assertEquals(10000L, shared.getReconnectDelayMillisMax()); } - @Test - public void verifyErrorToleranceMessages() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // Both invalid and 
valid messages are received - final List testMessages = Arrays.asList( - getJmsContext().createTextMessage("Invalid JSON message"), // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - // All messages are processed, with poison message routed to DLQ - final List processedRecords = connectTask.poll(); - assertThat(processedRecords).hasSize(4); - assertThat(processedRecords.stream().filter(record -> record != null)).hasSize(3); - - final List nonNullProcesssedRecord = processedRecords.stream().filter(record -> record != null) - .collect(Collectors.toList()); - - for (int i = 0; i < 3; i++) { - final SourceRecord validRecord = nonNullProcesssedRecord.get(i); - assertThat(validRecord.topic()).isEqualTo("mytopic"); - assertThat(validRecord.valueSchema()).isNull(); - - final Map value = (Map) validRecord.value(); - assertThat(value.get("i")).isEqualTo(Long.valueOf(i)); - - connectTask.commitRecord(validRecord, null); - } - } - - @Test - public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // When: Both invalid and valid messages are received - final 
List testMessages = Arrays.asList( - getJmsContext().createTextMessage("Invalid JSON message"), // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - assertThat(processedRecords).hasSize(4); - - // Verify poison message goes to DLQ - final SourceRecord poisonRecord = processedRecords.get(0); - assertThat(poisonRecord.topic()).isEqualTo("__dlq.mq.source"); - assertThat(poisonRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); - assertThat(poisonRecord.value()).isEqualTo("Invalid JSON message".getBytes(StandardCharsets.UTF_8)); - - // Verify valid messages are processed correctly - for (int i = 1; i < 4; i++) { - final SourceRecord validRecord = processedRecords.get(i); - assertThat(validRecord.topic()).isEqualTo("mytopic"); - assertThat(validRecord.valueSchema()).isNull(); - - final Map value = (Map) validRecord.value(); - assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1)); - - connectTask.commitRecord(validRecord, null); - } - } - - @Test - public void shouldFailWhenErrorToleranceIsNone() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "none"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // Both invalid 
and valid messages are received - final List testMessages = Arrays.asList( - getJmsContext().createTextMessage("Invalid JSON message"), // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - // Processing should fail on the poison message - assertThatThrownBy(() -> connectTask.poll()) - .isInstanceOfAny(ConnectException.class, RuntimeException.class) - .hasMessageContaining("Converting byte[] to Kafka Connect data failed due to serialization error:"); - } - - @Test - public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // An invalid message is received - final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); - message.setJMSMessageID("message_id"); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, - Collections.singletonList(message)); - - // The message should be routed to DLQ with error headers - final List processedRecords = connectTask.poll(); - assertThat(processedRecords).hasSize(1); - - final SourceRecord dlqRecord = processedRecords.get(0); - assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); - - // Verify error headers are present - final Headers headers = 
dlqRecord.headers(); - assertThat(headers.lastWithName("__connect.errors.topic").value()) - .isEqualTo("mytopic"); - assertThat(headers.lastWithName("__connect.errors.exception.class.name").value()) - .isEqualTo("org.apache.kafka.connect.errors.DataException"); - assertThat(headers.lastWithName("__connect.errors.exception.message").value()) - .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); - assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString() - .isEmpty()).isFalse(); - assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString()) - .contains( - "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"); - assertThat(headers.lastWithName("__connect.errors.cause.class").value()) - .isEqualTo("org.apache.kafka.common.errors.SerializationException"); - assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value() - .toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue(); - assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID()); - assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp()); - assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE); - connectTask.commitRecord(dlqRecord, null); - } - - @Test - public void shouldHandleDifferentMessageTypesToDlq() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // When different types of invalid messages are received - final List testMessages = new ArrayList<>(); - // Text message with invalid JSON - testMessages.add(getJmsContext().createTextMessage("Invalid JSON message")); - // BytesMessage with invalid content - final BytesMessage bytesMsg = getJmsContext().createBytesMessage(); - bytesMsg.writeBytes("Invalid binary data".getBytes(StandardCharsets.UTF_8)); - testMessages.add(bytesMsg); - - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - assertThat(processedRecords).hasSize(2); - - for (final SourceRecord dlqRecord : processedRecords) { - assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); - assertThat(dlqRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); - connectTask.commitRecord(dlqRecord, null); - } - } - - @Test - public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - 
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - final TextMessage invalidMessage = getJmsContext().createTextMessage("Invalid JSON message"); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Collections.singletonList(invalidMessage)); - - final List processedRecords = connectTask.poll(); - assertThat(processedRecords).hasSize(1); - - final SourceRecord dlqRecord = processedRecords.get(0); - assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); - - final Headers headers = dlqRecord.headers(); - assertThat(headers.lastWithName("__connect.errors.exception.message").value()) - .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); - - connectTask.commitRecord(dlqRecord, null); - } - - @Test - public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // When: Mix of valid and invalid messages received - final List testMessages = Arrays.asList( - getJmsContext().createTextMessage("Invalid JSON message"), // Invalid JSON - getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid JSON - getJmsContext().createTextMessage("{ malformed json"), // Malformed JSON - getJmsContext().createTextMessage("{ \"i\": 1, \"text\": \"valid\" }"), // Valid JSON - getJmsContext().createTextMessage("{}") // 
Valid but empty JSON - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - assertThat(processedRecords).hasSize(5); - - int validCount = 0; - int dlqCount = 0; - - for (final SourceRecord record : processedRecords) { - if (record.topic().equals("__dlq.mq.source")) { - dlqCount++; - assertThat(record.valueSchema().type()).isEqualTo(Schema.Type.BYTES); - } else { - validCount++; - assertThat(record.topic()).isEqualTo("mytopic"); - } - connectTask.commitRecord(record, null); - } - - assertThat(validCount).isEqualTo(3); - assertThat(dlqCount).isEqualTo(2); - } - - @Test - public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // When: Multiple messages are received - final List testMessages = Arrays.asList( - getJmsContext().createTextMessage("First invalid message"), // Invalid message - getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message - getJmsContext().createTextMessage("Second invalid message"), // Invalid message - getJmsContext().createTextMessage("{ \"i\": 1 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - // Then: Processing should continue despite DLQ failure - final List processedRecords = connectTask.poll(); - - assertThat(processedRecords).hasSize(4); - - 
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); - assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); - assertThat(processedRecords.get(2).topic()).isEqualTo("__dlq.mq.source"); - assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); - } - - @Test - public void verifyHeadersWithErrorTolerance() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); - - connectTask.start(connectorConfigProps); - - final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); - message.setStringProperty("teststring", "myvalue"); - message.setIntProperty("volume", 11); - message.setDoubleProperty("decimalmeaning", 42.0); - - // Both invalid and valid messages are received - final List testMessages = Arrays.asList( - message, // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - - assertThat(processedRecords).hasSize(2); - - assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); - - final Headers headers = processedRecords.get(0).headers(); - - // Actual headers - 
assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); - assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); - assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); - - // Expected DLQ Headers - /** - * ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic, value=mytopic, schema=Schema{STRING}), - * ConnectHeader(key=__connect.errors.class.name, value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}), - * ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[] to Kafka Connect data failed due to serialization error: , schema=Schema{STRING}), - * ConnectHeader(key=__connect.errors.timestamp, value=1749036171558, schema=Schema{STRING}), - * ConnectHeader(key=__connect.errors.cause.message, value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') - * at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 9], schema=Schema{STRING}), - * ConnectHeader(key=__connect.errors.cause.class, value=org.apache.kafka.common.errors.SerializationException, schema=Schema{STRING}), - * ConnectHeader(key=__connect.errors.exception.stacktrace, value=org.apache.kafka.connect.errors.DataException: - * Converting byte[] to Kafka Connect data failed due to serialization error: - * at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333) - * at com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81) - * at com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238) - * at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork... 
[truncated], schema=Schema{STRING})]) - */ - assertThat(headers.lastWithName("__connect.errors.topic").value()) - .isEqualTo("mytopic"); - assertThat(headers.lastWithName("__connect.errors.exception.class.name").value()) - .isEqualTo("org.apache.kafka.connect.errors.DataException"); - assertThat(headers.lastWithName("__connect.errors.exception.message").value()) - .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); - assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString() - .isEmpty()).isFalse(); - assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString()) - .contains( - "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"); - assertThat(headers.lastWithName("__connect.errors.cause.class").value()) - .isEqualTo("org.apache.kafka.common.errors.SerializationException"); - assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value() - .toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue(); - } - - @Test - public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); - - 
connectTask.start(connectorConfigProps); - - final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); - message.setStringProperty("teststring", "myvalue"); - message.setIntProperty("volume", 11); - message.setDoubleProperty("decimalmeaning", 42.0); - - // Both invalid and valid messages are received - final List testMessages = Arrays.asList( - message, // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - - assertThat(processedRecords).hasSize(2); - - final SourceRecord dlqRecord = processedRecords.get(0); - assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); - - final Headers headers = dlqRecord.headers(); - - // Actual headers - assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); - assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); - assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); - - assertThat(headers.lastWithName("__connect.errors.topic")).isNull(); - assertThat(headers.lastWithName("__connect.errors.class.name")).isNull(); - assertThat(headers.lastWithName("__connect.errors.exception.message")).isNull(); - assertThat(headers.lastWithName("__connect.errors.timestamp")).isNull(); - assertThat(headers.lastWithName("__connect.errors.cause.message")).isNull(); - assertThat(headers.lastWithName("__connect.errors.cause.class")).isNull(); - assertThat(headers.lastWithName("__connect.errors.exception.stacktrace")).isNull(); - } - - @Test - public void verifyLoggingWarningWithErrorTolerance() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - 
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors - // default; Do not log errors with message - connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // Both invalid and valid messages are received - final List testMessages = Arrays.asList( - getJmsContext().createTextMessage("Invalid JSON message"), // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - - assertThat(processedRecords).hasSize(4); - - assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); - assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); - assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); - assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); - } - - @Test - public void verifyLoggingErrorsWithErrorTolerance() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled - connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); - 
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // Both invalid and valid messages are received - final List testMessages = Arrays.asList( - getJmsContext().createTextMessage("Invalid JSON message"), // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - - assertThat(processedRecords).hasSize(4); - - assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); - assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); - assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); - assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); - } - - @Test - public void verifyLoggingErrorsWithMessage() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors - connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - - connectTask.start(connectorConfigProps); - - // Both invalid and valid messages are received - final List testMessages = Arrays.asList( - getJmsContext().createTextMessage("Invalid JSON message"), // Poison message - getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message - getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - - assertThat(processedRecords).hasSize(4); - - assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); - assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); - assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); - assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); - } - - @Test - public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Exception { - connectTask = getSourceTaskWithEmptyKafkaOffset(); - - final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); - connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); - connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors - connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message - connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, - "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); - 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); - connectorConfigProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); - connectTask.start(connectorConfigProps); - - // Both invalid and valid messages are received - final List testMessages = Arrays.asList( - getJmsContext().createObjectMessage("Invalid message"), // Poison message - getJmsContext().createTextMessage("Text") // Valid - ); - putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); - - final List processedRecords = connectTask.poll(); - - assertThat(processedRecords).hasSize(2); - - assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); - assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); - } - @Test public void verifyJmsMessageWithNullHeaders() throws Exception { connectTask = getSourceTaskWithEmptyKafkaOffset(); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index feeaa7c..49e112c 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -245,7 +245,7 @@ public class MQSourceConnector extends SourceConnector { * @param props configuration settings */ @Override public void start(final Map props) { - log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().threadId(), this.getClass().getName(), props); + log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); configProps = props; for (final Entry entry : props.entrySet()) { @@ -258,7 +258,7 @@ public class MQSourceConnector extends SourceConnector { log.debug("Connector props entry {} : {}", entry.getKey(), value); } - log.trace("[{}] Exit {}.start", Thread.currentThread().threadId(), this.getClass().getName()); + log.trace("[{}] Exit {}.start", 
Thread.currentThread().getId(), this.getClass().getName()); } /** @@ -278,7 +278,7 @@ public Class taskClass() { */ @Override public List> taskConfigs(final int maxTasks) { - log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().threadId(), this.getClass().getName(), + log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().getId(), this.getClass().getName(), maxTasks); final List> taskConfigs = new ArrayList<>(); @@ -286,7 +286,7 @@ public List> taskConfigs(final int maxTasks) { taskConfigs.add(configProps); } - log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().threadId(), this.getClass().getName(), + log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().getId(), this.getClass().getName(), taskConfigs); return taskConfigs; } @@ -296,8 +296,8 @@ public List> taskConfigs(final int maxTasks) { */ @Override public void stop() { - log.trace("[{}] Entry {}.stop", Thread.currentThread().threadId(), this.getClass().getName()); - log.trace("[{}] Exit {}.stop", Thread.currentThread().threadId(), this.getClass().getName()); + log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName()); + log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName()); } /** diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java index a54cf8b..ff97e8e 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java @@ -46,7 +46,8 @@ public class JsonRecordBuilder extends BaseRecordBuilder { // RFH2 header constants private static final String RFH2_STRUC_ID = "RFH "; private static final int RFH2_STRUCT_ID_LENGTH = 4; - private static final int RFH2_STRUC_LENGTH_OFFSET = 8; + private static final int 
RFH2_STRUC_LENGTH = 4; + private static final int RFH2_STRUC_LENGTH_OFFSET = RFH2_STRUCT_ID_LENGTH + RFH2_STRUC_LENGTH; private static final int RFH2_MIN_HEADER_SIZE = 36; public JsonRecordBuilder() { @@ -114,6 +115,7 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin * - Variable length folders containing name-value pairs * * Inspired from https://github.com/CommunityHiQ/Frends.Community.IBMMQ/blob/master/Frends.Community.IBMMQ/Helpers/IBMMQHelpers.cs + * Header structure https://www.ibm.com/docs/en/integration-bus/10.0.0?topic=header-mqrfh2-structure * * @param payload the original message payload * @return the payload with RFH2 header removed if present, otherwise the original payload diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java index 49a096f..567a885 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java @@ -250,6 +250,9 @@ private Optional extractPayload(final Message message) { try { if (message instanceof BytesMessage) { log.debug("Extracting payload from BytesMessage for DLQ"); + // Without reset(), the second read would fail because the internal + // pointer is already at the end of the message from the first read attempt. 
+ ((BytesMessage) message).reset(); // Reset read pointer return Optional.ofNullable(message.getBody(byte[].class)); } else if (message instanceof TextMessage) { log.debug("Extracting payload from TextMessage for DLQ"); From 1af46380c493b65785cb2395fc0bb94686b8619c Mon Sep 17 00:00:00 2001 From: Joel Hanson Date: Thu, 20 Nov 2025 14:02:57 +0530 Subject: [PATCH 3/3] chore: update copyright headers for the ITs Signed-off-by: Joel Hanson --- .../com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java | 2 +- .../com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java | 2 +- .../ibm/eventstreams/connect/mqsource/MQSourceConnector.java | 2 +- .../connect/mqsource/builders/JsonRecordBuilder.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java index 6d68324..4387551 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceDLQIT.java @@ -1,5 +1,5 @@ /** - * Copyright 2022, 2023, 2024 IBM Corporation + * Copyright 2022, 2023, 2024, 2025 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index d55bf15..532aa8b 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -1,5 +1,5 @@ /** - * Copyright 2022, 2023, 2024 IBM Corporation + * Copyright 2022, 2023, 2024, 2025 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 49e112c..db1ba21 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -1,5 +1,5 @@ /** - * Copyright 2017, 2020, 2023, 2024 IBM Corporation + * Copyright 2017, 2020, 2023, 2024, 2025 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java index ff97e8e..b91a303 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java @@ -1,5 +1,5 @@ /** - * Copyright 2017, 2018, 2019, 2023, 2024 IBM Corporation + * Copyright 2017, 2018, 2019, 2023, 2024, 2025 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.