
Commit b632743

test: tests and docs to show intention for JSON schema support
I've chosen to support Kafka Connect's JSON schema format, rather than the different (and more widely understood) JSON Schema specification.

While supporting "standard" JSON Schema would have simplified the user config in some respects, it would have left the MQ Connector responsible for performing the (ambiguous) conversion from the user-provided JSON Schema to the schema representation used in Connect. As there is not a 1:1 mapping between these two schema types, I think it would be difficult to do such a conversion in a way that always meets user expectations.

Instead, by making the user provide a Connect JSON schema, I'm proposing that the user manually convert any JSON Schema they may already have into a Connect schema - forcing them to make the appropriate choices in mapping between the two type systems.

This was a difficult trade-off to make, as I'm favouring unambiguity of config over ease of config (if we assume that more users are comfortable writing "standard" JSON Schemas than Connect JSON schemas). To try to catch confusion here, I've included a unit test to ensure that we reject non-Connect schemas.

Signed-off-by: Dale Lane <[email protected]>
1 parent c5e121f commit b632743
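To illustrate the manual conversion the commit message asks of users (a sketch with illustrative field names, not taken from this commit): a "standard" JSON Schema fragment such as `{"type": "object", "properties": {"firstName": {"type": "string"}, "age": {"type": "integer"}}}` has no single Connect equivalent - `"integer"` alone could map to int8, int16, int32 or int64 - so the user must choose, for example:

```json
{
  "type": "struct",
  "fields": [
    { "field": "firstName", "type": "string" },
    { "field": "age", "type": "int32" }
  ]
}
```

This ambiguity in the type mapping is the reason given above for requiring Connect-format schemas rather than converting automatically.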

File tree: 3 files changed (+265, −1 lines)

README.md

Lines changed: 3 additions & 1 deletion

```diff
@@ -306,12 +306,14 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
 | `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
 | `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
 | `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
-| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
+| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
 | `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
 | `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
 | `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater |
 | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
 | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
+| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. | boolean | false | |
+| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. | string | | This should be a Kafka Connect schema, as used by JsonConverter. |

 ### Using a CCDT file
```

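A sketch of how the two new options from the README table might be combined in a connector config (connector name, connection details, queue, and topic values here are placeholders, not taken from this commit):

```json
{
  "name": "mq-source",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "mq.queue.manager": "QM1",
    "mq.queue": "DEV.QUEUE.1",
    "topic": "mq-messages",
    "mq.message.body.jms": "true",
    "mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder",
    "mq.record.builder.json.schemas.enable": "true",
    "mq.record.builder.json.schema.content": "{ \"type\": \"struct\", \"fields\": [ { \"field\": \"idx\", \"type\": \"int32\" }, { \"field\": \"test\", \"type\": \"string\" } ] }"
  }
}
```

Note that `mq.record.builder.json.schema.content` takes the schema as a string, so the JSON has to be escaped when embedded in a JSON config submission.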
src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 184 additions & 0 deletions

```diff
@@ -50,6 +50,7 @@
 import javax.jms.TextMessage;

 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -167,6 +168,189 @@ public void verifyJmsJsonMessages() throws Exception {
         }
     }

+    // verify that user can use the standard approach for the JsonConverter
+    // of embedding schemas in message payloads (enabling this using a
+    // record builder config option)
+    @Test
+    public void verifyJmsSchemaMessages() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
+
+        connectTask.start(connectorConfigProps);
+
+        final List<Message> messages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            messages.add(getJmsContext().createTextMessage(
+                "{\n" +
+                "\"schema\": {\n" +
+                "  \"type\": \"struct\", \n" +
+                "  \"fields\": [\n" +
+                "    {\n" +
+                "      \"field\": \"idx\", \n" +
+                "      \"type\": \"int64\"\n" +
+                "    },\n" +
+                "    {\n" +
+                "      \"field\": \"test\", \n" +
+                "      \"type\": \"string\"\n" +
+                "    }" +
+                "  ]\n" +
+                "}, " +
+                "\"payload\": { " +
+                "  \"idx\": " + i + ", " +
+                "  \"test\" : \"abcdef\" " +
+                "}" +
+                "}"));
+        }
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+        assertEquals(5, kafkaMessages.size());
+
+        for (int i = 0; i < 5; i++) {
+            final SourceRecord kafkaMessage = kafkaMessages.get(i);
+            assertNull(kafkaMessage.key());
+
+            assertNotNull(kafkaMessage.valueSchema());
+            assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("test").schema());
+
+            final Struct value = (Struct) kafkaMessage.value();
+            assertEquals(Long.valueOf(i), value.getInt64("idx"));
+            assertEquals("abcdef", value.getString("test"));
+
+            connectTask.commitRecord(kafkaMessage, null);
+        }
+    }
+
+    // verify that a reusable schema can be provided to the JSON record builder
+    // as part of the connector config, so that this can be reused across
+    // multiple MQ messages
+    @Test
+    public void verifyJmsReusableSchemaMessages() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final String SCHEMA = "{\n" +
+            "  \"type\": \"struct\", \n" +
+            "  \"fields\": [\n" +
+            "    {\n" +
+            "      \"field\": \"idx\", \n" +
+            "      \"type\": \"int32\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"a\", \n" +
+            "      \"type\": \"string\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"b\", \n" +
+            "      \"type\": \"int64\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"c\", \n" +
+            "      \"type\": \"double\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"d\", \n" +
+            "      \"type\": \"boolean\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"e\", \n" +
+            "      \"type\": \"float\"\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"f\", \n" +
+            "      \"type\": \"array\",\n" +
+            "      \"items\": {\n" +
+            "        \"type\": \"string\"\n" +
+            "      }\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"g\", \n" +
+            "      \"type\": \"array\", \n" +
+            "      \"items\": {\n" +
+            "        \"type\": \"int32\"\n" +
+            "      }\n" +
+            "    },\n" +
+            "    {\n" +
+            "      \"field\": \"h\", \n" +
+            "      \"type\": \"struct\", \n" +
+            "      \"fields\": [\n" +
+            "        {\n" +
+            "          \"field\": \"innerstr\", \n" +
+            "          \"type\": \"string\"\n" +
+            "        },\n" +
+            "        {\n" +
+            "          \"field\": \"innernum\", \n" +
+            "          \"type\": \"int64\"\n" +
+            "        }\n" +
+            "      ]\n" +
+            "    }\n" +
+            "  ]\n" +
+            "}";
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
+        connectorConfigProps.put("mq.record.builder.json.schema.content", SCHEMA);
+
+        connectTask.start(connectorConfigProps);
+
+        final List<Message> messages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            messages.add(getJmsContext().createTextMessage(
+                "{ " +
+                "\"idx\": " + i + ", \n" +
+                "\"a\" : \"test\", \n" +
+                "\"b\" : 1234, \n" +
+                "\"c\" : 5.67, \n" +
+                "\"d\" : false, \n" +
+                "\"e\" : 12.34, \n" +
+                "\"f\" : [ \"a\", \"b\", \"c\" ], \n" +
+                "\"g\" : [ 1, 2, 3 ], \n" +
+                "\"h\" : { \"innerstr\" : \"testing\", \"innernum\" : 89 }" +
+                "}"));
+        }
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+        assertEquals(5, kafkaMessages.size());
+
+        for (int i = 0; i < 5; i++) {
+            final SourceRecord kafkaMessage = kafkaMessages.get(i);
+            assertNull(kafkaMessage.key());
+
+            assertNotNull(kafkaMessage.valueSchema());
+            assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("a").schema());
+            assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("b").schema());
+            assertEquals(Schema.FLOAT64_SCHEMA, kafkaMessage.valueSchema().field("c").schema());
+            assertEquals(Schema.BOOLEAN_SCHEMA, kafkaMessage.valueSchema().field("d").schema());
+            assertEquals(Schema.FLOAT32_SCHEMA, kafkaMessage.valueSchema().field("e").schema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("f").schema().valueSchema());
+            assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("g").schema().valueSchema());
+            assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innerstr").schema());
+            assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innernum").schema());
+
+            final Struct value = (Struct) kafkaMessage.value();
+            assertEquals(Integer.valueOf(i), value.getInt32("idx"));
+            assertEquals("test", value.getString("a"));
+            assertEquals(Long.valueOf(1234), value.getInt64("b"));
+            assertEquals(Double.valueOf(5.67), value.getFloat64("c"));
+            assertEquals(false, value.getBoolean("d"));
+            assertEquals(Float.valueOf(12.34f), value.getFloat32("e"));
+            assertArrayEquals(new String[]{ "a", "b", "c" }, value.getArray("f").toArray(new String[]{}));
+            assertArrayEquals(new Integer[]{ 1, 2, 3 }, value.getArray("g").toArray(new Integer[]{}));
+            assertEquals("testing", value.getStruct("h").getString("innerstr"));
+            assertEquals(Long.valueOf(89), value.getStruct("h").getInt64("innernum"));
+
+            connectTask.commitRecord(kafkaMessage, null);
+        }
+    }
+
     @Test
     public void verifyMQMessage() throws Exception {
         connectTask = getSourceTaskWithEmptyKafkaOffset();
```

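The envelope convention exercised by `verifyJmsSchemaMessages` above can be sketched outside the connector. This is an illustrative stand-alone sketch, not the connector's or JsonConverter's code: with `schemas.enable` set to `true`, JsonConverter expects each message to wrap its data in a `{"schema": ..., "payload": ...}` envelope, and the Kafka record value is built from `payload` using the types declared in `schema`.

```python
import json

# Build a message in the schema-and-payload envelope format used by the test.
message = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "idx", "type": "int64"},
            {"field": "test", "type": "string"},
        ],
    },
    "payload": {"idx": 0, "test": "abcdef"},
})

# A consumer of this envelope splits it into the declared schema and the data.
envelope = json.loads(message)
schema = envelope["schema"]
payload = envelope["payload"]

# Map each field name to its declared Connect type.
field_types = {f["field"]: f["type"] for f in schema["fields"]}
print(field_types["idx"])   # int64
print(payload["test"])      # abcdef
```

The trade-off the second test addresses is that this envelope repeats the schema in every message; `mq.record.builder.json.schema.content` avoids that by supplying the schema once in the connector config.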
src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java

Lines changed: 78 additions & 0 deletions

```diff
@@ -186,4 +186,82 @@ public void testValidateRetryDelayConfigWithDefaultValues() {
             .flatMap(cv -> cv.errorMessages().stream())
             .anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
     }
+
+    // verify that providing a schema that isn't JSON will be rejected
+    @Test
+    public void testValidateJsonSchemaConfig() {
+        final Map<String, String> configProps = new HashMap<String, String>();
+        configProps.put("mq.record.builder.json.schemas.enable", "true");
+        configProps.put("mq.record.builder.json.schema.content", "Hello world");
+
+        final Config config = new MQSourceConnector().validate(configProps);
+
+        assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+        assertTrue(config.configValues().stream()
+            .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT))
+            .flatMap(cv -> cv.errorMessages().stream())
+            .anyMatch(msg -> msg.contains("should be a Kafka Connect schema")));
+    }
+
+    // verify that providing JSON (such as JSON schema) that isn't a
+    // Kafka Connect JSON Converter schema will be rejected
+    @Test
+    public void testValidateJsonConnectSchemaConfig() {
+        final Map<String, String> configProps = new HashMap<String, String>();
+        configProps.put("mq.record.builder.json.schemas.enable", "true");
+        configProps.put("mq.record.builder.json.schema.content", "{\n" +
+            "  \"$id\": \"https://example.com/person.schema.json\",\n" +
+            "  \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\n" +
+            "  \"title\": \"Person\",\n" +
+            "  \"type\": \"object\",\n" +
+            "  \"properties\": {\n" +
+            "    \"firstName\": {\n" +
+            "      \"type\": \"string\",\n" +
+            "      \"description\": \"The person's first name.\"\n" +
+            "    },\n" +
+            "    \"lastName\": {\n" +
+            "      \"type\": \"string\",\n" +
+            "      \"description\": \"The person's last name.\"\n" +
+            "    },\n" +
+            "    \"age\": {\n" +
+            "      \"description\": \"Age in years which must be equal to or greater than zero.\",\n" +
+            "      \"type\": \"integer\",\n" +
+            "      \"minimum\": 0\n" +
+            "    }\n" +
+            "  }\n" +
+            "}");
+
+        final Config config = new MQSourceConnector().validate(configProps);
+
+        assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+        assertTrue(config.configValues().stream()
+            .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT))
+            .flatMap(cv -> cv.errorMessages().stream())
+            .anyMatch(msg -> msg.contains("Unknown schema type")));
+    }
+
+    // verify that Kafka Connect JSON Converter schemas containing
+    // invalid types will be rejected
+    @Test
+    public void testValidateJsonSchemaTypesConfig() {
+        final Map<String, String> configProps = new HashMap<String, String>();
+        configProps.put("mq.record.builder.json.schemas.enable", "true");
+        configProps.put("mq.record.builder.json.schema.content", "{\n" +
+            "  \"type\": \"struct\", \n" +
+            "  \"fields\": [\n" +
+            "    {\n" +
+            "      \"field\": \"test\", \n" +
+            "      \"type\": \"not-a-real-type\"\n" +
+            "    }\n" +
+            "  ]\n" +
+            "}");
+
+        final Config config = new MQSourceConnector().validate(configProps);
+
+        assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+        assertTrue(config.configValues().stream()
+            .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT))
+            .flatMap(cv -> cv.errorMessages().stream())
+            .anyMatch(msg -> msg.contains("Unknown schema type: not-a-real-type")));
+    }
 }
```
