Skip to content

Commit 9e819a7

Browse files
authored
Merge pull request #8 from aiven/tombstone-handler
Add TombstoneHandler
2 parents 6ed9b84 + 5033c8f commit 9e819a7

File tree

7 files changed

+377
-3
lines changed

7 files changed

+377
-3
lines changed

README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,32 @@ The transformation defines the following configurations:
4646
- `field.name` - The name of the field which should be used as the topic name. If `null` or empty, the entire key or value is used (and assumed to be a string). By default is `null`.
4747
- `skip.missing.or.null` - In case the source of the new topic name is `null` or missing, should a record be silently passed without transformation. By default is `false`.
4848

49-
Here's an example of this transformation configuration:
49+
Here is an example of this transformation configuration:
5050

5151
```properties
5252
transforms=ExtractTopicFromValueField
5353
transforms.ExtractTopicFromValueField.type=io.aiven.kafka.connect.transforms.ExtractTopic$Value
5454
transforms.ExtractTopicFromValueField.field.name=inner_field_name
5555
```
5656

57+
### `TombstoneHandler`
58+
59+
This transformation manages tombstone records,
60+
i.e. records with the entire value field being `null`.
61+
62+
The transformation defines the following configurations:
63+
- `behavior` - The action the transformation must perform when encounter a tombstone record. The supported values are:
64+
- `drop_silent` - silently drop tombstone records.
65+
- `drop_warn` - drop tombstone records and log at `WARN` level.
66+
- `fail` - fail with `DataException`.
67+
68+
Here is an example of this transformation configuration:
69+
```properties
70+
transforms=TombstoneHandler
71+
transforms.TombstoneHandler.type=io.aiven.kafka.connect.transforms.TombstoneHandler
72+
transforms.TombstoneHandler.behavior=drop_silent
73+
```
74+
5775
## License
5876

5977
This project is licensed under the [Apache License, Version 2.0](LICENSE).

config/checkstyle/java.header

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/\*
2-
\* Copyright 2019 Aiven Oy
2+
\* Copyright 20(19|2[0-9]) Aiven Oy
33
\*
44
\* Licensed under the Apache License, Version 2.0 \(the "License"\);
55
\* you may not use this file except in compliance with the License.

src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public Class<? extends Task> taskClass() {
5353

5454
@Override
5555
public List<Map<String, String>> taskConfigs(final int maxTasks) {
56-
return Collections.singletonList(Collections.EMPTY_MAP);
56+
return Collections.singletonList(Collections.emptyMap());
5757
}
5858

5959
@Override
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2020 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.common.config.ConfigDef;
22+
import org.apache.kafka.connect.connector.ConnectRecord;
23+
import org.apache.kafka.connect.errors.DataException;
24+
import org.apache.kafka.connect.transforms.Transformation;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
public class TombstoneHandler<R extends ConnectRecord<R>> implements Transformation<R> {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(TombstoneHandler.class);
32+
33+
private TombstoneHandlerConfig tombstoneHandlerConfig;
34+
35+
@Override
36+
public ConfigDef config() {
37+
return TombstoneHandlerConfig.config();
38+
}
39+
40+
@Override
41+
public void configure(final Map<String, ?> configs) {
42+
this.tombstoneHandlerConfig = new TombstoneHandlerConfig(configs);
43+
}
44+
45+
@Override
46+
public R apply(final R record) {
47+
if (record.value() == null) {
48+
behaveOnNull(record);
49+
return null;
50+
} else {
51+
return record;
52+
}
53+
}
54+
55+
private void behaveOnNull(final ConnectRecord<R> r) {
56+
57+
switch (tombstoneHandlerConfig.getBehavior()) {
58+
case FAIL:
59+
throw new DataException(
60+
String.format(
61+
"Tombstone record encountered, failing due to configured '%s' behavior",
62+
TombstoneHandlerConfig.Behavior.FAIL.name().toLowerCase()
63+
)
64+
);
65+
case DROP_SILENT:
66+
LOGGER.debug(
67+
"Tombstone record encountered, dropping due to configured '{}' behavior",
68+
TombstoneHandlerConfig.Behavior.DROP_SILENT.name().toLowerCase()
69+
);
70+
break;
71+
case DROP_WARN:
72+
LOGGER.debug(
73+
"Tombstone record encountered, dropping due to configured '{}' behavior",
74+
TombstoneHandlerConfig.Behavior.DROP_WARN.name().toLowerCase()
75+
);
76+
break;
77+
default:
78+
throw new DataException(
79+
String.format("Unknown behavior: %s", tombstoneHandlerConfig.getBehavior())
80+
);
81+
}
82+
}
83+
84+
@Override
85+
public void close() {
86+
}
87+
88+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2020 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
import java.util.stream.Collectors;
24+
25+
import org.apache.kafka.common.config.AbstractConfig;
26+
import org.apache.kafka.common.config.ConfigDef;
27+
import org.apache.kafka.common.config.ConfigException;
28+
29+
public final class TombstoneHandlerConfig extends AbstractConfig {
30+
31+
public static final String TOMBSTONE_HANDLER_BEHAVIOR = "behavior";
32+
33+
public TombstoneHandlerConfig(final Map<?, ?> originals) {
34+
super(config(), originals);
35+
}
36+
37+
static ConfigDef config() {
38+
return new ConfigDef()
39+
.define(
40+
TOMBSTONE_HANDLER_BEHAVIOR,
41+
ConfigDef.Type.STRING,
42+
ConfigDef.NO_DEFAULT_VALUE,
43+
new ConfigDef.Validator() {
44+
@Override
45+
public void ensureValid(final String name, final Object value) {
46+
assert value instanceof String;
47+
48+
final String strValue = (String) value;
49+
50+
if (Objects.isNull(strValue) || strValue.isEmpty()) {
51+
throw new ConfigException(
52+
TOMBSTONE_HANDLER_BEHAVIOR,
53+
value,
54+
"String must be non-empty");
55+
}
56+
57+
try {
58+
Behavior.of(strValue);
59+
} catch (final IllegalArgumentException e) {
60+
throw new ConfigException(
61+
TOMBSTONE_HANDLER_BEHAVIOR,
62+
value,
63+
e.getMessage());
64+
}
65+
}
66+
},
67+
ConfigDef.Importance.MEDIUM,
68+
String.format(
69+
"Specifies the behavior on encountering tombstone messages. Possible values are: %s",
70+
Behavior.BEHAVIOR_NAMES
71+
)
72+
);
73+
}
74+
75+
public Behavior getBehavior() {
76+
return Behavior.of(getString(TOMBSTONE_HANDLER_BEHAVIOR));
77+
}
78+
79+
public enum Behavior {
80+
81+
DROP_SILENT,
82+
DROP_WARN,
83+
FAIL;
84+
85+
private static final List<String> BEHAVIOR_NAMES =
86+
Arrays.stream(Behavior.values())
87+
.map(b -> b.name().toLowerCase())
88+
.collect(Collectors.toList());
89+
90+
public static Behavior of(final String v) {
91+
92+
for (final Behavior b : Behavior.values()) {
93+
if (b.name().equalsIgnoreCase(v)) {
94+
return b;
95+
}
96+
}
97+
throw new IllegalArgumentException(
98+
String.format(
99+
"Unsupported behavior name: %s. Supported are: %s", v,
100+
String.join(",", BEHAVIOR_NAMES))
101+
);
102+
103+
}
104+
105+
}
106+
107+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2020 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.common.config.ConfigException;
22+
23+
import org.junit.jupiter.api.Test;
24+
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
25+
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertThrows;
28+
29+
final class TombstoneHandlerConfigTest {
30+
31+
@Test
32+
final void failOnUnknownBehaviorName() {
33+
final Throwable t =
34+
assertThrows(
35+
ConfigException.class,
36+
() -> new TombstoneHandlerConfig(newBehaviorProps("asdasdsadas"))
37+
);
38+
assertEquals(
39+
"Invalid value asdasdsadas for configuration behavior: "
40+
+ "Unsupported behavior name: asdasdsadas. Supported are: drop_silent,drop_warn,fail",
41+
t.getMessage()
42+
);
43+
}
44+
45+
@Test
46+
final void acceptCorrectBehaviorNames() {
47+
48+
TombstoneHandlerConfig c =
49+
new TombstoneHandlerConfig(
50+
newBehaviorProps(
51+
TombstoneHandlerConfig.Behavior.DROP_SILENT.name()
52+
)
53+
);
54+
assertEquals(TombstoneHandlerConfig.Behavior.DROP_SILENT, c.getBehavior());
55+
56+
c =
57+
new TombstoneHandlerConfig(
58+
newBehaviorProps(
59+
TombstoneHandlerConfig.Behavior.FAIL.name().toLowerCase()
60+
)
61+
);
62+
assertEquals(TombstoneHandlerConfig.Behavior.FAIL, c.getBehavior());
63+
64+
c =
65+
new TombstoneHandlerConfig(
66+
newBehaviorProps(
67+
"Drop_WArn"
68+
)
69+
);
70+
assertEquals(TombstoneHandlerConfig.Behavior.DROP_WARN, c.getBehavior());
71+
}
72+
73+
@Test
74+
final void failOnEmptyBehaviorName() {
75+
final Throwable t = assertThrows(
76+
ConfigException.class,
77+
() -> new TombstoneHandlerConfig(newBehaviorProps(""))
78+
);
79+
assertEquals("Invalid value for configuration behavior: String must be non-empty", t.getMessage());
80+
}
81+
82+
private Map<String, String> newBehaviorProps(final String bv) {
83+
return ImmutableMap.of(TombstoneHandlerConfig.TOMBSTONE_HANDLER_BEHAVIOR, bv);
84+
}
85+
86+
}

0 commit comments

Comments
 (0)