Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ body:
description: What version of our software are you running?
options:
- latest
- 2.6.0 (Default)
- 2.7.0 (Default)
- 1.3.5
- older (<1.3.5)
validations:
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,14 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
| `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
| `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
| `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
| `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater |
| `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
| `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. If true, a schema must be provided - either using `mq.record.builder.json.schema.content` or by embedding a schema within each MQ message payload. | boolean | false | |
| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. If provided, this will be used in preference to any schema embedded in MQ messages. | string | | This should be a Kafka Connect schema, as used by JsonConverter. |

### Using a CCDT file

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>2.6.0</version>
<version>2.7.0</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder;
import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
Expand Down Expand Up @@ -102,6 +103,7 @@ public static Map<String, String> getDefaultConnectorProperties() {
props.put("mq.channel.name", CHANNEL_NAME);
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("mq.record.builder", DefaultRecordBuilder.class.getCanonicalName());
props.put("topic", "mytopic");
return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import javax.jms.TextMessage;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectorConfig;
Expand Down Expand Up @@ -167,6 +168,189 @@ public void verifyJmsJsonMessages() throws Exception {
}
}

// verify that user can use the standard approach for the JsonConverter
// of embedding schemas in message payloads (enabling this using a
// record builder config option)
@Test
public void verifyJmsSchemaMessages() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");

connectTask.start(connectorConfigProps);

final List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messages.add(getJmsContext().createTextMessage(
"{\n" +
"\"schema\": {\n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"idx\", \n" +
" \"type\": \"int64\"\n" +
" },\n" +
" {\n" +
" \"field\": \"test\", \n" +
" \"type\": \"string\"\n" +
" }" +
" ]\n" +
"}, " +
"\"payload\": { " +
" \"idx\": " + i + ", " +
" \"test\" : \"abcdef\" " +
"}" +
"}"));
}
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());

for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertNull(kafkaMessage.key());

assertNotNull(kafkaMessage.valueSchema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("test").schema());

final Struct value = (Struct) kafkaMessage.value();
assertEquals(Long.valueOf(i), value.getInt64("idx"));
assertEquals("abcdef", value.getString("test"));

connectTask.commitRecord(kafkaMessage, null);
}
}

// verify that a reusable schema can be provided to the JSON record builder
// as part of the connector config, so that this can be reused across
// multiple MQ messages
@Test
public void verifyJmsReusableSchemaMessages() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final String SCHEMA = "{\n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"idx\", \n" +
" \"type\": \"int32\"\n" +
" },\n" +
" {\n" +
" \"field\": \"a\", \n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"field\": \"b\", \n" +
" \"type\": \"int64\"\n" +
" },\n" +
" {\n" +
" \"field\": \"c\", \n" +
" \"type\": \"double\"\n" +
" },\n" +
" {\n" +
" \"field\": \"d\", \n" +
" \"type\": \"boolean\"\n" +
" },\n" +
" {\n" +
" \"field\": \"e\", \n" +
" \"type\": \"float\"\n" +
" },\n" +
" {\n" +
" \"field\": \"f\", \n" +
" \"type\": \"array\",\n" +
" \"items\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"field\": \"g\", \n" +
" \"type\": \"array\", \n" +
" \"items\": {\n" +
" \"type\": \"int32\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"field\": \"h\", \n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"innerstr\", \n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"field\": \"innernum\", \n" +
" \"type\": \"int64\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
connectorConfigProps.put("mq.record.builder.json.schema.content", SCHEMA);

connectTask.start(connectorConfigProps);

final List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messages.add(getJmsContext().createTextMessage(
"{ " +
"\"idx\": " + i + ", \n" +
"\"a\" : \"test\", \n" +
"\"b\" : 1234, \n" +
"\"c\" : 5.67, \n" +
"\"d\" : false, \n" +
"\"e\" : 12.34, \n" +
"\"f\" : [ \"a\", \"b\", \"c\" ], \n" +
"\"g\" : [ 1, 2, 3 ], \n" +
"\"h\" : { \"innerstr\" : \"testing\", \"innernum\" : 89 }" +
"}"));
}
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());

for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertNull(kafkaMessage.key());

assertNotNull(kafkaMessage.valueSchema());
assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("a").schema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("b").schema());
assertEquals(Schema.FLOAT64_SCHEMA, kafkaMessage.valueSchema().field("c").schema());
assertEquals(Schema.BOOLEAN_SCHEMA, kafkaMessage.valueSchema().field("d").schema());
assertEquals(Schema.FLOAT32_SCHEMA, kafkaMessage.valueSchema().field("e").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("f").schema().valueSchema());
assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("g").schema().valueSchema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innerstr").schema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innernum").schema());

final Struct value = (Struct) kafkaMessage.value();
assertEquals(Integer.valueOf(i), value.getInt32("idx"));
assertEquals("test", value.getString("a"));
assertEquals(Long.valueOf(1234), value.getInt64("b"));
assertEquals(Double.valueOf(5.67), value.getFloat64("c"));
assertEquals(false, value.getBoolean("d"));
assertEquals(Float.valueOf(12.34f), value.getFloat32("e"));
assertArrayEquals(new String[]{ "a", "b", "c"}, value.getArray("f").toArray(new String[]{}));
assertArrayEquals(new Integer[] { 1, 2, 3 }, value.getArray("g").toArray(new Integer[]{}));
assertEquals("testing", value.getStruct("h").getString("innerstr"));
assertEquals(Long.valueOf(89), value.getStruct("h").getInt64("innernum"));

connectTask.commitRecord(kafkaMessage, null);
}
}

@Test
public void verifyMQMessage() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void buildFromJmsTextMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);

// verify the Kafka record
Expand All @@ -82,6 +83,7 @@ public void buildFromJmsBytesMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);

// verify the Kafka record
Expand All @@ -100,6 +102,7 @@ public void buildFromJmsMapMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final RecordBuilderException exc = assertThrows(RecordBuilderException.class, () -> {
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
});
Expand All @@ -115,6 +118,7 @@ public void buildFromJmsTestJsonError() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
}
Expand Down Expand Up @@ -143,7 +147,7 @@ public void buildFromJmsTestErrorToleranceNone() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
final HashMap<String, String> config = new HashMap<String, String>();
final Map<String, String> config = getDefaultConnectorProperties();
config.put("errors.tolerance", "none");
config.put("mq.message.body.jms", "true");
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
Expand Down Expand Up @@ -187,7 +191,7 @@ public void testToSourceRecord_JsonRecordBuilder_JsonMessage() throws Exception
assertThat(sourceRecord).isNotNull();
assertThat(sourceRecord.value()).isInstanceOf(Map.class);
assertNull(sourceRecord.valueSchema()); // JSON with no schema

// Verify JSON data
@SuppressWarnings("unchecked")
Map<String, Object> value = (Map<String, Object>) sourceRecord.value();
Expand Down
Loading