14 changes: 13 additions & 1 deletion pom.xml
@@ -25,7 +25,7 @@
<version>1.0.0</version>
</parent>
<artifactId>kafka-connect-transform-archive</artifactId>
<version>0.1.0-SNAPSHOT</version>
<version>0.2.0-SNAPSHOT</version>
<name>kafka-connect-transform-archive</name>
<url>https://github.com/jcustenborder/kafka-connect-transform-archive</url>
<inceptionYear>2017</inceptionYear>
@@ -74,6 +74,18 @@
<version>[0.2.33,0.2.1000)</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java
@@ -32,6 +32,19 @@
"contained in the value of the message. This will allow connectors like Confluent's S3 connector to properly archive " +
"the record.")
public class Archive<R extends ConnectRecord<R>> implements Transformation<R> {

public static final String ARCHIVE_STORAGE_SCHEMA_NAMESPACE =
"com.github.jcustenborder.kafka.connect.archive.Storage";

public static Schema getStructSchema(Schema keySchema, Schema valueSchema) {
return SchemaBuilder.struct()
.name(ARCHIVE_STORAGE_SCHEMA_NAMESPACE)
.field("topic", Schema.STRING_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA)
.field("key", keySchema)
.field("value", valueSchema);
}

@Override
public R apply(R r) {
if (r.valueSchema() == null) {
@@ -42,17 +55,12 @@ public R apply(R r) {
}

private R applyWithSchema(R r) {
final Schema schema = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.archive.Storage")
.field("key", r.keySchema())
.field("value", r.valueSchema())
.field("topic", Schema.STRING_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA);
final Schema schema = getStructSchema(r.keySchema(), r.valueSchema());
Struct value = new Struct(schema)
.put("key", r.key())
.put("value", r.value())
.put("topic", r.topic())
.put("timestamp", r.timestamp());
.put("timestamp", r.timestamp())
.put("key", r.key())
.put("value", r.value());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
}

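
For reference, a minimal usage sketch of the schema-based path (not part of this PR; the topic, key, and value are illustrative). It applies the transform to a SinkRecord and reads the resulting Struct back in the new field order (topic, timestamp, key, value):

package com.github.jcustenborder.kafka.connect.archive;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class ArchiveExample {
  public static void main(String[] args) {
    Archive<SinkRecord> transform = new Archive<>();
    // Illustrative record with String key/value schemas, so applyWithSchema() is used.
    SinkRecord input = new SinkRecord(
        "orders", 0,
        Schema.STRING_SCHEMA, "order-1",
        Schema.STRING_SCHEMA, "{\"qty\":2}",
        42L, 1514764800000L, TimestampType.CREATE_TIME);

    SinkRecord archived = transform.apply(input);
    Struct value = (Struct) archived.value();

    System.out.println(value.getString("topic"));     // orders
    System.out.println(value.getInt64("timestamp"));  // 1514764800000
    System.out.println(value.getString("key"));       // order-1
  }
}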
src/main/java/com/github/jcustenborder/kafka/connect/archive/ArchiveData.java
@@ -0,0 +1,186 @@
/**
* Copyright © 2018 Jordan Moore ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.archive;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

public class ArchiveData {
private byte[] key;
private byte[] value;
private String topic;
private long timestamp = -1L;

private ArchiveData() {}

public ArchiveData(byte[] data) throws IOException {
if (data == null) {
return;
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
DataInputStream dis = new DataInputStream(bais)) {
int offset = 0;

// TopicName: length + utf8 bytes
int topicBytesLen = dis.readInt();
offset += Integer.BYTES;
if (topicBytesLen > 0) {
this.topic = new String(Arrays.copyOfRange(data, offset, offset + topicBytesLen), StandardCharsets.UTF_8);
offset += dis.read(data, offset, topicBytesLen);
}

// Timestamp
this.timestamp = dis.readLong();
offset += Long.BYTES;

// key as byte[]
int keySize = dis.readInt();
offset += Integer.BYTES;
if (keySize > 0) {
this.key = Arrays.copyOfRange(data, offset, offset + keySize);
offset += dis.read(data, offset, keySize);
}

// value as byte[]
int valueSize = dis.readInt();
offset += Integer.BYTES;
if (valueSize > 0) {
this.value = Arrays.copyOfRange(data, offset, offset + valueSize);
offset += dis.read(data, offset, valueSize);
}
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ArchiveData that = (ArchiveData) o;
return Arrays.equals(key, that.key) &&
Arrays.equals(value, that.value) &&
Objects.equals(topic, that.topic) &&
Objects.equals(timestamp, that.timestamp);
}

@Override
public int hashCode() {
int result = Objects.hash(topic, timestamp);
result = 31 * result + Arrays.hashCode(key);
result = 31 * result + Arrays.hashCode(value);
return result;
}

public static class Builder {

private ArchiveData model;

private Builder() {}

public Builder(String topicName) {
this.model = new ArchiveData();
this.model.topic = topicName;
}

public Builder withKey(byte[] key) {
model.key = key;
return this;
}

public Builder withValue(byte[] value) {
model.value = value;
return this;
}

public Builder withTimestamp(long timestamp) {
model.timestamp = timestamp;
return this;
}

public ArchiveData build() {
if (model.topic == null) {
throw new RuntimeException("ArchiveData must have a topic name");
}
return model;
}
}

public byte[] getKey() {
return key;
}

public byte[] getValue() {
return value;
}

public String getTopic() {
return topic;
}

public Long getTimestamp() {
return timestamp;
}

@Override
public String toString() {
return "ArchiveData{" +
"topic=" + topic +
", timestamp=" + timestamp +
", key='" + Arrays.toString(key) + '\'' +
", value=" + Arrays.toString(value) +
'}';
}

public byte[] getBytes() throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {

// TopicName: int + utf8 string
byte[] topicBytes = null;
int topicByteLen = 0;
if (this.topic != null) {
topicBytes = this.topic.getBytes(StandardCharsets.UTF_8);
topicByteLen = topicBytes.length;
}
dos.writeInt(topicByteLen);
if (topicBytes != null) {
dos.write(topicBytes);
}

// Timestamp: long
dos.writeLong(this.timestamp);

// key as byte[]
dos.writeInt(this.key == null ? 0 : this.key.length);
if (this.key != null) {
dos.write(this.key);
}
// value as byte[]
dos.writeInt(this.value == null ? 0 : this.value.length);
if (this.value != null) {
dos.write(this.value);
}

dos.flush();
return baos.toByteArray();
}
}
}
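
A short round-trip sketch (illustrative, not from this PR) of the length-prefixed layout written by getBytes(): [int topicLen][topic UTF-8 bytes][long timestamp][int keyLen][key bytes][int valueLen][value bytes]. The same bytes parse back through the ArchiveData(byte[]) constructor:

package com.github.jcustenborder.kafka.connect.archive;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ArchiveDataRoundTrip {
  public static void main(String[] args) throws IOException {
    ArchiveData original = new ArchiveData.Builder("orders")
        .withTimestamp(1514764800000L)
        .withKey("order-1".getBytes(StandardCharsets.UTF_8))
        .withValue("{\"qty\":2}".getBytes(StandardCharsets.UTF_8))
        .build();

    byte[] wire = original.getBytes();           // serialize to the length-prefixed form
    ArchiveData decoded = new ArchiveData(wire); // parse the same bytes back

    System.out.println(decoded.getTopic());       // orders
    System.out.println(decoded.equals(original)); // true
  }
}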
src/main/java/com/github/jcustenborder/kafka/connect/archive/BinaryArchive.java
@@ -0,0 +1,54 @@
/**
* Copyright © 2018 Jordan Moore ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.archive;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;

import java.io.IOException;

@Description("The Archive transformation is used to help preserve all of the data for a message as binary when archived at its destination.")
@DocumentationNote("This transform works by copying the key, value, topic, and timestamp to new record where this is all " +
"contained in the value of the message. This will allow connectors like Confluent's S3 connector to properly archive " +
"the record as it originated from Kafka")
public class BinaryArchive<R extends ConnectRecord<R>> extends Archive<R> {

public static Schema getBytesSchema() {
return SchemaBuilder.bytes().name(ARCHIVE_STORAGE_SCHEMA_NAMESPACE);
}

@Override
public R apply(R r) {
final Schema schema = getBytesSchema();
byte[] data;
try {
data = new ArchiveData.Builder(r.topic())
.withTimestamp(r.timestamp())
.withKey((byte[]) r.key())
.withValue((byte[]) r.value())
.build()
.getBytes();
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, data, r.timestamp());
} catch (IOException e) {
throw new ConnectException("Unable to transform record to byte[]", e);
}
}

}
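
An illustrative sketch (not part of the PR) of how BinaryArchive is expected to be used: the record key and value must already be byte[] (for example when the worker is configured with ByteArrayConverter), and the transform packs topic, timestamp, key, and value into a single byte[] value that ArchiveData can decode again:

package com.github.jcustenborder.kafka.connect.archive;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class BinaryArchiveExample {
  public static void main(String[] args) throws Exception {
    BinaryArchive<SinkRecord> transform = new BinaryArchive<>();
    // Key and value are raw byte[] here, since BinaryArchive casts both to byte[].
    SinkRecord input = new SinkRecord(
        "orders", 0,
        Schema.OPTIONAL_BYTES_SCHEMA, "order-1".getBytes(StandardCharsets.UTF_8),
        Schema.OPTIONAL_BYTES_SCHEMA, "{\"qty\":2}".getBytes(StandardCharsets.UTF_8),
        42L, 1514764800000L, TimestampType.CREATE_TIME);

    SinkRecord archived = transform.apply(input);
    byte[] packed = (byte[]) archived.value();

    // The packed bytes carry topic, timestamp, key, and value in the ArchiveData layout.
    ArchiveData decoded = new ArchiveData(packed);
    System.out.println(decoded.getTopic()); // orders
  }
}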