Skip to content

Commit 17090ac

Browse files
committed
Add ExtractTimestamp transformation and tests
1 parent 266cfa4 commit 17090ac

File tree

5 files changed

+447
-3
lines changed

5 files changed

+447
-3
lines changed

README.md

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,34 @@ This is a set of [Kafka Connect transformations](https://kafka.apache.org/docume
66

77
## Transformations
88

9+
See [the Kafka documentation](https://kafka.apache.org/documentation/#connect_transforms) for more details about configuring transformations.
10+
11+
### `ExtractTimestamp`
12+
13+
This transformation replaces the original record's timestamp with a value taken from the the record.
14+
15+
The transformation:
16+
- expects the record value to be either a `STRUCT` or a `MAP`;
17+
- expects it to have a specified field;
18+
- expects the value of the field to be either `INT64` or `org.apache.kafka.connect.data.Timestamp` and not be `null`.
19+
20+
Exists in one variant:
21+
- `io.aiven.kafka.connect.transforms.ExtractTimestamp$Value` - works on values.
22+
23+
The transformation defines the following configurations:
24+
25+
- `field.name` - The name of the field which should be used as the new timestamp. Cannot be `null` or empty.
26+
27+
Here's an example of this transformation configuration:
28+
29+
```properties
30+
transforms=ExtractTimestampFromValueField
31+
transforms.ExtractTimestampFromValueField.field.name=inner_field_name
32+
```
33+
934
### `ExtractTopic`
1035

11-
This transformation extracts a string value from the message and use it as the topic name.
36+
This transformation extracts a string value from the record and use it as the topic name.
1237

1338
The transformation can use either the whole key or value (in this case, it must have `STRING` type) or a field in them (in this case, it must have `STRUCT` type and the field's value must be `STRING`).
1439

@@ -28,8 +53,6 @@ transforms=ExtractTopicFromValueField
2853
transforms.ExtractTopicFromValueField.field.name=inner_field_name
2954
```
3055

31-
See [the Kafka documentation](https://kafka.apache.org/documentation/#connect_transforms) for more details about configuring transformations.
32-
3356
## License
3457

3558
This project is licensed under the [Apache License, Version 2.0](LICENSE).
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2019 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Date;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.common.config.ConfigDef;
23+
import org.apache.kafka.connect.connector.ConnectRecord;
24+
import org.apache.kafka.connect.data.Struct;
25+
import org.apache.kafka.connect.errors.DataException;
26+
import org.apache.kafka.connect.transforms.Transformation;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
public abstract class ExtractTimestamp<R extends ConnectRecord<R>> implements Transformation<R> {
32+
private static final Logger log = LoggerFactory.getLogger(ExtractTimestamp.class);
33+
34+
private ExtractTimestampConfig config;
35+
36+
@Override
37+
public ConfigDef config() {
38+
return ExtractTimestampConfig.config();
39+
}
40+
41+
@Override
42+
public void configure(final Map<String, ?> configs) {
43+
this.config = new ExtractTimestampConfig(configs);
44+
}
45+
46+
@Override
47+
public R apply(final R record) {
48+
if (record.value() == null) {
49+
throw new DataException("Value can't be null: " + record);
50+
}
51+
52+
final Object fieldValue;
53+
if (record.value() instanceof Struct) {
54+
final Struct struct = (Struct) record.value();
55+
if (struct.schema().field(config.fieldName()) == null) {
56+
throw new DataException(config.fieldName() + " field must be present and its value can't be null: "
57+
+ record);
58+
}
59+
fieldValue = struct.get(config.fieldName());
60+
} else if (record.value() instanceof Map) {
61+
final Map map = (Map) record.value();
62+
fieldValue = map.get(config.fieldName());
63+
} else {
64+
throw new DataException("Value type must be STRUCT or MAP: " + record);
65+
}
66+
67+
if (fieldValue == null) {
68+
throw new DataException(config.fieldName() + " field must be present and its value can't be null: "
69+
+ record);
70+
}
71+
72+
final long newTimestamp;
73+
if (fieldValue instanceof Long) {
74+
newTimestamp = (long) fieldValue;
75+
} else if (fieldValue instanceof Date) {
76+
newTimestamp = ((Date) fieldValue).getTime();
77+
} else {
78+
throw new DataException(config.fieldName()
79+
+ " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
80+
+ record);
81+
}
82+
83+
return record.newRecord(
84+
record.topic(),
85+
record.kafkaPartition(),
86+
record.keySchema(),
87+
record.key(),
88+
record.valueSchema(),
89+
record.value(),
90+
newTimestamp
91+
);
92+
}
93+
94+
public static final class Value<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {
95+
// There's an implementation only for value, not for key.
96+
// We provide $Value class anyway for the consistency sake
97+
// and in case we need a $Key version in the future as well.
98+
}
99+
100+
@Override
101+
public void close() {
102+
}
103+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2019 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.common.config.AbstractConfig;
22+
import org.apache.kafka.common.config.ConfigDef;
23+
24+
final class ExtractTimestampConfig extends AbstractConfig {
25+
public static final String FIELD_NAME_CONFIG = "field.name";
26+
private static final String FIELD_NAME_DOC =
27+
"The name of the field is to be used as the source of timestamp. "
28+
+ "The field must have INT64 or org.apache.kafka.connect.data.Timestamp type "
29+
+ "and must mot be null.";
30+
31+
ExtractTimestampConfig(final Map<?, ?> originals) {
32+
super(config(), originals);
33+
}
34+
35+
static ConfigDef config() {
36+
return new ConfigDef()
37+
.define(
38+
FIELD_NAME_CONFIG,
39+
ConfigDef.Type.STRING,
40+
ConfigDef.NO_DEFAULT_VALUE,
41+
new ConfigDef.NonEmptyString(),
42+
ConfigDef.Importance.HIGH,
43+
FIELD_NAME_DOC);
44+
}
45+
46+
final String fieldName() {
47+
return getString(FIELD_NAME_CONFIG);
48+
}
49+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2019 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.common.config.ConfigException;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertThrows;
28+
29+
class ExtractTimestampConfigTest {
30+
@Test
31+
void emptyConfig() {
32+
final Map<String, String> props = new HashMap<>();
33+
final Throwable e = assertThrows(ConfigException.class, () -> new ExtractTimestampConfig(props));
34+
assertEquals("Missing required configuration \"field.name\" which has no default value.",
35+
e.getMessage());
36+
}
37+
38+
@Test
39+
void emptyFieldName() {
40+
final Map<String, String> props = new HashMap<>();
41+
props.put("field.name", "");
42+
final Throwable e = assertThrows(ConfigException.class, () -> new ExtractTimestampConfig(props));
43+
assertEquals("Invalid value for configuration field.name: String must be non-empty",
44+
e.getMessage());
45+
}
46+
47+
@Test
48+
void definedFieldName() {
49+
final Map<String, String> props = new HashMap<>();
50+
props.put("field.name", "test");
51+
final ExtractTimestampConfig config = new ExtractTimestampConfig(props);
52+
assertEquals("test", config.fieldName());
53+
}
54+
}

0 commit comments

Comments
 (0)