Skip to content

Commit 7e876a0

Browse files
authored
Merge pull request #79 from kasparjarek/makeTombstone
Add MakeTombstone transform
2 parents a70f8c9 + ac18778 commit 7e876a0

File tree

3 files changed

+104
-0
lines changed

3 files changed

+104
-0
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ transforms.ConcatFields.delimiter="-"
129129
transforms.ConcatFields.field.replace.missing="*"
130130
```
131131

132+
### `MakeTombstone`
133+
134+
This transformation converts a record into a tombstone by setting its value and value schema to `null`.
135+
136+
It can be used together with predicates, for example, to create a tombstone event from a delete event produced by a source connector.
137+
138+
Here is an example of this transformation configuration:
139+
```properties
140+
transforms=MakeTombstone
141+
transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
142+
```
143+
132144
## License
133145

134146
This project is licensed under the [Apache License, Version 2.0](LICENSE).
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2020 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.common.config.ConfigDef;
22+
import org.apache.kafka.connect.connector.ConnectRecord;
23+
import org.apache.kafka.connect.transforms.Transformation;
24+
25+
public class MakeTombstone<R extends ConnectRecord<R>> implements Transformation<R> {
26+
27+
@Override
28+
public R apply(final R record) {
29+
return record.newRecord(
30+
record.topic(),
31+
record.kafkaPartition(),
32+
record.keySchema(),
33+
record.key(),
34+
null,
35+
null,
36+
record.timestamp(),
37+
record.headers()
38+
);
39+
}
40+
41+
@Override
42+
public ConfigDef config() {
43+
return new ConfigDef();
44+
}
45+
46+
@Override
47+
public void close() {
48+
49+
}
50+
51+
@Override
52+
public void configure(final Map<String, ?> configs) {
53+
54+
}
55+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import org.apache.kafka.connect.data.Schema;
20+
import org.apache.kafka.connect.source.SourceRecord;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import static org.junit.jupiter.api.Assertions.assertNull;
25+
26+
class MakeTombstoneTest {
27+
28+
@Test
29+
void shouldMakeTombstone() {
30+
final MakeTombstone<SourceRecord> makeTombstone = new MakeTombstone<>();
31+
final var record = new SourceRecord(null, null, "some_topic", Schema.STRING_SCHEMA, "dummy value");
32+
33+
final var actual = makeTombstone.apply(record);
34+
assertNull(actual.valueSchema(), "value schema should be null");
35+
assertNull(actual.value(), "value should be null");
36+
}
37+
}

0 commit comments

Comments
 (0)