Skip to content

Commit 553eb74

Browse files
committed
feat: support for schemas in JSON record builder
This commit introduces support for emitting structured records from the JSON record builder. This will allow the MQ Source Connector to read JSON string messages from MQ, and produce them to Kafka using any standard Converter (e.g. to produce them in Avro or Protobuf formats if desired). The JsonConverter dependency used in the JSON record builder has support for this from Kafka Connect v4.2, so the simplest implementation would be to update the dependency in pom.xml to version 4.2, and just pass through the schemas.enable and schema.content configuration properties to the converter and leave the Converter to do everything. This felt like an overly aggressive dependency jump, so in the interest of continuing to support Connect 3.x versions, I've implemented a fall-back implementation that reuses the schema "envelope" approach present in JsonConverter 3.x. The additional string operations this will incur for every message will almost certainly impact performance, so I see this as a temporary workaround that we should remove as soon as we feel that Connect 4.x adoption is sufficient. Signed-off-by: Dale Lane <[email protected]>
1 parent b632743 commit 553eb74

File tree

2 files changed

+127
-4
lines changed

2 files changed

+127
-4
lines changed

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,19 @@
3333
import org.apache.kafka.common.config.ConfigException;
3434
import org.apache.kafka.common.config.ConfigValue;
3535
import org.apache.kafka.connect.connector.Task;
36+
import org.apache.kafka.connect.errors.DataException;
37+
import org.apache.kafka.connect.json.JsonConverter;
38+
import org.apache.kafka.connect.json.JsonConverterConfig;
39+
import org.apache.kafka.connect.json.JsonDeserializer;
3640
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
3741
import org.apache.kafka.connect.source.ExactlyOnceSupport;
3842
import org.apache.kafka.connect.source.SourceConnector;
43+
import org.apache.kafka.connect.storage.ConverterType;
3944
import org.slf4j.Logger;
4045
import org.slf4j.LoggerFactory;
4146

47+
import com.fasterxml.jackson.databind.JsonNode;
48+
4249
public class MQSourceConnector extends SourceConnector {
4350
private static final Logger log = LoggerFactory.getLogger(MQSourceConnector.class);
4451

@@ -212,6 +219,15 @@ public class MQSourceConnector extends SourceConnector {
212219
"keys, all error context header keys will start with <code>__connect.errors.</code>";
213220
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
214221

222+
public static final String CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "mq.record.builder.json.schemas.enable";
223+
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "Include schemas within the Kafka messages produced by the JSON record builder.";
224+
public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "Enable Schemas";
225+
226+
public static final String CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "mq.record.builder.json.schema.content";
227+
public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "When set, this is used as the schema for all messages. This should be a Kafka Connect schema, as used by JsonConverter.";
228+
public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "Schema Content";
229+
230+
215231
// Define valid reconnect options
216232
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
217233
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -666,6 +682,22 @@ null, new ReadableFile(),
666682
32,
667683
ConfigDef.Width.MEDIUM,
668684
CONFIG_DISPLAY_MAX_POLL_TIME);
685+
CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE,
686+
Type.BOOLEAN,
687+
false, new ConfigDef.NonNullValidator(),
688+
Importance.LOW,
689+
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE,
690+
CONFIG_GROUP_MQ, 33,
691+
Width.SHORT,
692+
CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE);
693+
CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT,
694+
Type.STRING,
695+
null, new SchemaValidator(),
696+
Importance.LOW,
697+
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT,
698+
CONFIG_GROUP_MQ, 34,
699+
Width.MEDIUM,
700+
CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT);
669701

670702
CONFIGDEF.define(CONFIG_NAME_TOPIC,
671703
Type.STRING,
@@ -716,6 +748,43 @@ public void ensureValid(final String name, final Object value) {
716748
}
717749
}
718750

751+
private static class SchemaValidator implements ConfigDef.Validator {
752+
@Override
753+
public void ensureValid(final String name, final Object value) {
754+
final String strValue = (String) value;
755+
if (value == null || strValue.trim().isEmpty()) {
756+
// only validate non-empty schemas
757+
return;
758+
}
759+
760+
// Start with a quick and simple "sniff test" on the provided schema
761+
// by checking if it starts and ends in curly-parentheses
762+
// This will quickly catch obvious configuration errors, such as
763+
// providing a name, id, or file location for a schema
764+
final String trimmedStr = strValue.trim();
765+
if (!trimmedStr.startsWith("{") || !trimmedStr.endsWith("}")) {
766+
throw new ConfigException(name, value, "Value should be a Kafka Connect schema");
767+
}
768+
769+
// Create a temporary JsonDeserializer/JsonConverter to parse the
770+
// provided schema.
771+
// The aim for doing this is to catch any invalid schemas at
772+
// startup time, rather than allow this to go unnoticed until
773+
// the first MQ message is received (potentially a long time
774+
// later).
775+
try (
776+
final JsonDeserializer deserializer = new JsonDeserializer();
777+
final JsonConverter conv = new JsonConverter()
778+
) {
779+
final JsonNode jsonStr = deserializer.deserialize(trimmedStr, trimmedStr.getBytes());
780+
conv.configure(Map.of(JsonConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()));
781+
conv.asConnectSchema(jsonStr);
782+
} catch (final DataException exc) {
783+
throw new ConfigException(name, value, exc.getMessage());
784+
}
785+
}
786+
}
787+
719788
/**
720789
* Signals that this connector is not capable of defining other transaction boundaries.
721790
* A new transaction will be started and committed for every batch of records returned by {@link MQSourceTask#poll()}.

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,23 @@
1818
import static java.nio.charset.StandardCharsets.UTF_8;
1919

2020
import java.util.HashMap;
21+
import java.util.Map;
22+
2123
import javax.jms.BytesMessage;
2224
import javax.jms.JMSContext;
2325
import javax.jms.JMSException;
2426
import javax.jms.Message;
2527
import javax.jms.TextMessage;
2628

29+
import org.apache.kafka.common.config.AbstractConfig;
2730
import org.apache.kafka.connect.data.SchemaAndValue;
2831
import org.apache.kafka.connect.json.JsonConverter;
32+
import org.apache.kafka.connect.json.JsonConverterConfig;
2933
import org.slf4j.Logger;
3034
import org.slf4j.LoggerFactory;
3135

36+
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
37+
3238
/**
3339
* Builds Kafka Connect SourceRecords from messages. It parses the bytes of the payload of JMS
3440
* BytesMessage and TextMessage as JSON and creates a SourceRecord with a null schema.
@@ -38,13 +44,53 @@ public class JsonRecordBuilder extends BaseRecordBuilder {
3844

3945
private JsonConverter converter;
4046

47+
// From Kafka Connect 4.2 onwards, JsonConverter includes schema support
48+
// To support earlier versions of the dependency, the record builder includes a
49+
// workaround implementation.
50+
// This variable should be true where the workaround implementation is required.
51+
private boolean recordBuilderSchemaSupport = false;
52+
53+
// Workaround for supporting schemas is to embed the schema in the message payload
54+
// given to the JsonConverter. This variable contains a String to concatenate with
55+
// the string received from MQ in order to achieve this.
56+
private String schemaSupportEnvelope = null;
57+
4158
public JsonRecordBuilder() {
4259
log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
4360
converter = new JsonConverter();
61+
}
62+
63+
/**
64+
* Configure this class. In addition to the MQ message handling config
65+
* used by BaseRecordBuilder, this also configures the JsonConverter
66+
* used by this record builder to parse JSON messages from MQ.
67+
*/
68+
@Override
69+
public void configure(final Map<String, String> props) {
70+
super.configure(props);
71+
72+
final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
73+
final boolean schemasEnable = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE);
74+
String schemaContent = null;
75+
if (schemasEnable) {
76+
schemaContent = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT);
77+
if (schemaContent != null) {
78+
schemaContent = schemaContent.trim();
79+
}
80+
}
81+
82+
if (schemasEnable && schemaContent != null &&
83+
!JsonConverterConfig.configDef().names().contains("schema.content")) {
4484

45-
// We just want the payload, not the schema in the output message
46-
final HashMap<String, String> m = new HashMap<>();
47-
m.put("schemas.enable", "false");
85+
// support for schemas provided separately from message payloads is requested
86+
// but not available natively within the JsonConverter present in the classpath
87+
recordBuilderSchemaSupport = true;
88+
schemaSupportEnvelope = "{\"schema\": " + schemaContent + ", \"payload\": ";
89+
}
90+
91+
final Map<String, String> m = new HashMap<>();
92+
m.put("schemas.enable", Boolean.toString(schemasEnable));
93+
m.put("schema.content", schemaContent);
4894

4995
// Convert the value, not the key (isKey == false)
5096
converter.configure(m, false);
@@ -77,6 +123,14 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin
77123
throw new RecordBuilderException("Unsupported JMS message type");
78124
}
79125

80-
return converter.toConnectData(topic, payload);
126+
if (recordBuilderSchemaSupport) {
127+
return converter.toConnectData(topic,
128+
// embed schema in the event payload
129+
(schemaSupportEnvelope + new String(payload) + "}").getBytes());
130+
} else {
131+
return converter.toConnectData(topic,
132+
// submit the payload as-is to the converter
133+
payload);
134+
}
81135
}
82136
}

0 commit comments

Comments
 (0)