Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9464f9c
Pass keys
iosifnicolae2 Aug 31, 2022
6205de2
Merge pull request #1 from iosifnicolae2/patch-1
iosifnicolae2 Sep 1, 2022
a32f80b
Update Archive.java
iosifnicolae2 Sep 1, 2022
1302e3d
Create UnArchive.java
iosifnicolae2 Sep 1, 2022
6f8701e
Merge pull request #2 from iosifnicolae2/iosifnicolae2-add-unarchive
iosifnicolae2 Sep 1, 2022
d27c12c
Merge pull request #3 from iosifnicolae2/iosifnicolae2-add-unarchive-1
iosifnicolae2 Sep 1, 2022
0815a10
Update README.md
iosifnicolae2 Sep 1, 2022
7f846fc
chore: Remove unused imports
iosifnicolae2 Sep 1, 2022
dc522f1
Update UnArchive.java
iosifnicolae2 Sep 1, 2022
b7499eb
chore: convert the values
Sep 1, 2022
c679fa0
chore: fix indentation
Sep 1, 2022
97929ba
chore: fix styling issues
Sep 1, 2022
705e7b0
feat: add sample Dockerfile
Sep 1, 2022
4b7a5f0
feat: parse json values if it's the case
Sep 1, 2022
27514a7
feat: backward compatibility
Sep 1, 2022
a091e6d
Update UnArchive.java
Sep 1, 2022
51d40e1
Update UnArchive.java
Sep 1, 2022
ff36b72
Update UnArchive.java
Sep 1, 2022
e61f5bc
feat: we will do the json decoding using another transformer
Sep 1, 2022
2e0ace0
Add partition
Sep 1, 2022
5ad147a
Update Archive.java
Sep 1, 2022
864b06e
Update Archive.java
Sep 1, 2022
7922c22
Up
Sep 1, 2022
c4f46e9
Update UnArchive.java
Sep 1, 2022
0704396
Update UnArchive.java
Sep 1, 2022
9fcc778
Update UnArchive.java
Sep 1, 2022
10aa03e
Update UnArchive.java
Sep 1, 2022
239b8aa
upgrade to connect-api 3.0.0
Sep 2, 2022
dd0c492
archive record headers
Sep 2, 2022
a9278b1
Revert "archive record headers"
Sep 2, 2022
1914893
Update Archive.java
Sep 5, 2022
3fe65e3
Use SerializationUtils
Sep 5, 2022
f31c3fe
add key_string value so we can partition by this field
Sep 5, 2022
9007776
Update Archive.java
Sep 5, 2022
b9923bd
Extract timestamp year, month and day
Sep 6, 2022
1b18fe7
Update Archive.java
Sep 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
hs_err_pid*
target
.okhttpcache
/.idea/
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM maven:3-openjdk-8-slim AS BUILD_CONNECT_TRANSFORM_ARCHIVE_PLUGIN
WORKDIR /tmp
RUN apt-get update && apt-get install -y git
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DL3008: Pin versions in apt get install. Instead of apt-get install <package> use apt-get install <package>=<version>


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DL3009: Delete the apt-get lists after installing something


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DL3015: Avoid additional packages by specifying --no-install-recommends


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

COPY . /tmp/kafka-connect-transform-archive
RUN cd kafka-connect-transform-archive && mvn package
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DL3003: Use WORKDIR to switch to a directory


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@ This transform works by copying the key, value, topic, and timestamp to new reco
This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers).

```properties
# Archive
name=Connector1
connector.class=org.apache.kafka.some.SourceConnector
tasks.max=1
transforms=tran
transforms.tran.type=com.github.jcustenborder.kafka.connect.archive.Archive
# Unarchive
name=Connector1
connector.class=org.apache.kafka.some.SourceConnector
tasks.max=1
transforms=tran
transforms.tran.type=com.github.jcustenborder.kafka.connect.archive.UnArchive
```

##### Distributed Example
Expand All @@ -42,12 +49,20 @@ Write the following json to `connector.json`, configure all of the required valu
post the configuration to one the distributed connect worker(s).

```json
// Archive
{
"name" : "Connector1",
"connector.class" : "org.apache.kafka.some.SourceConnector",
"transforms" : "tran",
"transforms.tran.type" : "com.github.jcustenborder.kafka.connect.archive.Archive"
}
// UnArchive
{
"name" : "Connector1",
"connector.class" : "org.apache.kafka.some.SourceConnector",
"transforms" : "tran",
"transforms.tran.type" : "com.github.jcustenborder.kafka.connect.archive.UnArchive"
}
```

Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of
Expand Down
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,21 @@
<system>github</system>
<url>https://github.com/jcustenborder/kafka-connect-transform-archive/issues</url>
</issueManagement>
<properties>
<connect.api.version>3.0.0</connect.api.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.jcustenborder</groupId>
<artifactId>cef-parser</artifactId>
<version>[0.0.1.7,0.0.1.2000)</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${connect.api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
Expand All @@ -74,6 +83,12 @@
<version>[0.2.33,0.2.1000)</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -45,30 +47,47 @@ private R applyWithSchema(R r) {
final Schema schema = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.archive.Storage")
.field("key", r.keySchema())
.field("key_string", Schema.STRING_SCHEMA)
.field("timestamp_year", Schema.INT8_SCHEMA)
.field("timestamp_month", Schema.INT8_SCHEMA)
.field("timestamp_day", Schema.INT8_SCHEMA)
.field("partition", Schema.INT64_SCHEMA)
.field("value", r.valueSchema())
.field("topic", Schema.STRING_SCHEMA)
.field("headers", Schema.STRING_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA);
Calendar recordDate = new GregorianCalendar();
recordDate.setTimeInMillis(r.timestamp());
Struct value = new Struct(schema)
.put("key", r.key())
.put("key_string", String.valueOf(r.key()).replaceAll("[^\\x00-\\x7F]", ""))
.put("timestamp_year", recordDate.get(Calendar.YEAR))
.put("timestamp_month", recordDate.get(Calendar.MONTH))
.put("timestamp_day", recordDate.get(Calendar.DAY_OF_MONTH))
.put("partition", r.kafkaPartition())
.put("value", r.value())
.put("topic", r.topic())
.put("timestamp", r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), schema, value, r.timestamp());
}

@SuppressWarnings("unchecked")
private R applySchemaless(R r) {

final Map<String, Object> archiveValue = new HashMap<>();

final Map<String, Object> value = (Map<String, Object>) r.value();
Calendar recordDate = new GregorianCalendar();
recordDate.setTimeInMillis(r.timestamp());

archiveValue.put("key", r.key());
archiveValue.put("value", value);
archiveValue.put("key_string", String.valueOf(r.key()).replaceAll("[^\\x00-\\x7F]", ""));
archiveValue.put("timestamp_year", recordDate.get(Calendar.YEAR));
archiveValue.put("timestamp_month", recordDate.get(Calendar.MONTH));
archiveValue.put("timestamp_day", recordDate.get(Calendar.DAY_OF_MONTH));
archiveValue.put("value", r.value());
archiveValue.put("topic", r.topic());
archiveValue.put("partition", r.kafkaPartition());
archiveValue.put("timestamp", r.timestamp());

return r.newRecord(r.topic(), r.kafkaPartition(), null, null, null, archiveValue, r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), null, archiveValue, r.timestamp());
}

@Override
Expand All @@ -85,4 +104,6 @@ public void close() {
public void configure(Map<String, ?> map) {

}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Copyright © 2022 Iosif Nicolae ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.jcustenborder.kafka.connect.archive;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

import static org.apache.commons.lang3.SerializationUtils.deserialize;


@Description("The UnArchive transformation is used to unarchive data from S3 into the original format.")
@DocumentationNote("This transform works by copying the key, value, topic, and timestamp to new record where this is all " +
"contained in the value of the message. This will allow connectors like Confluent's S3 connector to properly unarchive " +
"the record.")
public class UnArchive<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R r) {
return applySchemaless(r);
}
@SuppressWarnings("unchecked")
private R applySchemaless(R r) {
final Map<String, Object> value = (Map<String, Object>) r.value();
return r.newRecord(
r.topic(),
value.get("partition") != null ? Integer.parseInt(value.get("partition").toString()) : null,
null,
deserialize((byte[]) value.get("key")),
null,
value.get("value"),
Long.parseLong(value.get("timestamp").toString())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NULL_DEREFERENCE: object returned by value.get("timestamp") could be null and is dereferenced at line 46.


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

);
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}