diff --git a/pom.xml b/pom.xml
index dfd961b..8e3ff79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
    <version>1.0.0</version>
  </parent>
  <artifactId>kafka-connect-transform-archive</artifactId>
- <version>0.1.0-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
  <name>kafka-connect-transform-archive</name>
  <url>https://github.com/jcustenborder/kafka-connect-transform-archive</url>
  <inceptionYear>2017</inceptionYear>
@@ -74,6 +74,18 @@
      <version>[0.2.33,0.2.1000)</version>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-runtime</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.12</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java
index 2f3e831..5e71da6 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java
@@ -32,6 +32,19 @@
"contained in the value of the message. This will allow connectors like Confluent's S3 connector to properly archive " +
"the record.")
public class Archive<R extends ConnectRecord<R>> implements Transformation<R> {
+
+ public static final String ARCHIVE_STORAGE_SCHEMA_NAMESPACE =
+ "com.github.jcustenborder.kafka.connect.archive.Storage";
+
+ public static Schema getStructSchema(Schema keySchema, Schema valueSchema) {
+ return SchemaBuilder.struct()
+ .name(ARCHIVE_STORAGE_SCHEMA_NAMESPACE)
+ .field("topic", Schema.STRING_SCHEMA)
+ .field("timestamp", Schema.INT64_SCHEMA)
+ .field("key", keySchema)
+ .field("value", valueSchema);
+ }
+
@Override
public R apply(R r) {
if (r.valueSchema() == null) {
@@ -42,17 +55,12 @@ public R apply(R r) {
}
private R applyWithSchema(R r) {
- final Schema schema = SchemaBuilder.struct()
- .name("com.github.jcustenborder.kafka.connect.archive.Storage")
- .field("key", r.keySchema())
- .field("value", r.valueSchema())
- .field("topic", Schema.STRING_SCHEMA)
- .field("timestamp", Schema.INT64_SCHEMA);
+ final Schema schema = getStructSchema(r.keySchema(), r.valueSchema());
Struct value = new Struct(schema)
- .put("key", r.key())
- .put("value", r.value())
.put("topic", r.topic())
- .put("timestamp", r.timestamp());
+ .put("timestamp", r.timestamp())
+ .put("key", r.key())
+ .put("value", r.value());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
}
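
Note: Archive.getStructSchema() is now the single source of the archive schema, with topic and timestamp ordered ahead of key and value. A minimal sketch of inspecting the resulting layout (the string key/value schemas here are illustrative, not part of the change):

import com.github.jcustenborder.kafka.connect.archive.Archive;
import org.apache.kafka.connect.data.Schema;

public class StructSchemaExample {
  public static void main(String[] args) {
    // Illustrative: assumes string-keyed, string-valued records.
    Schema schema = Archive.getStructSchema(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA);
    // Prints topic, timestamp, key, value -- the new field order.
    schema.fields().forEach(f -> System.out.println(f.name() + ": " + f.schema().type()));
  }
}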
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/ArchiveData.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/ArchiveData.java
new file mode 100644
index 0000000..95fb1ae
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/ArchiveData.java
@@ -0,0 +1,186 @@
+/**
+ * Copyright © 2018 Jordan Moore (moore.jordan@outlook.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.archive;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class ArchiveData {
+ private byte[] key;
+ private byte[] value;
+ private String topic;
+ private long timestamp = -1L;
+
+ private ArchiveData() {}
+
+ public ArchiveData(byte[] data) throws IOException {
+ if (data == null) {
+ return;
+ }
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
+         DataInputStream dis = new DataInputStream(bais)) {
+
+      // TopicName: length (int) + UTF-8 bytes. readFully guarantees the whole
+      // field is consumed; a plain read() may return fewer bytes than asked.
+      int topicBytesLen = dis.readInt();
+      if (topicBytesLen > 0) {
+        byte[] topicBytes = new byte[topicBytesLen];
+        dis.readFully(topicBytes);
+        this.topic = new String(topicBytes, StandardCharsets.UTF_8);
+      }
+
+      // Timestamp: long
+      this.timestamp = dis.readLong();
+
+      // Key: length (int) + raw bytes; a length of 0 means null
+      int keySize = dis.readInt();
+      if (keySize > 0) {
+        this.key = new byte[keySize];
+        dis.readFully(this.key);
+      }
+
+      // Value: length (int) + raw bytes; a length of 0 means null
+      int valueSize = dis.readInt();
+      if (valueSize > 0) {
+        this.value = new byte[valueSize];
+        dis.readFully(this.value);
+      }
+    }
+  }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ArchiveData that = (ArchiveData) o;
+ return Arrays.equals(key, that.key) &&
+ Arrays.equals(value, that.value) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(timestamp, that.timestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(topic, timestamp);
+ result = 31 * result + Arrays.hashCode(key);
+ result = 31 * result + Arrays.hashCode(value);
+ return result;
+ }
+
+ public static class Builder {
+
+ private ArchiveData model;
+
+ private Builder() {}
+
+ public Builder(String topicName) {
+ this.model = new ArchiveData();
+ this.model.topic = topicName;
+ }
+
+ public Builder withKey(byte[] key) {
+ model.key = key;
+ return this;
+ }
+
+ public Builder withValue(byte[] value) {
+ model.value = value;
+ return this;
+ }
+
+ public Builder withTimestamp(long timestamp) {
+ model.timestamp = timestamp;
+ return this;
+ }
+
+ public ArchiveData build() {
+ if (model.topic == null) {
+ throw new RuntimeException("ArchiveData must have a topic name");
+ }
+ return model;
+ }
+ }
+
+ public byte[] getKey() {
+ return key;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "ArchiveData{" +
+ "topic=" + topic +
+ ", timestamp=" + timestamp +
+ ", key='" + Arrays.toString(key) + '\'' +
+ ", value=" + Arrays.toString(value) +
+ '}';
+ }
+
+ public byte[] getBytes() throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos)) {
+
+ // TopicName: int + utf8 string
+ byte[] topicBytes = null;
+ int topicByteLen = 0;
+ if (this.topic != null) {
+ topicBytes = this.topic.getBytes(StandardCharsets.UTF_8);
+ topicByteLen = topicBytes.length;
+ }
+ dos.writeInt(topicByteLen);
+ if (topicBytes != null) {
+ dos.write(topicBytes);
+ }
+
+ // Timestamp: long
+ dos.writeLong(this.timestamp);
+
+ // key as byte[]
+ dos.writeInt(this.key == null ? 0 : this.key.length);
+ if (this.key != null) {
+ dos.write(this.key);
+ }
+ // value as byte[]
+ dos.writeInt(this.value == null ? 0 : this.value.length);
+ if (this.value != null) {
+ dos.write(this.value);
+ }
+
+ dos.flush();
+ return baos.toByteArray();
+ }
+ }
+}
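
For reference, ArchiveData frames a record as: topic length (int) + UTF-8 topic bytes, timestamp (long), key length (int) + key bytes, value length (int) + value bytes, with a length of 0 standing in for a null key or value. A minimal round-trip sketch (topic name and payloads are illustrative):

import com.github.jcustenborder.kafka.connect.archive.ArchiveData;
import java.nio.charset.StandardCharsets;

public class ArchiveDataRoundTrip {
  public static void main(String[] args) throws Exception {
    ArchiveData original = new ArchiveData.Builder("events")
        .withTimestamp(1528243200L)
        .withKey("k1".getBytes(StandardCharsets.UTF_8))
        .withValue("{\"id\":1}".getBytes(StandardCharsets.UTF_8))
        .build();

    byte[] wire = original.getBytes();            // serialize to the framed layout
    ArchiveData restored = new ArchiveData(wire); // parse it back

    System.out.println(original.equals(restored)); // true
  }
}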
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/BinaryArchive.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/BinaryArchive.java
new file mode 100644
index 0000000..c154e6b
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/BinaryArchive.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright © 2018 Jordan Moore (moore.jordan@outlook.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.archive;
+
+import com.github.jcustenborder.kafka.connect.utils.config.Description;
+import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.io.IOException;
+
+@Description("The Archive transformation is used to help preserve all of the data for a message as binary when archived at its destination.")
+@DocumentationNote("This transform works by copying the key, value, topic, and timestamp to new record where this is all " +
+ "contained in the value of the message. This will allow connectors like Confluent's S3 connector to properly archive " +
+ "the record as it originated from Kafka")
+public class BinaryArchive<R extends ConnectRecord<R>> extends Archive<R> {
+
+ public static Schema getBytesSchema() {
+ return SchemaBuilder.bytes().name(ARCHIVE_STORAGE_SCHEMA_NAMESPACE);
+ }
+
+ @Override
+ public R apply(R r) {
+ final Schema schema = getBytesSchema();
+ byte[] data;
+ try {
+ data = new ArchiveData.Builder(r.topic())
+ .withTimestamp(r.timestamp())
+ .withKey((byte[]) r.key())
+ .withValue((byte[]) r.value())
+ .build()
+ .getBytes();
+ return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, data, r.timestamp());
+ } catch (IOException e) {
+ throw new ConnectException("Unable to transform record to byte[]", e);
+ }
+ }
+
+}
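
A minimal sketch of the transform applied to a hand-built SinkRecord, the way the Connect runtime would invoke it for a sink configured with this SMT (topic, partition, offset, and payloads are illustrative):

import com.github.jcustenborder.kafka.connect.archive.BinaryArchive;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import java.nio.charset.StandardCharsets;

public class BinaryArchiveExample {
  public static void main(String[] args) {
    BinaryArchive<SinkRecord> transform = new BinaryArchive<>();

    SinkRecord in = new SinkRecord("events", 0,
        null, "k1".getBytes(StandardCharsets.UTF_8),   // key schema, key
        null, "v1".getBytes(StandardCharsets.UTF_8),   // value schema, value
        42L, 1528243200L, TimestampType.CREATE_TIME);

    SinkRecord out = transform.apply(in);
    // The record's value is now one byte[] framing topic, timestamp, key, and value.
    byte[] archived = (byte[]) out.value();
    System.out.println(archived.length);
  }
}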
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/converters/ArchiveByteArrayConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/converters/ArchiveByteArrayConverter.java
new file mode 100644
index 0000000..0c7d9c3
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/converters/ArchiveByteArrayConverter.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright © 2018 Jordan Moore (moore.jordan@outlook.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.archive.converters;
+
+import com.github.jcustenborder.kafka.connect.archive.Archive;
+import com.github.jcustenborder.kafka.connect.archive.ArchiveData;
+import com.github.jcustenborder.kafka.connect.archive.BinaryArchive;
+import com.github.jcustenborder.kafka.connect.archive.serialization.ArchiveDeserializer;
+import com.github.jcustenborder.kafka.connect.archive.serialization.ArchiveSerializer;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.connect.converters.ByteArrayConverter;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class ArchiveByteArrayConverter extends ByteArrayConverter {
+
+ private final ArchiveSerializer serializer = new ArchiveSerializer();
+ private final ArchiveDeserializer deserializer = new ArchiveDeserializer();
+
+ private String storageFormat;
+
+  private static final List<String> FORMATS = Arrays.asList(
+ "struct", "binary"
+ );
+
+ public ArchiveByteArrayConverter() {
+ }
+
+ @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+ super.configure(configs, isKey);
+
+ String format = String.valueOf(configs.get("archive.format"));
+ if (FORMATS.contains(format)) {
+ this.storageFormat = format;
+ } else {
+ throw new ConnectException("Invalid archive.format: " + format);
+ }
+
+ this.serializer.configure(configs, isKey);
+ this.deserializer.configure(configs, isKey);
+ }
+
+ @Override
+ public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ if (schema == null) {
+ throw new DataException("Schema is not defined. Must be STRUCT or BYTES");
+ }
+    if (!Archive.ARCHIVE_STORAGE_SCHEMA_NAMESPACE.equals(schema.name())) {
+ throw new DataException(String.format(
+ "Invalid schema namespace for %s: %s",
+ ArchiveByteArrayConverter.class.getSimpleName(),
+ schema.name()));
+ }
+
+ try {
+ return serializer.serialize(topic, value == null ? null : (ArchiveData) value);
+ } catch (SerializationException e) {
+ throw new DataException("Failed to serialize to an ArchiveData: ", e);
+ }
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(String topic, byte[] value) {
+ Schema schema = this.storageFormat.equals("struct") ?
+ Archive.getStructSchema(Schema.OPTIONAL_BYTES_SCHEMA, Schema.OPTIONAL_BYTES_SCHEMA)
+ : BinaryArchive.getBytesSchema();
+ try {
+ ArchiveData connectValue = deserializer.deserialize(topic, value);
+ return new SchemaAndValue(schema, connectValue);
+ } catch (SerializationException e) {
+ throw new DataException("Failed to deserialize ArchiveData: ", e);
+ }
+ }
+}
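
A minimal sketch of the converter contract: configure it with the new archive.format property, then round-trip an ArchiveData through fromConnectData/toConnectData (the format value, topic, and payloads are illustrative):

import com.github.jcustenborder.kafka.connect.archive.ArchiveData;
import com.github.jcustenborder.kafka.connect.archive.BinaryArchive;
import com.github.jcustenborder.kafka.connect.archive.converters.ArchiveByteArrayConverter;
import org.apache.kafka.connect.data.SchemaAndValue;
import java.util.Collections;

public class ConverterExample {
  public static void main(String[] args) {
    ArchiveByteArrayConverter converter = new ArchiveByteArrayConverter();
    converter.configure(Collections.singletonMap("archive.format", "binary"), false);

    ArchiveData record = new ArchiveData.Builder("events")
        .withTimestamp(1528243200L)
        .withKey(new byte[]{1})
        .withValue(new byte[]{2, 3})
        .build();

    byte[] wire = converter.fromConnectData("events", BinaryArchive.getBytesSchema(), record);
    SchemaAndValue roundTrip = converter.toConnectData("events", wire);
    System.out.println(record.equals(roundTrip.value())); // true
  }
}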
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/serialization/ArchiveDeserializer.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/serialization/ArchiveDeserializer.java
new file mode 100644
index 0000000..c63d0c9
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/serialization/ArchiveDeserializer.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright © 2018 Jordan Moore (moore.jordan@outlook.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.archive.serialization;
+
+import com.github.jcustenborder.kafka.connect.archive.ArchiveData;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ArchiveDeserializer implements Deserializer<ArchiveData> {
+ @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+
+ }
+
+ @Override
+ public ArchiveData deserialize(String topic, byte[] data) {
+ if (data == null) {
+ return null;
+ }
+ try {
+ return new ArchiveData(data);
+ } catch (IOException e) {
+ throw new SerializationException("Error when deserializing byte[] to ArchiveData", e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/serialization/ArchiveSerializer.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/serialization/ArchiveSerializer.java
new file mode 100644
index 0000000..8fc4c80
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/serialization/ArchiveSerializer.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright © 2018 Jordan Moore (moore.jordan@outlook.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.archive.serialization;
+
+import com.github.jcustenborder.kafka.connect.archive.ArchiveData;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ArchiveSerializer implements Serializer<ArchiveData> {
+ @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+
+ }
+
+ @Override
+ public byte[] serialize(String topic, ArchiveData data) {
+ try {
+ return data == null ? null : data.getBytes();
+ } catch (IOException e) {
+      throw new SerializationException("Unable to serialize: " + data, e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/archive/serialization/SerdeTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/archive/serialization/SerdeTest.java
new file mode 100644
index 0000000..fb8d0ab
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/archive/serialization/SerdeTest.java
@@ -0,0 +1,140 @@
+package com.github.jcustenborder.kafka.connect.archive.serialization;
+
+import com.github.jcustenborder.kafka.connect.archive.Archive;
+import com.github.jcustenborder.kafka.connect.archive.ArchiveData;
+import com.github.jcustenborder.kafka.connect.archive.BinaryArchive;
+import com.github.jcustenborder.kafka.connect.archive.converters.ArchiveByteArrayConverter;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class SerdeTest {
+
+ private final long timestamp = 1528243200L;
+ private final String topic = "archival";
+ private final String key = "some-key hello world";
+ private final String value = "some-value";
+
+ private ArchiveSerializer serializer;
+ private ArchiveDeserializer deserializer;
+
+ @BeforeEach
+ void setUp() {
+ serializer = new ArchiveSerializer();
+ deserializer = new ArchiveDeserializer();
+ }
+
+ @Test
+ void testSerde() {
+
+ ArchiveData a = new ArchiveData.Builder(topic)
+ .withTimestamp(timestamp)
+ .withKey(Utils.utf8(key))
+ .withValue(Utils.utf8(value))
+ .build();
+
+ byte[] data = serializer.serialize(topic, a);
+
+ ArchiveData a2 = deserializer.deserialize(topic, data);
+
+ Assert.assertThat(a, is(a2));
+
+ }
+
+ @Test
+ void testSerdeNullKey() {
+
+ ArchiveData a = new ArchiveData.Builder(topic)
+ .withTimestamp(timestamp)
+ .withValue(Utils.utf8(value))
+ .build();
+
+ byte[] data = serializer.serialize(topic, a);
+
+ ArchiveData a2 = deserializer.deserialize(topic, data);
+
+ Assert.assertThat(a, is(a2));
+
+ }
+
+ @Test
+ void testSerdeNullValue() {
+
+ ArchiveData a = new ArchiveData.Builder(topic)
+ .withTimestamp(timestamp)
+ .withKey(Utils.utf8(key))
+ .build();
+
+ byte[] data = serializer.serialize(topic, a);
+
+ ArchiveData a2 = deserializer.deserialize(topic, data);
+
+ Assert.assertThat(a, is(a2));
+
+ }
+
+ @Test
+ void testSerdeNullKeyNullValue() {
+
+ ArchiveData a = new ArchiveData.Builder(topic)
+ .withTimestamp(timestamp)
+ .build();
+
+ byte[] data = serializer.serialize(topic, a);
+
+ ArchiveData a2 = deserializer.deserialize(topic, data);
+
+ Assert.assertThat(a, is(a2));
+
+ }
+
+ @Test
+ void testSerdeDefaults() {
+
+ ArchiveData a = new ArchiveData.Builder(topic).build();
+
+ byte[] data = serializer.serialize(topic, a);
+
+ ArchiveData a2 = deserializer.deserialize(topic, data);
+
+ Assert.assertThat(a, is(a2));
+ }
+
+ @Test
+ void testConverter() {
+ ArchiveByteArrayConverter c = new ArchiveByteArrayConverter();
+    Map<String, Object> conf = new HashMap<>();
+
+ ArchiveData a = new ArchiveData.Builder(topic)
+ .withTimestamp(timestamp)
+ .withKey(Utils.utf8(key))
+ .withValue(Utils.utf8(value))
+ .build();
+
+ conf.put("archive.format", "struct");
+ c.configure(conf, false);
+ Schema s = Archive.getStructSchema(Schema.OPTIONAL_BYTES_SCHEMA, Schema.OPTIONAL_BYTES_SCHEMA);
+ byte[] data = c.fromConnectData(topic, s, a);
+ SchemaAndValue sv = c.toConnectData(topic, data);
+ Assert.assertThat(s.name(), is(Archive.ARCHIVE_STORAGE_SCHEMA_NAMESPACE));
+ Assert.assertThat(a, is(sv.value()));
+
+ conf.put("archive.format", "binary");
+ c.configure(conf, false);
+ s = BinaryArchive.getBytesSchema();
+ data = c.fromConnectData(topic, s, a);
+ sv = c.toConnectData(topic, data);
+ Assert.assertThat(s.name(), is(Archive.ARCHIVE_STORAGE_SCHEMA_NAMESPACE));
+ Assert.assertThat(a, is(sv.value()));
+
+ }
+}