From 3bba7449a7935a1f13e085267c5e4b265887d7c3 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Fri, 18 Dec 2020 10:00:32 +0100 Subject: [PATCH 01/12] Update readme: - Move description to the correct property - Add the 2 other currently allowed values for "message.converter" property in the RabbitMQ source connector --- README.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5bb208b..0d810bf 100644 --- a/README.md +++ b/README.md @@ -63,15 +63,21 @@ The username to authenticate to RabbitMQ with. See `ConnectionFactory.setUsernam *Default Value:* / -Converter to compose the Kafka message. +The virtual host to use when connecting to the broker. See `ConnectionFactory.setVirtualHost(java.lang.String) `_ + ##### `message.converter` *Importance:* Medium *Type:* String *Default Value:* com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter +*Other allowed values*: +- com.github.themeetgroup.kafka.connect.rabbitmq.source.data.BytesSourceMessageConverter +- com.github.themeetgroup.kafka.connect.rabbitmq.source.data.StringSourceMessageConverter + +Converter to compose the Kafka message. + -The virtual host to use when connecting to the broker. See `ConnectionFactory.setVirtualHost(java.lang.String) `_ ##### `rabbitmq.port` *Importance:* Medium From f284110c94b009dc230cafd704d9b80f34612027 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Fri, 18 Dec 2020 22:13:09 +0100 Subject: [PATCH 02/12] Add option to add a formatter when writing data to RabbitMQ: - Before you had to use the ByteArrayConverter to use the RabbitMQ sink connector, now you can use more converters such as the avro converter and provide a formatter to convert the kafka connect agnostic format to for example json. Can be extended to avro, parquet, ... - Renamed test topic in script to "rabbitmq-test". 
It is discourgaged to use dots - Re-add the config directory to have a test config you can use when locally debugging with docker compose - Update readme with new option --- README.md | 10 +++ bin/create-topic.sh | 2 +- bin/debug.sh | 2 +- config/RabbitMQSinkConnector.properties | 7 ++ config/RabbitMQSourceConnector.properties | 6 ++ config/connect-avro-docker.properties | 6 ++ pom.xml | 53 +++++++++++++-- .../sink/RabbitMQSinkConnectorConfig.java | 19 +++--- .../rabbitmq/sink/RabbitMQSinkTask.java | 17 +++-- .../sink/format/BytesRecordFormatter.java | 37 +++++++++++ .../sink/format/JsonRecordFormatter.java | 65 +++++++++++++++++++ .../rabbitmq/sink/format/RecordFormatter.java | 34 ++++++++++ 12 files changed, 232 insertions(+), 26 deletions(-) create mode 100644 config/RabbitMQSinkConnector.properties create mode 100644 config/RabbitMQSourceConnector.properties create mode 100644 config/connect-avro-docker.properties create mode 100644 src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatter.java create mode 100644 src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatter.java create mode 100644 src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java diff --git a/README.md b/README.md index 0d810bf..9627f5d 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,16 @@ exchange to publish the messages on. routing key used for publishing the messages. + +##### `rabbitmq.format` +*Importance:* High + +*Type:* String + +*Default Value:* bytes + +The format type to use when writing data to RabbitMQ (currently supported values are bytes, json) + ##### `topics` *Importance:* High diff --git a/bin/create-topic.sh b/bin/create-topic.sh index 56ad312..708b1ef 100755 --- a/bin/create-topic.sh +++ b/bin/create-topic.sh @@ -15,4 +15,4 @@ # limitations under the License. 
# -kafka-topics --create --topic rabbitmq.test --bootstrap-server 127.0.0.1:9092 \ No newline at end of file +kafka-topics --create --topic rabbitmq-test --bootstrap-server 127.0.0.1:9092 \ No newline at end of file diff --git a/bin/debug.sh b/bin/debug.sh index 806ef00..fd66d79 100755 --- a/bin/debug.sh +++ b/bin/debug.sh @@ -22,4 +22,4 @@ export KAFKA_DEBUG='y' set -e mvn clean package -connect-standalone config/connect-avro-docker.properties config/RabbitMQSourceConnector.properties \ No newline at end of file +connect-standalone config/connect-avro-docker.properties config/RabbitMQSinkConnector.properties \ No newline at end of file diff --git a/config/RabbitMQSinkConnector.properties b/config/RabbitMQSinkConnector.properties new file mode 100644 index 0000000..598daab --- /dev/null +++ b/config/RabbitMQSinkConnector.properties @@ -0,0 +1,7 @@ +name=rabbitmq-sink +tasks.max=1 +connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnector +rabbitmq.exchange=exchange +rabbitmq.routing.key=routingkey +rabbitmq.format=json +topics=rabbitmq-test \ No newline at end of file diff --git a/config/RabbitMQSourceConnector.properties b/config/RabbitMQSourceConnector.properties new file mode 100644 index 0000000..39785b0 --- /dev/null +++ b/config/RabbitMQSourceConnector.properties @@ -0,0 +1,6 @@ +name=rabbitmq-source +tasks.max=1 +connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector +rabbitmq.queue=test +kafka.topic=rabbitmq-test +message.converter=com.github.themeetgroup.kafka.connect.rabbitmq.source.data.BytesSourceMessageConverter \ No newline at end of file diff --git a/config/connect-avro-docker.properties b/config/connect-avro-docker.properties new file mode 100644 index 0000000..3a49888 --- /dev/null +++ b/config/connect-avro-docker.properties @@ -0,0 +1,6 @@ +bootstrap.servers=localhost:9092 +key.converter=org.apache.kafka.connect.storage.StringConverter 
+value.converter=io.confluent.connect.avro.AvroConverter +value.converter.schema.registry.url=http://localhost:8081 +offset.storage.file.filename=/tmp/connect.offsets +plugin.path=target/kafka-connect-target/usr/share/kafka-connect \ No newline at end of file diff --git a/pom.xml b/pom.xml index d18eca4..c67d15e 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,13 @@ + + + Confluent + https://packages.confluent.io/maven/ + + + jcustenborder @@ -32,13 +39,21 @@ - insidn + insidin Jan Uyttenhove https://github.com/insidin Committer + + jelledv + Jelle De Vleminck + https://github.com/jelledv + + Committer + + @@ -55,9 +70,37 @@ 5.10.0 3.3.0 + 2.6.0 + 6.0.1 + + org.apache.kafka + connect-json + ${kafka.version} + + + org.apache.kafka + connect-runtime + ${kafka.version} + provided + + + io.confluent + kafka-schema-registry + ${confluent.version} + + + io.confluent + kafka-connect-avro-converter + ${confluent.version} + + + io.confluent + kafka-avro-serializer + ${confluent.version} + com.rabbitmq amqp-client @@ -71,13 +114,9 @@ org.apache.avro avro - 1.8.2 - - - io.confluent - kafka-avro-serializer - 5.1.0 + 1.10.1 + diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java index 21497c3..bc99c2c 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java @@ -24,6 +24,7 @@ import com.github.jcustenborder.kafka.connect.utils.template.StructTemplate; public class RabbitMQSinkConnectorConfig extends CommonRabbitMQConnectorConfig { + static final String KAFKA_TOPIC_TEMPLATE = "kafkaTopicTemplate"; public static final String TOPIC_CONF = "topics"; static final String TOPIC_DOC = "Kafka topic to read the messages from."; @@ -36,14 +37,17 @@ public class 
RabbitMQSinkConnectorConfig extends CommonRabbitMQConnectorConfig { public static final String ROUTING_KEY_CONF = "rabbitmq.routing.key"; static final String ROUTING_KEY_DOC = "routing key used for publishing the messages."; + public static final String FORMAT_CONF = "rabbitmq.format"; + public static final String FORMAT_CONF_DOC = "The format type to use when writing data to rabbitMQ"; + public static final String FORMAT_CONF_DEFAULT = "bytes"; public static final String HEADER_CONF = "rabbitmq.headers"; public static final String HEADER_CONF_DOC = "Headers to set for outbounf messages. Set with `headername1`:`headervalue1`,`headername2`:`headervalue2`"; - //TODO: include other config variables here public final StructTemplate kafkaTopic; public final String exchange; public final String routingKey; + public final String format; public RabbitMQSinkConnectorConfig(Map settings) { super(config(), settings); @@ -52,16 +56,15 @@ public RabbitMQSinkConnectorConfig(Map settings) { this.kafkaTopic.addTemplate(KAFKA_TOPIC_TEMPLATE, kafkaTopicFormat); this.exchange = this.getString(EXCHANGE_CONF); this.routingKey = this.getString(ROUTING_KEY_CONF); + this.format = this.getString(FORMAT_CONF); } public static ConfigDef config() { return CommonRabbitMQConnectorConfig.config() - .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) - .define(EXCHANGE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, EXCHANGE_DOC) - .define(ROUTING_KEY_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ROUTING_KEY_DOC) - .define(HEADER_CONF, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.LOW, HEADER_CONF_DOC); - - + .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(EXCHANGE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, EXCHANGE_DOC) + .define(ROUTING_KEY_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ROUTING_KEY_DOC) + .define(FORMAT_CONF, ConfigDef.Type.STRING, 
FORMAT_CONF_DEFAULT, ConfigDef.Importance.HIGH, FORMAT_CONF_DOC) + .define(HEADER_CONF, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.LOW, HEADER_CONF_DOC); } - } diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java index 56d2d42..0e2a9ad 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java @@ -17,6 +17,7 @@ package com.github.themeetgroup.kafka.connect.rabbitmq.sink; import com.github.jcustenborder.kafka.connect.utils.VersionUtil; +import com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.RecordFormatter; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -35,12 +36,12 @@ import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnectorConfig.HEADER_CONF; public class RabbitMQSinkTask extends SinkTask { - private static final Logger log = LoggerFactory.getLogger(RabbitMQSinkTask.class); - RabbitMQSinkConnectorConfig config; - - Channel channel; - Connection connection; + private static final Logger log = LoggerFactory.getLogger(RabbitMQSinkTask.class); + private RabbitMQSinkConnectorConfig config; + private RecordFormatter recordFormatter; + private Channel channel; + private Connection connection; @Override public String version() { @@ -51,12 +52,9 @@ public String version() { public void put(Collection sinkRecords) { for (SinkRecord record : sinkRecords) { log.trace("current sinkRecord value: " + record.value()); - if (!(record.value() instanceof byte[])) { - throw new ConnectException("the value of the record has an invalid type (must be of type byte[])"); - } try { channel.basicPublish(this.config.exchange, this.config.routingKey, - 
RabbitMQSinkHeaderParser.parse(config.getString(HEADER_CONF)), (byte[]) record.value()); + RabbitMQSinkHeaderParser.parse(config.getString(HEADER_CONF)), recordFormatter.format(record)); } catch (IOException e) { log.error("There was an error while publishing the outgoing message to RabbitMQ"); throw new RetriableException(e); @@ -67,6 +65,7 @@ public void put(Collection sinkRecords) { @Override public void start(Map settings) { this.config = new RabbitMQSinkConnectorConfig(settings); + this.recordFormatter = RecordFormatter.getInstance(config.format); ConnectionFactory connectionFactory = this.config.connectionFactory(); try { log.info("Opening connection to {}:{}/{} (SSL: {})", this.config.host, this.config.port, this.config.virtualHost, this.config.useSsl); diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatter.java new file mode 100644 index 0000000..715c908 --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatter.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.converters.ByteArrayConverter; +import org.apache.kafka.connect.sink.SinkRecord; + +public class BytesRecordFormatter implements RecordFormatter { + + private final ByteArrayConverter converter; + + public BytesRecordFormatter() { + converter = new ByteArrayConverter(); + } + + @Override + public byte[] format(SinkRecord record) { + return converter.fromConnectData( + record.topic(), + record.valueSchema(), + record.value()); + } +} diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatter.java new file mode 100644 index 0000000..0b6b01c --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatter.java @@ -0,0 +1,65 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class JsonRecordFormatter implements RecordFormatter { + + private final ObjectMapper mapper; + private final JsonConverter converter; + + public JsonRecordFormatter() { + this.mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + mapper.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false); + mapper.registerModule(new JavaTimeModule()); + converter = new JsonConverter(); + Map converterConfig = new HashMap<>(); + converterConfig.put("schemas.enable", "false"); + converterConfig.put("schemas.cache.size", "10"); + this.converter.configure(converterConfig, false); + } + + @Override + public byte[] format(SinkRecord record) { + try { + Object value = record.value(); + if (value instanceof Struct) { + return converter.fromConnectData( + record.topic(), + record.valueSchema(), + value + ); + } else { + return mapper.writeValueAsBytes(value); + } + } catch (IOException e) { + throw new ConnectException(e); + } + } +} diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java new file mode 100644 index 0000000..2279191 --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + * + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +public interface RecordFormatter { + + byte[] format(SinkRecord sinkRecord); + + static RecordFormatter getInstance(String type) { + if ("bytes".equals(type)) { + return new BytesRecordFormatter(); + } else if ("json".equals(type)) { + return new JsonRecordFormatter(); + } + throw new ConnectException("The provided format type is not one of 'bytes' or 'json', but: " + type); + } +} From 8151a5303b51f2d06fe3c77cd47e49a20fc86909 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Mon, 21 Dec 2020 14:34:12 +0100 Subject: [PATCH 03/12] Extend the sink formatting options - Add non confluent avro formatter - We have a requirement where we do not want our Rabbitmq consumers to depend on our schema registry. Instead we give them a schema they can use to deserialize the data written from a queue. 
- If you want confluent avro on the rabbitMq queue, you can still use the org.apache.kafka.connect.converters.ByteArrayConverter after putting confluent avro on your topic - Added unit tests for all formatters --- README.md | 7 +- .../rabbitmq/sink/format/AvroFormatter.java | 84 +++++++++++++++++++ .../rabbitmq/sink/format/RecordFormatter.java | 4 +- .../sink/format/AvroFormatterTest.java | 78 +++++++++++++++++ .../sink/format/BytesRecordFormatterTest.java | 39 +++++++++ .../sink/format/JsonRecordFormatterTest.java | 84 +++++++++++++++++++ .../rabbitmq/sink/format/TestData.java | 36 ++++++++ src/test/resources/payment.avsc | 11 +++ 8 files changed, 341 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java create mode 100644 src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java create mode 100644 src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java create mode 100644 src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java create mode 100644 src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java create mode 100644 src/test/resources/payment.avsc diff --git a/README.md b/README.md index 9627f5d..e62f7cf 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ The virtual host to use when connecting to the broker. See `ConnectionFactory.se *Type:* String *Default Value:* com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter + *Other allowed values*: - com.github.themeetgroup.kafka.connect.rabbitmq.source.data.BytesSourceMessageConverter - com.github.themeetgroup.kafka.connect.rabbitmq.source.data.StringSourceMessageConverter @@ -270,7 +271,11 @@ routing key used for publishing the messages. 
*Default Value:* bytes -The format type to use when writing data to RabbitMQ (currently supported values are bytes, json) +*Other allowed values*: +- json +- avro (non Confluent avro) + +The format type to use when writing data to RabbitMQ ##### `topics` *Importance:* High diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java new file mode 100644 index 0000000..980d915 --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import io.confluent.connect.avro.AvroData; +import io.confluent.kafka.serializers.NonRecordContainer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class AvroFormatter implements RecordFormatter { + + private final AvroData avroData; + private final EncoderFactory encoderFactory; + + public AvroFormatter() { + avroData = new AvroData(10); + encoderFactory = EncoderFactory.get(); + } + + @Override + public byte[] format(SinkRecord sinkRecord) { + Schema avroSchema = avroData.fromConnectSchema(sinkRecord.valueSchema()); + Object o = avroData.fromConnectData(sinkRecord.valueSchema(), sinkRecord.value()); + if (o == null) { + return null; + } + return serialize(o, avroSchema); + } + + private byte[] serialize(Object object, Schema schema) { + Object value = object instanceof NonRecordContainer ? ((NonRecordContainer) object).getValue() : object; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + if (schema.getType() == Schema.Type.BYTES) { + if (value instanceof byte[]) { + out.write(((byte[]) value)); + } else { + if (!(value instanceof ByteBuffer)) { + throw new DataException("Error serializing message to format Avro. 
Unrecognized bytes object of type: " + value.getClass().getName()); + } + out.write(((ByteBuffer) value).array()); + } + } else { + BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); + DatumWriter writer; + if (value instanceof SpecificRecord) { + writer = new SpecificDatumWriter<>(schema); + } else { + writer = new GenericDatumWriter<>(schema); + } + writer.write(value, encoder); + encoder.flush(); + } + + return out.toByteArray(); + } catch (IOException e) { + throw new DataException("Error serializing message to format Avro", e); + } + } +} diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java index 2279191..189a15d 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java @@ -28,7 +28,9 @@ static RecordFormatter getInstance(String type) { return new BytesRecordFormatter(); } else if ("json".equals(type)) { return new JsonRecordFormatter(); + } else if ("avro".equals(type)) { + return new AvroFormatter(); } - throw new ConnectException("The provided format type is not one of 'bytes' or 'json', but: " + type); + throw new ConnectException("The provided format type is not one of 'bytes', 'json' or 'avro', but: " + type); } } diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java new file mode 100644 index 0000000..81ad8a5 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java @@ -0,0 +1,78 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import 
io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.util.Utf8; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.createSinkRecord; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentSchema; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class AvroFormatterTest { + + private final RecordFormatter avroRecordFormatter = new AvroFormatter(); + private final DecoderFactory decoderFactory = DecoderFactory.get(); + private final KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(new MockSchemaRegistryClient()); + + private Schema schema; + + @BeforeEach + void setUp() throws IOException, URISyntaxException { + URL resource = this.getClass().getClassLoader().getResource("payment.avsc"); + schema = new Schema.Parser().parse(new File(resource.toURI())); + } + + @Test + void givenAStruct_whenFormattingWithAvroRecordFormatter_expectStructToJson() throws IOException { + Struct payment = paymentValue(1, true, "testSender"); + SinkRecord sinkRecord = createSinkRecord(paymentSchema(), payment); + + byte[] output = avroRecordFormatter.format(sinkRecord); + + 
GenericRecord record = toGenericRecord(schema, output); + assertEquals(1, record.get("id")); + assertEquals(true, record.get("isCashPayment")); + assertEquals(new Utf8("testSender"), record.get("sender")); + } + + // The "avro" formatter is serializing data in NON-confluent avro, meaning the first bytes do not contain the schema id + // see: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format + // If you want confluent avro bytes using the RabbitMQ sink connector + // you can use the "org.apache.kafka.connect.converters.ByteArrayConverter" converter + // after putting confluent avro serialized data on your topic + @Test + void validateExceptionIsThrown_whenTryingToDeserializeOutputWithKafkaAvroDeserializer() { + Struct payment = paymentValue(1, true, "testSender"); + SinkRecord sinkRecord = createSinkRecord(paymentSchema(), payment); + + byte[] output = avroRecordFormatter.format(sinkRecord); + + assertThrows(SerializationException.class, () -> kafkaAvroDeserializer.deserialize("test", output)); + } + + private GenericRecord toGenericRecord(Schema schema, byte[] avroBytes) throws IOException { + DatumReader reader = new GenericDatumReader<>(schema); + ByteArrayInputStream stream = new ByteArrayInputStream(avroBytes); + BinaryDecoder binaryDecoder = decoderFactory.binaryDecoder(stream, null); + return reader.read(null, binaryDecoder); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java new file mode 100644 index 0000000..f02dcc8 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java @@ -0,0 +1,39 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.data.Schema; +import 
org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.createSinkRecord; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class BytesRecordFormatterTest { + + private final RecordFormatter bytesRecordFormatter = new BytesRecordFormatter(); + + @Test + void givenABytesSchemaAndValue_expectCorrectFormat() { + byte[] testString = "test".getBytes(StandardCharsets.UTF_8); + SinkRecord record = createSinkRecord(Schema.BYTES_SCHEMA, testString); + + byte[] output = bytesRecordFormatter.format(record); + + assertEquals(testString, output); + } + + // When using the BytesFormatter, the "org.apache.kafka.connect.converters.ByteArrayConverter" must be used as value converter + // This is also the default behaviour when not specifying a formatter + @Test + void givenAStruct_whenFormattingWithBytesRecordFormatter_expectDataException() { + Struct payment = paymentValue(1, true, "testSender"); + SinkRecord record = createSinkRecord(TestData.paymentSchema(), payment); + + assertThrows(DataException.class, () -> bytesRecordFormatter.format(record)); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java new file mode 100644 index 0000000..ed647d1 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java @@ -0,0 +1,84 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import 
com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.createSinkRecord; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +class JsonRecordFormatterTest { + + private final RecordFormatter jsonRecordFormatter = new JsonRecordFormatter(); + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + objectMapper.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false); + objectMapper.registerModule(new JavaTimeModule()); + } + + @Test + void givenAStringSchemaAndValue_whenFormattingWithJsonRecordFormatter_expectQuotedString() { + String value = "test"; + SinkRecord sinkRecord = TestData.createSinkRecord(Schema.STRING_SCHEMA, value); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + assertEquals("\"test\"", new String(output, StandardCharsets.UTF_8)); + } + + @Test + void givenAnIntSchemaAndValue_whenFormattingWithJsonRecordFormatter_expectQuotedInt() { + int value = 44; + SinkRecord sinkRecord = TestData.createSinkRecord(Schema.INT32_SCHEMA, value); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + assertEquals("44", new String(output, StandardCharsets.UTF_8)); + } + + @Test + void 
givenAStruct_whenFormattingWithJsonRecordFormatter_expectStructToJson() throws IOException { + Struct payment = paymentValue(1, true, "testSender"); + SinkRecord sinkRecord = createSinkRecord(TestData.paymentSchema(), payment); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + Map map = objectMapper.readValue(output, Map.class); + assertEquals(4, map.size()); + assertEquals(1, map.get("id")); + assertEquals(true, map.get("isCashPayment")); + assertEquals("testSender", map.get("sender")); + assertNull(map.get("comment")); + } + + @Test + void givenASchemalessValue_whenFormattingWithJsonRecordFormatter_expectMapToJson() throws IOException { + Map schemalessValue = new HashMap<>(); + schemalessValue.put("id", 1); + schemalessValue.put("sender", "testSender"); + SinkRecord sinkRecord = createSinkRecord(null, schemalessValue); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + Map map = objectMapper.readValue(output, Map.class); + assertEquals(2, map.size()); + assertEquals(1, map.get("id")); + assertEquals("testSender", map.get("sender")); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java new file mode 100644 index 0000000..7a0546b --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java @@ -0,0 +1,36 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; + +public class TestData { + + public static Schema paymentSchema() { + return SchemaBuilder.struct() + .name("com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.Payment") + .doc("Payment schema used in unit tests") + .field("id", 
SchemaBuilder.int32().build()) + .field("isCashPayment", SchemaBuilder.bool().build()) + .field("sender", SchemaBuilder.string().build()) + .field("comment", SchemaBuilder.string().optional().build()) + .build(); + } + + public static Struct paymentValue(int id, boolean isCashPayment, String sender) { + return paymentValue(id, isCashPayment, sender, null); + } + + public static Struct paymentValue(int id, boolean isCashPayment, String sender, String comment) { + return new Struct(paymentSchema()) + .put("id", id) + .put("isCashPayment", isCashPayment) + .put("sender", sender) + .put("comment", comment); + } + + public static SinkRecord createSinkRecord(Schema valueSchema, Object value) { + return new SinkRecord("test", 0, null, null, valueSchema, value, 0); + } +} diff --git a/src/test/resources/payment.avsc b/src/test/resources/payment.avsc new file mode 100644 index 0000000..adce295 --- /dev/null +++ b/src/test/resources/payment.avsc @@ -0,0 +1,11 @@ +{ + "type": "record", + "name": "Payment", + "namespace": "com.github.themeetgroup.kafka.connect.rabbitmq.sink.format", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "isCashPayment", "type": "boolean"}, + {"name": "sender", "type": "string"}, + {"name": "comment", "type": ["null", "string"], "default": null} + ] +} \ No newline at end of file From 6485d0ad5de56077cdf3a619be67efb3e663a736 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Thu, 7 Jan 2021 13:32:24 +0100 Subject: [PATCH 04/12] Add fat jar build for use in docker container --- pom.xml | 19 +++++++++++++++++++ .../sink/format/AvroFormatterTest.java | 1 + 2 files changed, 20 insertions(+) diff --git a/pom.xml b/pom.xml index c67d15e..58e5350 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,25 @@ + + org.apache.maven.plugins + maven-assembly-plugin + 3.1.1 + + + jar-with-dependencies + + + + + fat-jar + package + + single + + + + org.apache.maven.plugins maven-javadoc-plugin diff --git 
a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java index 81ad8a5..3c668fd 100644 --- a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java @@ -8,6 +8,7 @@ import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.util.Utf8; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.connect.data.Struct; From d9d2db411a17ee3d268726948645bde22f37fbc1 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Thu, 7 Jan 2021 15:27:33 +0100 Subject: [PATCH 05/12] Cleanup dependencies. Fix bug with conflicting dependencies on jackson library --- pom.xml | 40 ++++++++++++++-------------------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 58e5350..e805ab5 100644 --- a/pom.xml +++ b/pom.xml @@ -71,50 +71,38 @@ 5.10.0 3.3.0 2.6.0 - 6.0.1 + 6.0.0 + + 2.8.5 - - org.apache.kafka - connect-json - ${kafka.version} - - - org.apache.kafka - connect-runtime - ${kafka.version} - provided - - - io.confluent - kafka-schema-registry - ${confluent.version} - io.confluent kafka-connect-avro-converter ${confluent.version} - io.confluent - kafka-avro-serializer - ${confluent.version} + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + com.rabbitmq amqp-client ${rabbitmq.version} - com.github.jcustenborder.kafka.connect - connect-utils-testing-data - ${connect-utils.version} + org.apache.kafka + connect-json + ${kafka.version} - org.apache.avro - avro - 1.10.1 + org.apache.kafka + connect-runtime + ${kafka.version} + provided From 
18f4b1152c5911cb1eb7d954baf3f6d39d97fd83 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Mon, 11 Jan 2021 14:25:44 +0100 Subject: [PATCH 06/12] Add support for enums --- pom.xml | 25 ++++++++- .../rabbitmq/sink/format/AvroFormatter.java | 9 ++- .../sink/format/AvroFormatterTest.java | 18 +++++- .../sink/format/BytesRecordFormatterTest.java | 2 +- .../sink/format/JsonRecordFormatterTest.java | 5 +- .../sink/format/PlainAvroDeserializer.java | 55 +++++++++++++++++++ .../rabbitmq/sink/format/TestData.java | 18 +++++- src/test/resources/payment.avsc | 1 + 8 files changed, 122 insertions(+), 11 deletions(-) create mode 100644 src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/PlainAvroDeserializer.java diff --git a/pom.xml b/pom.xml index e805ab5..bac0686 100644 --- a/pom.xml +++ b/pom.xml @@ -72,8 +72,9 @@ 3.3.0 2.6.0 6.0.0 - + 2.8.5 + 1.9.2 @@ -164,6 +165,28 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + ${project.basedir}/src/test/resources + + ${project.basedir}/src/test/resources/payment.avsc + + true + private + + + + second + generate-sources + + schema + + + + diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java index 980d915..e24af0b 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java @@ -17,6 +17,7 @@ package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; import io.confluent.connect.avro.AvroData; +import io.confluent.connect.avro.AvroDataConfig; import io.confluent.kafka.serializers.NonRecordContainer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumWriter; @@ -31,6 +32,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import 
java.util.HashMap; +import java.util.Map; public class AvroFormatter implements RecordFormatter { @@ -38,7 +41,11 @@ public class AvroFormatter implements RecordFormatter { private final EncoderFactory encoderFactory; public AvroFormatter() { - avroData = new AvroData(10); + Map avroDataConfigMap = new HashMap() {{ + put(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true); + put(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 10); + }}; + avroData = new AvroData(new AvroDataConfig(avroDataConfigMap)); encoderFactory = EncoderFactory.get(); } diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java index 3c668fd..6db839c 100644 --- a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java @@ -3,6 +3,7 @@ import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; @@ -26,13 +27,16 @@ import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentSchema; import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; class AvroFormatterTest { private final RecordFormatter avroRecordFormatter = new AvroFormatter(); private final DecoderFactory decoderFactory = DecoderFactory.get(); private final 
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(new MockSchemaRegistryClient()); + private final PlainAvroDeserializer plainAvroDeserializer = new PlainAvroDeserializer<>(Payment.class); private Schema schema; @@ -43,8 +47,8 @@ void setUp() throws IOException, URISyntaxException { } @Test - void givenAStruct_whenFormattingWithAvroRecordFormatter_expectStructToJson() throws IOException { - Struct payment = paymentValue(1, true, "testSender"); + void givenAStruct_whenFormattingWithAvroRecordFormatter_expectStructToAvro() throws IOException { + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); SinkRecord sinkRecord = createSinkRecord(paymentSchema(), payment); byte[] output = avroRecordFormatter.format(sinkRecord); @@ -53,6 +57,14 @@ void givenAStruct_whenFormattingWithAvroRecordFormatter_expectStructToJson() thr assertEquals(1, record.get("id")); assertEquals(true, record.get("isCashPayment")); assertEquals(new Utf8("testSender"), record.get("sender")); + assertEquals(new GenericData.EnumSymbol(Currency.SCHEMA$, "EURO"), record.get("currency")); + + Payment specificOutput = plainAvroDeserializer.deserialize(null, output); + assertEquals(1, specificOutput.getId()); + assertTrue(specificOutput.getIsCashPayment()); + assertEquals("testSender", specificOutput.getSender().toString()); + assertNull(specificOutput.getComment()); + assertEquals(Currency.EURO, specificOutput.getCurrency()); } // The "avro" formatter is serializing data in NON-confluent avro, meaning the first bytes do not contain the schema id @@ -62,7 +74,7 @@ void givenAStruct_whenFormattingWithAvroRecordFormatter_expectStructToJson() thr // after putting confluent avro serialized data on your topic @Test void validateExceptionIsThrown_whenTryingToDeserializeOutputWithKafkaAvroDeserializer() { - Struct payment = paymentValue(1, true, "testSender"); + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); SinkRecord sinkRecord = 
createSinkRecord(paymentSchema(), payment); byte[] output = avroRecordFormatter.format(sinkRecord); diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java index f02dcc8..d397f83 100644 --- a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java @@ -31,7 +31,7 @@ void givenABytesSchemaAndValue_expectCorrectFormat() { // This is also the default behaviour when not specifying a formatter @Test void givenAStruct_whenFormattingWithBytesRecordFormatter_expectDataException() { - Struct payment = paymentValue(1, true, "testSender"); + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); SinkRecord record = createSinkRecord(TestData.paymentSchema(), payment); assertThrows(DataException.class, () -> bytesRecordFormatter.format(record)); diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java index ed647d1..d454a40 100644 --- a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java @@ -54,17 +54,18 @@ void givenAnIntSchemaAndValue_whenFormattingWithJsonRecordFormatter_expectQuoted @Test void givenAStruct_whenFormattingWithJsonRecordFormatter_expectStructToJson() throws IOException { - Struct payment = paymentValue(1, true, "testSender"); + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); SinkRecord sinkRecord = createSinkRecord(TestData.paymentSchema(), payment); byte[] output = 
jsonRecordFormatter.format(sinkRecord); Map map = objectMapper.readValue(output, Map.class); - assertEquals(4, map.size()); + assertEquals(5, map.size()); assertEquals(1, map.get("id")); assertEquals(true, map.get("isCashPayment")); assertEquals("testSender", map.get("sender")); assertNull(map.get("comment")); + assertEquals("EURO", map.get("currency")); } @Test diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/PlainAvroDeserializer.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/PlainAvroDeserializer.java new file mode 100644 index 0000000..86254b6 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/PlainAvroDeserializer.java @@ -0,0 +1,55 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.avro.Schema; +import org.apache.avro.data.TimeConversions; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.serialization.Deserializer; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +public class PlainAvroDeserializer implements Deserializer { + + static { + SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); + SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + SpecificData.get().addLogicalTypeConversion(new TimeConversions.DateConversion()); + + GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); + GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + GenericData.get().addLogicalTypeConversion(new TimeConversions.DateConversion()); + } + + 
private final DecoderFactory decoderFactory = DecoderFactory.get(); + private final DatumReader datumReader; + + public PlainAvroDeserializer(Class cls) { + this(SpecificData.get().getSchema(cls)); + } + + public PlainAvroDeserializer(Schema schema) { + datumReader = new SpecificDatumReader<>(schema); + } + + @Override + public T deserialize(String topic, byte[] data) { + try { + if (data == null) { + return null; + } + + try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { + BinaryDecoder binaryDecoder = decoderFactory.binaryDecoder(stream, null); + return datumReader.read(null, binaryDecoder); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java index 7a0546b..472a268 100644 --- a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java @@ -13,19 +13,31 @@ public static Schema paymentSchema() { .doc("Payment schema used in unit tests") .field("id", SchemaBuilder.int32().build()) .field("isCashPayment", SchemaBuilder.bool().build()) + .field("currency", SchemaBuilder.string() + .parameter( + "io.confluent.connect.avro.Enum", + "com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.Currency") + .parameter( + "io.confluent.connect.avro.Enum.EURO", + "EURO") + .parameter( + "io.confluent.connect.avro.Enum.DOLLAR", + "DOLLAR") + .build()) .field("sender", SchemaBuilder.string().build()) .field("comment", SchemaBuilder.string().optional().build()) .build(); } - public static Struct paymentValue(int id, boolean isCashPayment, String sender) { - return paymentValue(id, isCashPayment, sender, null); + public static Struct paymentValue(int id, boolean isCashPayment, Currency currency, String sender) { + return 
paymentValue(id, isCashPayment, currency, sender, null); } - public static Struct paymentValue(int id, boolean isCashPayment, String sender, String comment) { + public static Struct paymentValue(int id, boolean isCashPayment, Currency currency, String sender, String comment) { return new Struct(paymentSchema()) .put("id", id) .put("isCashPayment", isCashPayment) + .put("currency", currency.toString()) .put("sender", sender) .put("comment", comment); } diff --git a/src/test/resources/payment.avsc b/src/test/resources/payment.avsc index adce295..b54c4c6 100644 --- a/src/test/resources/payment.avsc +++ b/src/test/resources/payment.avsc @@ -5,6 +5,7 @@ "fields": [ {"name": "id", "type": "int"}, {"name": "isCashPayment", "type": "boolean"}, + {"name": "currency", "type": { "type": "enum", "name": "Currency", "symbols" : ["EURO","DOLLAR"]}}, {"name": "sender", "type": "string"}, {"name": "comment", "type": ["null", "string"], "default": null} ] From fe8568cc4faf5f3e52513b36f4ba50913379b8cd Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Mon, 11 Jan 2021 14:33:30 +0100 Subject: [PATCH 07/12] Add support for enums -> make checkstyle happy --- .../rabbitmq/sink/format/AvroFormatter.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java index e24af0b..e84369e 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java @@ -1,12 +1,12 @@ /** * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -41,10 +41,10 @@ public class AvroFormatter implements RecordFormatter { private final EncoderFactory encoderFactory; public AvroFormatter() { - Map avroDataConfigMap = new HashMap() {{ - put(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true); - put(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 10); - }}; + Map avroDataConfigMap = new HashMap() { { + put(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true); + put(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 10); + } }; avroData = new AvroData(new AvroDataConfig(avroDataConfigMap)); encoderFactory = EncoderFactory.get(); } From 4e469f3b98d456712f48c9ec4d46d622b40c5808 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Tue, 8 Jun 2021 17:27:56 +0200 Subject: [PATCH 08/12] Add support for queue to topic mapping. This way you can have a one on one mapping for queue to topic. Before it was only possible to have a Many (queues) to One (topic) mapping in the RabbitMQSourceConnector: - Use Map to represent the queue and topic config where the old config is backwards compatible - add queue variable in ConnectConsumer. It is the only way to know from which queue it is consuming. 
Use a new consumer per topic --- .../rabbitmq/source/ConnectConsumer.java | 11 ++++- .../source/RabbitMQSourceConnector.java | 2 + .../source/RabbitMQSourceConnectorConfig.java | 40 +++++++++++++++---- .../rabbitmq/source/RabbitMQSourceTask.java | 32 ++++----------- .../source/data/SourceRecordBuilder.java | 9 ++++- 5 files changed, 58 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java index f07d0b3..1ad01b2 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java @@ -32,10 +32,14 @@ class ConnectConsumer implements Consumer { private static final Logger log = LoggerFactory.getLogger(ConnectConsumer.class); private final SourceRecordConcurrentLinkedDeque records; private final SourceRecordBuilder sourceRecordBuilder; + private final String queue; - ConnectConsumer(SourceRecordConcurrentLinkedDeque records, RabbitMQSourceConnectorConfig config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { + ConnectConsumer(SourceRecordConcurrentLinkedDeque records, + RabbitMQSourceConnectorConfig config, + String queue) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { this.records = records; this.sourceRecordBuilder = new SourceRecordBuilder(config); + this.queue = queue; } @Override @@ -67,7 +71,10 @@ public void handleRecoverOk(String s) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { log.trace("handleDelivery({})", consumerTag); - SourceRecord sourceRecord = this.sourceRecordBuilder.sourceRecord(consumerTag, envelope, basicProperties, 
bytes); + log.info(envelope.toString()); + log.info(basicProperties.toString()); + + SourceRecord sourceRecord = this.sourceRecordBuilder.sourceRecord(queue, consumerTag, envelope, basicProperties, bytes); this.records.add(sourceRecord); } } diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java index 079546c..e2e6e7f 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java @@ -29,6 +29,7 @@ public class RabbitMQSourceConnector extends SourceConnector { private Map settings; + private RabbitMQSourceConnectorConfig config; @Override public String version() { @@ -38,6 +39,7 @@ public String version() { @Override public void start(Map settings) { this.settings = settings; + this.config = new RabbitMQSourceConnectorConfig(settings); } @Override diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java index 3a1a83b..e7f42cf 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java @@ -17,9 +17,15 @@ import com.github.themeetgroup.kafka.connect.rabbitmq.CommonRabbitMQConnectorConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.errors.ConnectException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toMap; public class RabbitMQSourceConnectorConfig extends 
CommonRabbitMQConnectorConfig { @@ -42,28 +48,48 @@ public class RabbitMQSourceConnectorConfig extends CommonRabbitMQConnectorConfig public static final String MESSAGE_CONVERTER_CLASSNAME_DOC = "Converter to compose the Kafka message. Optional, defaults to " + "com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter"; - public final String kafkaTopic; - public final List queues; + public static final String QUEUE_TOPIC_MAPPING_CONF = "rabbitmq.queue.topic.mapping"; + public static final String QUEUE_TOPIC_MAPPING_DOC = "A comma separated list containing a mapping between a RabbitMQ queue and a Kafka topic. " + + "This setting is an alternative for the 'rabbitmq.queue' and 'kafka.topic' setting. This setting is mutually exclusive with the 'rabbitmq.queue' and 'kafka.topic' combination. " + + "When both settings are present, this combination of the original config will be used. " + + "example of mapping config: 'queue1:topic1,queue2:topic2'"; + public final int prefetchCount; public final boolean prefetchGlobal; public final String messageConverter; + public final Map queueToTopicMap; public RabbitMQSourceConnectorConfig(Map settings) { super(config(), settings); - this.kafkaTopic = this.getString(TOPIC_CONF); - this.queues = this.getList(QUEUE_CONF); this.prefetchCount = this.getInt(PREFETCH_COUNT_CONF); this.prefetchGlobal = this.getBoolean(PREFETCH_GLOBAL_CONF); this.messageConverter = this.getString(MESSAGE_CONVERTER_CLASSNAME_CONF); + + String topic = this.getString(TOPIC_CONF); + List queues = this.getList(QUEUE_CONF); + List queueTopicMappingList = this.getList(QUEUE_TOPIC_MAPPING_CONF); + + if (!queues.isEmpty() && !topic.isEmpty()) { + queueToTopicMap = queues.stream() + .collect(Collectors.toMap(x -> x, x -> topic)); + } else if (!queueTopicMappingList.isEmpty()) { + queueToTopicMap = queueTopicMappingList.stream() + .map(x -> x.split(":")) + .collect(toMap(x -> x[0], x -> x[1])); + } else { + throw new ConnectException("No valid queue / 
topic configuration has been found. Either use the combination of " + + "" + TOPIC_CONF + " and " + QUEUE_CONF + " or use the " + QUEUE_TOPIC_MAPPING_CONF + " setting."); + } } public static ConfigDef config() { return CommonRabbitMQConnectorConfig.config() - .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(TOPIC_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, TOPIC_DOC) .define(PREFETCH_COUNT_CONF, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, PREFETCH_COUNT_DOC) .define(PREFETCH_GLOBAL_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, PREFETCH_GLOBAL_DOC) - .define(QUEUE_CONF, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUEUE_DOC) - .define(MESSAGE_CONVERTER_CLASSNAME_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, MESSAGE_CONVERTER_CLASSNAME_DOC); + .define(QUEUE_CONF, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.HIGH, QUEUE_DOC) + .define(MESSAGE_CONVERTER_CLASSNAME_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, MESSAGE_CONVERTER_CLASSNAME_DOC) + .define(QUEUE_TOPIC_MAPPING_CONF, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.HIGH, QUEUE_TOPIC_MAPPING_DOC); } } diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java index 5f07045..54967cc 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java @@ -49,43 +49,25 @@ public String version() { public void start(Map settings) { RabbitMQSourceConnectorConfig config = new RabbitMQSourceConnectorConfig(settings); this.records = new SourceRecordConcurrentLinkedDeque(); - ConnectConsumer consumer; - try { - consumer = new ConnectConsumer(this.records, config); - } catch (Exception e) { - throw new 
ConnectException(e); - } - ConnectionFactory connectionFactory = config.connectionFactory(); try { + ConnectionFactory connectionFactory = config.connectionFactory(); log.info("Opening connection to {}:{}/{} (SSL: {})", config.host, config.port, config.virtualHost, config.useSsl); this.connection = connectionFactory.newConnection(); - } catch (IOException | TimeoutException e) { - throw new ConnectException(e); - } - try { log.info("Creating Channel"); this.channel = this.connection.createChannel(); - log.info("Declaring queues"); - for (String queue : config.queues) { - this.channel.queueDeclare(queue, true, false, false, null); - } - } catch (IOException e) { - throw new ConnectException(e); - } - for (String queue : config.queues) { - try { - log.info("Starting consumer"); + for (String queue : config.queueToTopicMap.keySet()) { + log.info("Declaring queue {} & starting consumer for queue", queue); + ConnectConsumer consumer = new ConnectConsumer(this.records, config, queue); + this.channel.queueDeclare(queue, true, false, false, null); this.channel.basicConsume(queue, consumer); - log.info("Setting channel.basicQos({}, {});", config.prefetchCount, config.prefetchGlobal); this.channel.basicQos(config.prefetchCount, config.prefetchGlobal); - } catch (IOException ex) { - throw new ConnectException(ex); } + } catch (Exception e) { + throw new ConnectException(e); } - } @Override diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java index 09b2fe1..bed7ce1 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java @@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import 
org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import java.lang.reflect.InvocationTargetException; +import java.util.Optional; public class SourceRecordBuilder { @@ -41,13 +43,16 @@ public SourceRecordBuilder(RabbitMQSourceConnectorConfig config) throws ClassNot (SourceMessageConverter) (Class.forName(messageConverterClassName).getConstructor().newInstance()); } - public SourceRecord sourceRecord(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { + public SourceRecord sourceRecord(String queue, String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { Object key = this.messageConverter.key(consumerTag, envelope, basicProperties, bytes); Schema keySchema = this.messageConverter.keySchema(); Object value = this.messageConverter.value(consumerTag, envelope, basicProperties, bytes); Schema valueSchema = this.messageConverter.valueSchema(); Headers headers = this.messageConverter.headers(consumerTag, envelope, basicProperties, bytes); - String topic = this.config.kafkaTopic; + + String topic = Optional + .ofNullable(config.queueToTopicMap.get(queue)) + .orElseThrow(() -> new ConnectException("There was no Kafka topic found for the consumed queue '" + queue + "'")); return new SourceRecord( ImmutableMap.of("routingKey", envelope.getRoutingKey()), From 6e3c11ad8fa49555b70f87b17e27ae7a65facb6e Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Tue, 8 Jun 2021 17:28:08 +0200 Subject: [PATCH 09/12] Disable checkstyle --- bin/debug.sh | 4 ++-- pom.xml | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/bin/debug.sh b/bin/debug.sh index fd66d79..e52c42c 100755 --- a/bin/debug.sh +++ b/bin/debug.sh @@ -21,5 +21,5 @@ export KAFKA_DEBUG='y' set -e -mvn clean package -connect-standalone config/connect-avro-docker.properties 
config/RabbitMQSinkConnector.properties \ No newline at end of file +mvn clean package -Dcheckstyle.skip +connect-standalone config/connect-avro-docker.properties config/RabbitMQSourceConnector.properties \ No newline at end of file diff --git a/pom.xml b/pom.xml index bac0686..1f282e4 100644 --- a/pom.xml +++ b/pom.xml @@ -187,6 +187,13 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + + true + + From c663df82319d2b42ccbd228918bced516aa5e9a3 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Mon, 14 Jun 2021 11:53:34 +0200 Subject: [PATCH 10/12] Update taskConfigs method to split queue / topic mapping between multiple tasks. When the setting "rabbitmq.queue.topic.mapping" is not present. The previous implemention is used where every task consumes all the queues. --- bin/{create-topic.sh => create-topics.sh} | 3 +- bin/read-topic-with-headers.sh | 2 +- config/RabbitMQSourceConnector.properties | 5 +- .../source/RabbitMQSourceConnector.java | 24 ++++++++- .../source/RabbitMQSourceConnectorConfig.java | 2 +- .../rabbitmq/source/RabbitMQSourceTask.java | 3 ++ .../RabbitMQSourceConnectorConfigTest.java | 53 +++++++++++++++++++ .../source/RabbitMQSourceConnectorTest.java | 50 +++++++++++++++++ 8 files changed, 136 insertions(+), 6 deletions(-) rename bin/{create-topic.sh => create-topics.sh} (81%) create mode 100644 src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfigTest.java create mode 100644 src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorTest.java diff --git a/bin/create-topic.sh b/bin/create-topics.sh similarity index 81% rename from bin/create-topic.sh rename to bin/create-topics.sh index 708b1ef..bed1168 100755 --- a/bin/create-topic.sh +++ b/bin/create-topics.sh @@ -15,4 +15,5 @@ # limitations under the License. 
# -kafka-topics --create --topic rabbitmq-test --bootstrap-server 127.0.0.1:9092 \ No newline at end of file +kafka-topics --create --topic topic1 --bootstrap-server 127.0.0.1:9092 +kafka-topics --create --topic topic2 --bootstrap-server 127.0.0.1:9092 \ No newline at end of file diff --git a/bin/read-topic-with-headers.sh b/bin/read-topic-with-headers.sh index 3abe8cd..fbc7402 100755 --- a/bin/read-topic-with-headers.sh +++ b/bin/read-topic-with-headers.sh @@ -15,7 +15,7 @@ # limitations under the License. # -kafkacat -b localhost:9092 -t rabbitmq.test -C \ +kafkacat -b localhost:9092 -t topic1 -C \ -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T diff --git a/config/RabbitMQSourceConnector.properties b/config/RabbitMQSourceConnector.properties index 39785b0..49a6baa 100644 --- a/config/RabbitMQSourceConnector.properties +++ b/config/RabbitMQSourceConnector.properties @@ -1,6 +1,7 @@ name=rabbitmq-source tasks.max=1 connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector -rabbitmq.queue=test -kafka.topic=rabbitmq-test +#rabbitmq.queue=test1,test2 +#kafka.topic=rabbitmq-test +rabbitmq.queue.topic.mapping=test1:topic1,test2:topic2 message.converter=com.github.themeetgroup.kafka.connect.rabbitmq.source.data.BytesSourceMessageConverter \ No newline at end of file diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java index e2e6e7f..46b730a 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java @@ -21,9 +21,16 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; +import 
org.apache.kafka.connect.util.ConnectorUtils; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_TOPIC_MAPPING_CONF; @Description("Connector is used to read from a RabbitMQ Queue or Topic.") public class RabbitMQSourceConnector extends SourceConnector { @@ -49,7 +56,22 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { - return TaskConfigs.multiple(this.settings, maxTasks); + String queueToTopicMapping = settings.get(QUEUE_TOPIC_MAPPING_CONF); + if (queueToTopicMapping == null || queueToTopicMapping.isEmpty()) { + return TaskConfigs.multiple(this.settings, maxTasks); + } + + List listQueueToTopicMapping = Arrays.stream(queueToTopicMapping.split(",")).collect(Collectors.toList()); + List> partitionedQueueToTopicMapping = ConnectorUtils.groupPartitions(listQueueToTopicMapping, maxTasks); + + List> connectorConfig = new ArrayList<>(); + for (List partition : partitionedQueueToTopicMapping) { + Map taskConfig = new HashMap<>(settings); + taskConfig.put(QUEUE_TOPIC_MAPPING_CONF, String.join(",", partition)); + connectorConfig.add(taskConfig); + } + + return connectorConfig; } @Override diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java index e7f42cf..e02e437 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java @@ -89,7 +89,7 @@ public static ConfigDef config() { .define(PREFETCH_COUNT_CONF, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, PREFETCH_COUNT_DOC) 
.define(PREFETCH_GLOBAL_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, PREFETCH_GLOBAL_DOC) .define(QUEUE_CONF, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.HIGH, QUEUE_DOC) - .define(MESSAGE_CONVERTER_CLASSNAME_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, MESSAGE_CONVERTER_CLASSNAME_DOC) + .define(MESSAGE_CONVERTER_CLASSNAME_CONF, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, MESSAGE_CONVERTER_CLASSNAME_DOC) .define(QUEUE_TOPIC_MAPPING_CONF, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.HIGH, QUEUE_TOPIC_MAPPING_DOC); } } diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java index 54967cc..86a3d45 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java @@ -20,6 +20,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.source.SourceRecord; @@ -29,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfigTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfigTest.java new file mode 100644 index 0000000..8a91da5 --- /dev/null +++ 
b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfigTest.java @@ -0,0 +1,53 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.source; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_TOPIC_MAPPING_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.TOPIC_CONF; +import static org.junit.jupiter.api.Assertions.*; + +class RabbitMQSourceConnectorConfigTest { + + @Test + void givenNoTopicAndQueueAndQueueToTopicConfig_whenCreatingConfig_expectConnectException() { + Map settings = new HashMap<>(); + + assertThrows(ConnectException.class, () -> new RabbitMQSourceConnectorConfig(settings)); + } + + @Test + void givenTopicAndQueuesCombination_whenCreatingConfig_expectEveryQueueMappedToSameTopic() { + Map settings = new HashMap<>(); + settings.put(TOPIC_CONF, "test_topic"); + settings.put(QUEUE_CONF, "queue1,queue2,queue3"); + + RabbitMQSourceConnectorConfig config = new RabbitMQSourceConnectorConfig(settings); + + Map queueToTopicMap = config.queueToTopicMap; + assertEquals(3, queueToTopicMap.size()); + assertEquals("test_topic", queueToTopicMap.get("queue1")); + assertEquals("test_topic", queueToTopicMap.get("queue2")); + assertEquals("test_topic", queueToTopicMap.get("queue2")); + } + + @Test + void givenOnlyTopicToQueueMapping_whenCreatingConfig_expectTopicCorrectlyMapped() { + Map settings = new HashMap<>(); + settings.put(QUEUE_TOPIC_MAPPING_CONF, "queue1:topic1,queue2:topic2,queue3:topic2"); + + RabbitMQSourceConnectorConfig config = new 
RabbitMQSourceConnectorConfig(settings); + + Map queueToTopicMap = config.queueToTopicMap; + assertEquals(3, queueToTopicMap.size()); + assertEquals("topic1", queueToTopicMap.get("queue1")); + assertEquals("topic2", queueToTopicMap.get("queue2")); + assertEquals("topic2", queueToTopicMap.get("queue3")); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorTest.java new file mode 100644 index 0000000..22da381 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorTest.java @@ -0,0 +1,50 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.source; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_TOPIC_MAPPING_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.TOPIC_CONF; +import static org.junit.jupiter.api.Assertions.*; + +class RabbitMQSourceConnectorTest { + + private RabbitMQSourceConnector connector; + + @BeforeEach + void setUp() { + connector = new RabbitMQSourceConnector(); + } + + @Test + void givenNormalTopicAndQueueConfig_whenCreatingTaskConfig_expectSameConfigForEveryTask() { + Map settings = new HashMap<>(); + settings.put(TOPIC_CONF, "test_topic"); + settings.put(QUEUE_CONF, "queue1,queue2,queue3"); + + connector.start(settings); + List> taskConfigs = connector.taskConfigs(2); + + assertEquals(2, taskConfigs.size()); + assertEquals(taskConfigs.get(0), taskConfigs.get(1)); + } + + @Test + void 
givenQueueToTopicMappingConfig_whenCreatingTaskConfig_expectConfigSplitUpPerTask() { + Map settings = new HashMap<>(); + settings.put(QUEUE_TOPIC_MAPPING_CONF, "queue1:topic1,queue2:topic2,queue3:topic2"); + + connector.start(settings); + List> taskConfigs = connector.taskConfigs(3); + + assertEquals(3, taskConfigs.size()); + assertEquals("queue1:topic1", taskConfigs.get(0).get(QUEUE_TOPIC_MAPPING_CONF)); + assertEquals("queue2:topic2", taskConfigs.get(1).get(QUEUE_TOPIC_MAPPING_CONF)); + assertEquals("queue3:topic2", taskConfigs.get(2).get(QUEUE_TOPIC_MAPPING_CONF)); + } +} \ No newline at end of file From c7bfd358345f1194c84bcef5d8af49a145e1157a Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Tue, 15 Jun 2021 09:33:25 +0200 Subject: [PATCH 11/12] Remove info logging in delivery --- .../kafka/connect/rabbitmq/source/ConnectConsumer.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java index 1ad01b2..0a504e9 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java @@ -71,9 +71,6 @@ public void handleRecoverOk(String s) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { log.trace("handleDelivery({})", consumerTag); - log.info(envelope.toString()); - log.info(basicProperties.toString()); - SourceRecord sourceRecord = this.sourceRecordBuilder.sourceRecord(queue, consumerTag, envelope, basicProperties, bytes); this.records.add(sourceRecord); } From 6da09ecff634be0a2158887dc3c0cf15d89ab159 Mon Sep 17 00:00:00 2001 From: Jelle De Vleminck Date: Tue, 15 Jun 2021 10:47:47 +0200 Subject: [PATCH 12/12] Update README.md --- README.md | 8 ++++++++ 
.../rabbitmq/source/RabbitMQSourceConnectorConfig.java | 7 +++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e62f7cf..2ba0d4a 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,14 @@ Kafka topic to write the messages to. *Type:* List +##### `rabbitmq.queue.topic.mapping` +*Importance:* High + +*Type:* List + +A list containing a mapping between a RabbitMQ queue and a Kafka topic. + This setting is an alternative for the 'rabbitmq.queue' and 'kafka.topic' setting. This allows to use a single connector instance to have a many-to-many mapping, instead of only a many queues to one topic mapping. + When both settings are present. The 'rabbitmq.queue' and 'kafka.topic' will be used. Example of mapping config: 'queue1:topic1,queue2:topic2' rabbitmq.queue ##### `rabbitmq.host` diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java index e02e437..7709c35 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java @@ -49,10 +49,9 @@ public class RabbitMQSourceConnectorConfig extends CommonRabbitMQConnectorConfig "com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter"; public static final String QUEUE_TOPIC_MAPPING_CONF = "rabbitmq.queue.topic.mapping"; - public static final String QUEUE_TOPIC_MAPPING_DOC = "A comma separated list containing a mapping between a RabbitMQ queue and a Kafka topic. " + - "This setting is an alternative for the 'rabbitmq.queue' and 'kafka.topic' setting. This setting is mutual exclusive of the 'rabbitmq.queue' and 'kafka.topic' combination. " + - "When both settings are present. This combination of the original config will be used. 
" + - "example of mapping config: 'queue1:topic1,queue2:topic2'"; + public static final String QUEUE_TOPIC_MAPPING_DOC = "A list containing a mapping between a RabbitMQ queue and a Kafka topic.\n" + + " This setting is an alternative for the 'rabbitmq.queue' and 'kafka.topic' setting.\n" + + " When both settings are present. The 'rabbitmq.queue' and 'kafka.topic' will be used. Example of mapping config: 'queue1:topic1,queue2:topic2'"; public final int prefetchCount; public final boolean prefetchGlobal;