diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
index 1eccc3f..4588606 100644
--- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
+++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
@@ -58,7 +58,7 @@ body:
description: What version of our software are you running?
options:
- latest
- - 2.6.0 (Default)
+ - 2.7.0 (Default)
- 1.3.5
- older (<1.3.5)
validations:
diff --git a/README.md b/README.md
index 50a1e4f..976a606 100644
--- a/README.md
+++ b/README.md
@@ -306,12 +306,14 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
| `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
-| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
+| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
| `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
| `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
| `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater |
| `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
| `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
+| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. If true, a schema must be provided - either using `mq.record.builder.json.schema.content` or by embedding a schema within each MQ message payload. | boolean | false | |
+| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. If provided, this will be used in preference to any schema embedded in MQ messages. | string | | This should be a Kafka Connect schema, as used by JsonConverter. |
### Using a CCDT file
diff --git a/pom.xml b/pom.xml
index ca65181..6de42e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
com.ibm.eventstreams.connect
kafka-connect-mq-source
jar
- 2.6.0
+ 2.7.0
kafka-connect-mq-source
IBM Corporation
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
index 316a47c..090987f 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
@@ -19,6 +19,7 @@
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
+import com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder;
import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
@@ -102,6 +103,7 @@ public static Map getDefaultConnectorProperties() {
props.put("mq.channel.name", CHANNEL_NAME);
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
+ props.put("mq.record.builder", DefaultRecordBuilder.class.getCanonicalName());
props.put("topic", "mytopic");
return props;
}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index 50d2651..51c38ac 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -50,6 +50,7 @@
import javax.jms.TextMessage;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -167,6 +168,189 @@ public void verifyJmsJsonMessages() throws Exception {
}
}
+ // verify that user can use the standard approach for the JsonConverter
+ // of embedding schemas in message payloads (enabling this using a
+ // record builder config option)
+ @Test
+ public void verifyJmsSchemaMessages() throws Exception {
+ connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+ final Map connectorConfigProps = createDefaultConnectorProperties();
+ connectorConfigProps.put("mq.message.body.jms", "true");
+ connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+ connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
+
+ connectTask.start(connectorConfigProps);
+
+ final List messages = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ messages.add(getJmsContext().createTextMessage(
+ "{\n" +
+ "\"schema\": {\n" +
+ " \"type\": \"struct\", \n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"field\": \"idx\", \n" +
+ " \"type\": \"int64\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"test\", \n" +
+ " \"type\": \"string\"\n" +
+ " }" +
+ " ]\n" +
+ "}, " +
+ "\"payload\": { " +
+ " \"idx\": " + i + ", " +
+ " \"test\" : \"abcdef\" " +
+ "}" +
+ "}"));
+ }
+ putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+ final List kafkaMessages = connectTask.poll();
+ assertEquals(5, kafkaMessages.size());
+
+ for (int i = 0; i < 5; i++) {
+ final SourceRecord kafkaMessage = kafkaMessages.get(i);
+ assertNull(kafkaMessage.key());
+
+ assertNotNull(kafkaMessage.valueSchema());
+ assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
+ assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("test").schema());
+
+ final Struct value = (Struct) kafkaMessage.value();
+ assertEquals(Long.valueOf(i), value.getInt64("idx"));
+ assertEquals("abcdef", value.getString("test"));
+
+ connectTask.commitRecord(kafkaMessage, null);
+ }
+ }
+
+ // verify that a reusable schema can be provided to the JSON record builder
+ // as part of the connector config, so that this can be reused across
+ // multiple MQ messages
+ @Test
+ public void verifyJmsReusableSchemaMessages() throws Exception {
+ connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+ final String SCHEMA = "{\n" +
+ " \"type\": \"struct\", \n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"field\": \"idx\", \n" +
+ " \"type\": \"int32\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"a\", \n" +
+ " \"type\": \"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"b\", \n" +
+ " \"type\": \"int64\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"c\", \n" +
+ " \"type\": \"double\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"d\", \n" +
+ " \"type\": \"boolean\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"e\", \n" +
+ " \"type\": \"float\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"f\", \n" +
+ " \"type\": \"array\",\n" +
+ " \"items\": {\n" +
+ " \"type\": \"string\"\n" +
+ " }\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"g\", \n" +
+ " \"type\": \"array\", \n" +
+ " \"items\": {\n" +
+ " \"type\": \"int32\"\n" +
+ " }\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"h\", \n" +
+ " \"type\": \"struct\", \n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"field\": \"innerstr\", \n" +
+ " \"type\": \"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"field\": \"innernum\", \n" +
+ " \"type\": \"int64\"\n" +
+ " }\n" +
+ " ]\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ final Map connectorConfigProps = createDefaultConnectorProperties();
+ connectorConfigProps.put("mq.message.body.jms", "true");
+ connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+ connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
+ connectorConfigProps.put("mq.record.builder.json.schema.content", SCHEMA);
+
+ connectTask.start(connectorConfigProps);
+
+ final List messages = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ messages.add(getJmsContext().createTextMessage(
+ "{ " +
+ "\"idx\": " + i + ", \n" +
+ "\"a\" : \"test\", \n" +
+ "\"b\" : 1234, \n" +
+ "\"c\" : 5.67, \n" +
+ "\"d\" : false, \n" +
+ "\"e\" : 12.34, \n" +
+ "\"f\" : [ \"a\", \"b\", \"c\" ], \n" +
+ "\"g\" : [ 1, 2, 3 ], \n" +
+ "\"h\" : { \"innerstr\" : \"testing\", \"innernum\" : 89 }" +
+ "}"));
+ }
+ putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+ final List kafkaMessages = connectTask.poll();
+ assertEquals(5, kafkaMessages.size());
+
+ for (int i = 0; i < 5; i++) {
+ final SourceRecord kafkaMessage = kafkaMessages.get(i);
+ assertNull(kafkaMessage.key());
+
+ assertNotNull(kafkaMessage.valueSchema());
+ assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
+ assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("a").schema());
+ assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("b").schema());
+ assertEquals(Schema.FLOAT64_SCHEMA, kafkaMessage.valueSchema().field("c").schema());
+ assertEquals(Schema.BOOLEAN_SCHEMA, kafkaMessage.valueSchema().field("d").schema());
+ assertEquals(Schema.FLOAT32_SCHEMA, kafkaMessage.valueSchema().field("e").schema());
+ assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("f").schema().valueSchema());
+ assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("g").schema().valueSchema());
+ assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innerstr").schema());
+ assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innernum").schema());
+
+ final Struct value = (Struct) kafkaMessage.value();
+ assertEquals(Integer.valueOf(i), value.getInt32("idx"));
+ assertEquals("test", value.getString("a"));
+ assertEquals(Long.valueOf(1234), value.getInt64("b"));
+ assertEquals(Double.valueOf(5.67), value.getFloat64("c"));
+ assertEquals(false, value.getBoolean("d"));
+ assertEquals(Float.valueOf(12.34f), value.getFloat32("e"));
+ assertArrayEquals(new String[]{ "a", "b", "c"}, value.getArray("f").toArray(new String[]{}));
+ assertArrayEquals(new Integer[] { 1, 2, 3 }, value.getArray("g").toArray(new Integer[]{}));
+ assertEquals("testing", value.getStruct("h").getString("innerstr"));
+ assertEquals(Long.valueOf(89), value.getStruct("h").getInt64("innernum"));
+
+ connectTask.commitRecord(kafkaMessage, null);
+ }
+ }
+
@Test
public void verifyMQMessage() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
index 93e33df..0fa7746 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
@@ -65,6 +65,7 @@ public void buildFromJmsTextMessage() throws Exception {
// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
+ builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
// verify the Kafka record
@@ -82,6 +83,7 @@ public void buildFromJmsBytesMessage() throws Exception {
// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
+ builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
// verify the Kafka record
@@ -100,6 +102,7 @@ public void buildFromJmsMapMessage() throws Exception {
// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
+ builder.configure(getDefaultConnectorProperties());
final RecordBuilderException exc = assertThrows(RecordBuilderException.class, () -> {
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
});
@@ -115,6 +118,7 @@ public void buildFromJmsTestJsonError() throws Exception {
// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
+ builder.configure(getDefaultConnectorProperties());
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
}
@@ -143,7 +147,7 @@ public void buildFromJmsTestErrorToleranceNone() throws Exception {
// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
- final HashMap config = new HashMap();
+ final Map config = getDefaultConnectorProperties();
config.put("errors.tolerance", "none");
config.put("mq.message.body.jms", "true");
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -187,7 +191,7 @@ public void testToSourceRecord_JsonRecordBuilder_JsonMessage() throws Exception
assertThat(sourceRecord).isNotNull();
assertThat(sourceRecord.value()).isInstanceOf(Map.class);
assertNull(sourceRecord.valueSchema()); // JSON with no schema
-
+
// Verify JSON data
@SuppressWarnings("unchecked")
Map value = (Map) sourceRecord.value();
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index 0337954..359e966 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -19,6 +19,7 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -33,12 +34,19 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.ConverterType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+
public class MQSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(MQSourceConnector.class);
@@ -212,6 +220,15 @@ public class MQSourceConnector extends SourceConnector {
"keys, all error context header keys will start with __connect.errors.";
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
+ public static final String CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "mq.record.builder.json.schemas.enable";
+ public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "Include schemas within the Kafka messages produced by the JSON record builder.";
+ public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "Enable Schemas";
+
+ public static final String CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "mq.record.builder.json.schema.content";
+ public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "When set, this is used as the schema for all messages. This must be a Kafka Connect schema, as used by JsonConverter.";
+ public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "Schema Content";
+
+
// Define valid reconnect options
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -224,7 +241,7 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
};
- public static String version = "2.6.0";
+ public static String version = "2.7.0";
private Map configProps;
@@ -666,6 +683,22 @@ null, new ReadableFile(),
32,
ConfigDef.Width.MEDIUM,
CONFIG_DISPLAY_MAX_POLL_TIME);
+ CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE,
+ Type.BOOLEAN,
+ false, new ConfigDef.NonNullValidator(),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE,
+ CONFIG_GROUP_MQ, 33,
+ Width.SHORT,
+ CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE);
+ CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT,
+ Type.STRING,
+ null, new SchemaValidator(),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT,
+ CONFIG_GROUP_MQ, 34,
+ Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT);
CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
@@ -716,6 +749,46 @@ public void ensureValid(final String name, final Object value) {
}
}
+ private static class SchemaValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ final String strValue = (String) value;
+ if (value == null || strValue.trim().isEmpty()) {
+ // only validate non-empty schemas
+ return;
+ }
+
+ // Start with a quick and simple "sniff test" on the provided schema
+ // by checking if it starts and ends in curly-parentheses
+ // This will quickly catch obvious configuration errors, such as
+ // providing a name, id, or file location for a schema
+ final String trimmedStr = strValue.trim();
+ if (!trimmedStr.startsWith("{") || !trimmedStr.endsWith("}")) {
+ throw new ConfigException(name, value, "Value should be a Kafka Connect schema");
+ }
+
+ // Create a temporary JsonDeserializer/JsonConverter to parse the
+ // provided schema.
+ // The aim for doing this is to catch any invalid schemas at
+ // startup time, rather than allow this to go unnoticed until
+ // the first MQ message is received (potentially a long time
+ // later).
+ try (
+ final JsonDeserializer deserializer = new JsonDeserializer();
+ final JsonConverter conv = new JsonConverter()
+ ) {
+ final Map converterConfig = new HashMap<>();
+ converterConfig.put(JsonConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
+ conv.configure(converterConfig);
+
+ final JsonNode jsonStr = deserializer.deserialize(trimmedStr, trimmedStr.getBytes());
+ conv.asConnectSchema(jsonStr);
+ } catch (final DataException exc) {
+ throw new ConfigException(name, value, exc.getMessage());
+ }
+ }
+ }
+
/**
* Signals that this connector is not capable of defining other transaction boundaries.
* A new transaction will be started and committed for every batch of records returned by {@link MQSourceTask#poll()}.
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
index cf25d03..f3ae82b 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
@@ -18,17 +18,23 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.HashMap;
+import java.util.Map;
+
import javax.jms.BytesMessage;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
+
/**
* Builds Kafka Connect SourceRecords from messages. It parses the bytes of the payload of JMS
* BytesMessage and TextMessage as JSON and creates a SourceRecord with a null schema.
@@ -38,13 +44,53 @@ public class JsonRecordBuilder extends BaseRecordBuilder {
private JsonConverter converter;
+ // From Kafka Connect 4.2 onwards, JsonConverter includes schema support
+ // To support earlier versions of the dependency, the record builder includes a
+ // workaround implementation.
+ // This variable should be true where the workaround implementation is required.
+ private boolean recordBuilderSchemaSupport = false;
+
+ // Workaround for supporting schemas is to embed the schema in the message payload
+ // given to the JsonConverter. This variable contains a String to concatenate with
+ // the string received from MQ in order to achieve this.
+ private String schemaSupportEnvelope = null;
+
public JsonRecordBuilder() {
log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
converter = new JsonConverter();
+ }
+
+ /**
+ * Configure this class. In addition to the MQ message handling config
+ * used by BaseRecordBuilder, this also configures the JsonConverter
+ * used by this record builder to parse JSON messages from MQ.
+ */
+ @Override
+ public void configure(final Map props) {
+ super.configure(props);
+
+ final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
+ final boolean schemasEnable = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE);
+ String schemaContent = null;
+ if (schemasEnable) {
+ schemaContent = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT);
+ if (schemaContent != null) {
+ schemaContent = schemaContent.trim();
+ }
+ }
+
+ if (schemasEnable && schemaContent != null &&
+ !JsonConverterConfig.configDef().names().contains("schema.content")) {
- // We just want the payload, not the schema in the output message
- final HashMap m = new HashMap<>();
- m.put("schemas.enable", "false");
+ // support for schemas provided separately from message payloads is requested
+ // but not available natively within the JsonConverter present in the classpath
+ recordBuilderSchemaSupport = true;
+ schemaSupportEnvelope = "{\"schema\": " + schemaContent + ", \"payload\": ";
+ }
+
+ final Map m = new HashMap<>();
+ m.put("schemas.enable", Boolean.toString(schemasEnable));
+ m.put("schema.content", schemaContent);
// Convert the value, not the key (isKey == false)
converter.configure(m, false);
@@ -77,6 +123,14 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin
throw new RecordBuilderException("Unsupported JMS message type");
}
- return converter.toConnectData(topic, payload);
+ if (recordBuilderSchemaSupport) {
+ return converter.toConnectData(topic,
+ // embed schema in the event payload
+ (schemaSupportEnvelope + new String(payload) + "}").getBytes(UTF_8));
+ } else {
+ return converter.toConnectData(topic,
+ // submit the payload as-is to the converter
+ payload);
+ }
}
}
\ No newline at end of file
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java
index a7a10a8..e9ee351 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java
@@ -32,7 +32,7 @@ public static RecordBuilder getRecordBuilder(final Map props) {
);
}
- protected static RecordBuilder getRecordBuilder(final String builderClass, final Map props) {
+ private static RecordBuilder getRecordBuilder(final String builderClass, final Map props) {
final RecordBuilder builder;
@@ -40,7 +40,7 @@ protected static RecordBuilder getRecordBuilder(final String builderClass, final
final Class extends RecordBuilder> c = Class.forName(builderClass).asSubclass(RecordBuilder.class);
builder = c.newInstance();
builder.configure(props);
- } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException | NullPointerException exc) {
+ } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException exc) {
log.error("Could not instantiate message builder {}", builderClass);
throw new RecordBuilderException("Could not instantiate message builder", exc);
}
diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
index 654465f..c792863 100644
--- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
+++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
@@ -186,4 +186,105 @@ public void testValidateRetryDelayConfigWithDefaultValues() {
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
}
+
+ // verify that valid JSON schema config will be accepted
+ @Test
+ public void testValidJsonSchemaConfig() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.queue.manager", "placeholder");
+ configProps.put("mq.queue", "placeholder");
+ configProps.put("topic", "placeholder");
+ configProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+ configProps.put("mq.record.builder.json.schemas.enable", "true");
+ configProps.put("mq.record.builder.json.schema.content", "{\n" +
+ " \"type\": \"struct\", \n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"field\": \"test\", \n" +
+ " \"type\": \"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+ assertTrue(config.configValues().stream().allMatch(cv -> cv.errorMessages().size() == 0));
+ }
+
+ // verify that providing a schema that isn't JSON will be rejected
+ @Test
+ public void testValidateJsonSchemaConfig() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.record.builder.json.schemas.enable", "true");
+ configProps.put("mq.record.builder.json.schema.content", "Hello world");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("should be a Kafka Connect schema")));
+ }
+
+ // verify that providing JSON (such as JSON schema) that isn't a
+ // Kafka Connect JSON Converter schema will be rejected
+ @Test
+ public void testValidateJsonConnectSchemaConfig() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.record.builder.json.schemas.enable", "true");
+ configProps.put("mq.record.builder.json.schema.content", "{\n" +
+ " \"$id\": \"https:example.com/person.schema.json\",\n" +
+ " \"$schema\": \"https:json-schema.org/draft/2020-12/schema\",\n" +
+ " \"title\": \"Person\",\n" +
+ " \"type\": \"object\",\n" +
+ " \"properties\": {\n" +
+ " \"firstName\": {\n" +
+ " \"type\": \"string\",\n" +
+ " \"description\": \"The person's first name.\"\n" +
+ " },\n" +
+ " \"lastName\": {\n" +
+ " \"type\": \"string\",\n" +
+ " \"description\": \"The person's last name.\"\n" +
+ " },\n" +
+ " \"age\": {\n" +
+ " \"description\": \"Age in years which must be equal to or greater than zero.\",\n" +
+ " \"type\": \"integer\",\n" +
+ " \"minimum\": 0\n" +
+ " }\n" +
+ " }\n" +
+ "}");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("Unknown schema type")));
+ }
+
+ // verify that Kafka Connect JSON Converter schemas containing
+ // invalid types will be rejected
+ @Test
+ public void testValidateJsonSchemaTypesConfig() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.record.builder.json.schemas.enable", "true");
+ configProps.put("mq.record.builder.json.schema.content", "{\n" +
+ " \"type\": \"struct\", \n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"field\": \"test\", \n" +
+ " \"type\": \"not-a-real-type\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("Unknown schema type: not-a-real-type")));
+ }
}
diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java
index c79bc5f..fabd1ef 100644
--- a/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java
+++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java
@@ -16,34 +16,48 @@
package com.ibm.eventstreams.connect.mqsource.builders;
import org.assertj.core.api.Assertions;
+import org.junit.Before;
import org.junit.Test;
+import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
+
import java.util.HashMap;
import java.util.Map;
public class RecordBuilderFactoryTest {
- final Map emptyProps = new HashMap<>();
+ final Map placeholderProps = new HashMap<>();
+
+ @Before
+ public void prepareProperties() {
+ placeholderProps.put("mq.queue.manager", "placeholder");
+ placeholderProps.put("mq.queue", "placeholder");
+ placeholderProps.put("topic", "placeholder");
+ }
@Test
public void testGetRecordBuilder_ForJsonRecordBuilder() {
- RecordBuilder recordBuilder = RecordBuilderFactory.getRecordBuilder("com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder", emptyProps);
+ Map props = new HashMap<>(placeholderProps);
+ props.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+ RecordBuilder recordBuilder = RecordBuilderFactory.getRecordBuilder(props);
Assertions.assertThat(recordBuilder).isInstanceOf(JsonRecordBuilder.class);
}
@Test
public void testGetRecordBuilder_ForDefaultRecordBuilder() {
- RecordBuilder recordBuilder = RecordBuilderFactory.getRecordBuilder("com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder", emptyProps);
+ Map props = new HashMap<>(placeholderProps);
+ props.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+
+ RecordBuilder recordBuilder = RecordBuilderFactory.getRecordBuilder(props);
Assertions.assertThat(recordBuilder).isInstanceOf(DefaultRecordBuilder.class);
}
@Test(expected = RecordBuilderException.class)
public void testGetRecordBuilder_JunkClass() {
- RecordBuilderFactory.getRecordBuilder("casjsajhasdhusdo;iasd", emptyProps);
- }
+ Map props = new HashMap<>(placeholderProps);
+ props.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, "casjsajhasdhusdo;iasd");
- @Test(expected = RecordBuilderException.class)
- public void testGetRecordBuilder_NullProps() {
- RecordBuilderFactory.getRecordBuilder("casjsajhasdhusdo;iasd", null);
+ RecordBuilderFactory.getRecordBuilder(props);
}
}
\ No newline at end of file