diff --git a/.gitignore b/.gitignore index be2e354..d1f4d7d 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ hs_err_pid* target .okhttpcache +/.idea/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6079e9a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM maven:3-openjdk-8-slim AS BUILD_CONNECT_TRANSFORM_ARCHIVE_PLUGIN +WORKDIR /tmp +RUN apt-get update && apt-get install -y git +COPY . /tmp/kafka-connect-transform-archive +RUN cd kafka-connect-transform-archive && mvn package diff --git a/README.md b/README.md index a7c6b31..d9a27ba 100644 --- a/README.md +++ b/README.md @@ -28,11 +28,18 @@ This transform works by copying the key, value, topic, and timestamp to new reco This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties +# Archive name=Connector1 connector.class=org.apache.kafka.some.SourceConnector tasks.max=1 transforms=tran transforms.tran.type=com.github.jcustenborder.kafka.connect.archive.Archive +# Unarchive +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.archive.UnArchive ``` ##### Distributed Example @@ -42,12 +49,20 @@ Write the following json to `connector.json`, configure all of the required valu post the configuration to one the distributed connect worker(s). ```json +// Archive { "name" : "Connector1", "connector.class" : "org.apache.kafka.some.SourceConnector", "transforms" : "tran", "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.archive.Archive" } +// UnArchive +{ + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.archive.UnArchive" +} ``` Use curl to post the configuration to one of the Kafka Connect Workers. 
Change `http://localhost:8083/` the the endpoint of diff --git a/pom.xml b/pom.xml index dfd961b..f460d36 100644 --- a/pom.xml +++ b/pom.xml @@ -56,12 +56,21 @@ github https://github.com/jcustenborder/kafka-connect-transform-archive/issues + + 3.0.0 + com.github.jcustenborder cef-parser [0.0.1.7,0.0.1.2000) + + org.apache.kafka + connect-api + ${connect.api.version} + provided + org.reflections reflections @@ -74,6 +83,12 @@ [0.2.33,0.2.1000) test + + org.apache.commons + commons-lang3 + 3.12.0 + compile + diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java index 2f3e831..dc8e902 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java @@ -24,6 +24,8 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.Transformation; +import java.util.Calendar; +import java.util.GregorianCalendar; import java.util.HashMap; import java.util.Map; @@ -45,30 +47,47 @@ private R applyWithSchema(R r) { final Schema schema = SchemaBuilder.struct() .name("com.github.jcustenborder.kafka.connect.archive.Storage") .field("key", r.keySchema()) + .field("key_string", Schema.STRING_SCHEMA) + .field("timestamp_year", Schema.INT32_SCHEMA) + .field("timestamp_month", Schema.INT32_SCHEMA) + .field("timestamp_day", Schema.INT32_SCHEMA) + .field("partition", Schema.INT32_SCHEMA) .field("value", r.valueSchema()) .field("topic", Schema.STRING_SCHEMA) + .field("headers", Schema.OPTIONAL_STRING_SCHEMA) .field("timestamp", Schema.INT64_SCHEMA); + Calendar recordDate = new GregorianCalendar(); + recordDate.setTimeInMillis(r.timestamp()); Struct value = new Struct(schema) .put("key", r.key()) + .put("key_string", String.valueOf(r.key()).replaceAll("[^\\x00-\\x7F]", "")) + .put("timestamp_year", recordDate.get(Calendar.YEAR)) + .put("timestamp_month", 
recordDate.get(Calendar.MONTH)) + .put("timestamp_day", recordDate.get(Calendar.DAY_OF_MONTH)) + .put("partition", r.kafkaPartition()) .put("value", r.value()) .put("topic", r.topic()) .put("timestamp", r.timestamp()); - return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp()); + return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), schema, value, r.timestamp()); } - @SuppressWarnings("unchecked") private R applySchemaless(R r) { - final Map archiveValue = new HashMap<>(); - final Map value = (Map) r.value(); + Calendar recordDate = new GregorianCalendar(); + recordDate.setTimeInMillis(r.timestamp()); archiveValue.put("key", r.key()); - archiveValue.put("value", value); + archiveValue.put("key_string", String.valueOf(r.key()).replaceAll("[^\\x00-\\x7F]", "")); + archiveValue.put("timestamp_year", recordDate.get(Calendar.YEAR)); + archiveValue.put("timestamp_month", recordDate.get(Calendar.MONTH)); + archiveValue.put("timestamp_day", recordDate.get(Calendar.DAY_OF_MONTH)); + archiveValue.put("value", r.value()); archiveValue.put("topic", r.topic()); + archiveValue.put("partition", r.kafkaPartition()); archiveValue.put("timestamp", r.timestamp()); - return r.newRecord(r.topic(), r.kafkaPartition(), null, null, null, archiveValue, r.timestamp()); + return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), null, archiveValue, r.timestamp()); } @Override @@ -85,4 +104,6 @@ public void close() { public void configure(Map map) { } + + } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/UnArchive.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/UnArchive.java new file mode 100644 index 0000000..9c39671 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/UnArchive.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2022 Iosif Nicolae (iosif@bringes.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this 
file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.jcustenborder.kafka.connect.archive; + +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.Map; + +import static org.apache.commons.lang3.SerializationUtils.deserialize; + + +@Description("The UnArchive transformation is used to unarchive data from S3 into the original format.") +@DocumentationNote("This transform works by copying the key, value, topic, and timestamp to new record where this is all " + + "contained in the value of the message. This will allow connectors like Confluent's S3 connector to properly unarchive " + + "the record.") +public class UnArchive> implements Transformation { + @Override + public R apply(R r) { + return applySchemaless(r); + } + @SuppressWarnings("unchecked") + private R applySchemaless(R r) { + final Map value = (Map) r.value(); + return r.newRecord( + r.topic(), + value.get("partition") != null ? 
Integer.parseInt(value.get("partition").toString()) : null, + null, + deserialize((byte[]) value.get("key")), + null, + value.get("value"), + Long.parseLong(value.get("timestamp").toString()) + ); + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +}