Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ body:
description: What version of our software are you running?
options:
- latest
- 2.6.0 (Default)
- 2.7.0 (Default)
- 1.3.5
- older (<1.3.5)
validations:
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,14 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
| `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
| `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
| `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
| `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater |
| `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
| `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. If true, a schema must be provided - either using `mq.record.builder.json.schema.content` or by embedding a schema within each MQ message payload. | boolean | false | |
| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. If provided, this will be used in preference to any schema embedded in MQ messages. | string | | This should be a Kafka Connect schema, as used by JsonConverter. |

### Using a CCDT file

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>2.6.0</version>
<version>2.7.0</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder;
import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
Expand Down Expand Up @@ -102,6 +103,7 @@ public static Map<String, String> getDefaultConnectorProperties() {
props.put("mq.channel.name", CHANNEL_NAME);
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("mq.record.builder", DefaultRecordBuilder.class.getCanonicalName());
props.put("topic", "mytopic");
return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import javax.jms.TextMessage;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectorConfig;
Expand Down Expand Up @@ -167,6 +168,189 @@ public void verifyJmsJsonMessages() throws Exception {
}
}

// verify that user can use the standard approach for the JsonConverter
// of embedding schemas in message payloads (enabling this using a
// record builder config option)
@Test
public void verifyJmsSchemaMessages() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");

connectTask.start(connectorConfigProps);

final List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messages.add(getJmsContext().createTextMessage(
"{\n" +
"\"schema\": {\n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"idx\", \n" +
" \"type\": \"int64\"\n" +
" },\n" +
" {\n" +
" \"field\": \"test\", \n" +
" \"type\": \"string\"\n" +
" }" +
" ]\n" +
"}, " +
"\"payload\": { " +
" \"idx\": " + i + ", " +
" \"test\" : \"abcdef\" " +
"}" +
"}"));
}
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());

for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertNull(kafkaMessage.key());

assertNotNull(kafkaMessage.valueSchema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("test").schema());

final Struct value = (Struct) kafkaMessage.value();
assertEquals(Long.valueOf(i), value.getInt64("idx"));
assertEquals("abcdef", value.getString("test"));

connectTask.commitRecord(kafkaMessage, null);
}
}

// verify that a reusable schema can be provided to the JSON record builder
// as part of the connector config, so that this can be reused across
// multiple MQ messages
@Test
public void verifyJmsReusableSchemaMessages() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final String SCHEMA = "{\n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"idx\", \n" +
" \"type\": \"int32\"\n" +
" },\n" +
" {\n" +
" \"field\": \"a\", \n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"field\": \"b\", \n" +
" \"type\": \"int64\"\n" +
" },\n" +
" {\n" +
" \"field\": \"c\", \n" +
" \"type\": \"double\"\n" +
" },\n" +
" {\n" +
" \"field\": \"d\", \n" +
" \"type\": \"boolean\"\n" +
" },\n" +
" {\n" +
" \"field\": \"e\", \n" +
" \"type\": \"float\"\n" +
" },\n" +
" {\n" +
" \"field\": \"f\", \n" +
" \"type\": \"array\",\n" +
" \"items\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"field\": \"g\", \n" +
" \"type\": \"array\", \n" +
" \"items\": {\n" +
" \"type\": \"int32\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"field\": \"h\", \n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"innerstr\", \n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"field\": \"innernum\", \n" +
" \"type\": \"int64\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
connectorConfigProps.put("mq.record.builder.json.schema.content", SCHEMA);

connectTask.start(connectorConfigProps);

final List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messages.add(getJmsContext().createTextMessage(
"{ " +
"\"idx\": " + i + ", \n" +
"\"a\" : \"test\", \n" +
"\"b\" : 1234, \n" +
"\"c\" : 5.67, \n" +
"\"d\" : false, \n" +
"\"e\" : 12.34, \n" +
"\"f\" : [ \"a\", \"b\", \"c\" ], \n" +
"\"g\" : [ 1, 2, 3 ], \n" +
"\"h\" : { \"innerstr\" : \"testing\", \"innernum\" : 89 }" +
"}"));
}
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());

for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertNull(kafkaMessage.key());

assertNotNull(kafkaMessage.valueSchema());
assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("a").schema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("b").schema());
assertEquals(Schema.FLOAT64_SCHEMA, kafkaMessage.valueSchema().field("c").schema());
assertEquals(Schema.BOOLEAN_SCHEMA, kafkaMessage.valueSchema().field("d").schema());
assertEquals(Schema.FLOAT32_SCHEMA, kafkaMessage.valueSchema().field("e").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("f").schema().valueSchema());
assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("g").schema().valueSchema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innerstr").schema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innernum").schema());

final Struct value = (Struct) kafkaMessage.value();
assertEquals(Integer.valueOf(i), value.getInt32("idx"));
assertEquals("test", value.getString("a"));
assertEquals(Long.valueOf(1234), value.getInt64("b"));
assertEquals(Double.valueOf(5.67), value.getFloat64("c"));
assertEquals(false, value.getBoolean("d"));
assertEquals(Float.valueOf(12.34f), value.getFloat32("e"));
assertArrayEquals(new String[]{ "a", "b", "c"}, value.getArray("f").toArray(new String[]{}));
assertArrayEquals(new Integer[] { 1, 2, 3 }, value.getArray("g").toArray(new Integer[]{}));
assertEquals("testing", value.getStruct("h").getString("innerstr"));
assertEquals(Long.valueOf(89), value.getStruct("h").getInt64("innernum"));

connectTask.commitRecord(kafkaMessage, null);
}
}

@Test
public void verifyMQMessage() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void buildFromJmsTextMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);

// verify the Kafka record
Expand All @@ -82,6 +83,7 @@ public void buildFromJmsBytesMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);

// verify the Kafka record
Expand All @@ -100,6 +102,7 @@ public void buildFromJmsMapMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final RecordBuilderException exc = assertThrows(RecordBuilderException.class, () -> {
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
});
Expand All @@ -115,6 +118,7 @@ public void buildFromJmsTestJsonError() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
}
Expand Down Expand Up @@ -143,7 +147,7 @@ public void buildFromJmsTestErrorToleranceNone() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
final HashMap<String, String> config = new HashMap<String, String>();
final Map<String, String> config = getDefaultConnectorProperties();
config.put("errors.tolerance", "none");
config.put("mq.message.body.jms", "true");
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
Expand Down Expand Up @@ -187,7 +191,7 @@ public void testToSourceRecord_JsonRecordBuilder_JsonMessage() throws Exception
assertThat(sourceRecord).isNotNull();
assertThat(sourceRecord.value()).isInstanceOf(Map.class);
assertNull(sourceRecord.valueSchema()); // JSON with no schema

// Verify JSON data
@SuppressWarnings("unchecked")
Map<String, Object> value = (Map<String, Object>) sourceRecord.value();
Expand Down
Loading