Skip to content

Commit dfd7044

Browse files
authored
Merge pull request #25 from aiven/timestamp_extractor_key
Add support for extracting timestamp from message key
2 parents d58f142 + d4045ab commit dfd7044

File tree

6 files changed

+317
-208
lines changed

6 files changed

+317
-208
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ The transformation:
1717
- expects it to have a specified field;
1818
- expects the value of the field to be either `INT64` or `org.apache.kafka.connect.data.Timestamp` and not be `null`.
1919

20-
Exists in one variant:
21-
- `io.aiven.kafka.connect.transforms.ExtractTimestamp$Value` - works on values.
20+
Exists in two variants:
21+
- `io.aiven.kafka.connect.transforms.ExtractTimestamp$Key` - works on keys;
22+
- `io.aiven.kafka.connect.transforms.ExtractTimestamp$Value` - works on values.
2223

2324
The transformation defines the following configurations:
2425

@@ -28,6 +29,7 @@ Here's an example of this transformation configuration:
2829

2930
```properties
3031
transforms=ExtractTimestampFromValueField
32+
transforms.ExtractTimestampFromValueField.type=io.aiven.kafka.connect.transforms.ExtractTimestamp$Value
3133
transforms.ExtractTimestampFromValueField.field.name=inner_field_name
3234
```
3335

src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,12 @@
2121

2222
import org.apache.kafka.common.config.ConfigDef;
2323
import org.apache.kafka.connect.connector.ConnectRecord;
24+
import org.apache.kafka.connect.data.SchemaAndValue;
2425
import org.apache.kafka.connect.data.Struct;
2526
import org.apache.kafka.connect.errors.DataException;
2627
import org.apache.kafka.connect.transforms.Transformation;
2728

28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
30-
3129
public abstract class ExtractTimestamp<R extends ConnectRecord<R>> implements Transformation<R> {
32-
private static final Logger log = LoggerFactory.getLogger(ExtractTimestamp.class);
3330

3431
private ExtractTimestampConfig config;
3532

@@ -45,23 +42,25 @@ public void configure(final Map<String, ?> configs) {
4542

4643
@Override
4744
public R apply(final R record) {
48-
if (record.value() == null) {
49-
throw new DataException("Value can't be null: " + record);
45+
final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
46+
47+
if (schemaAndValue.value() == null) {
48+
throw new DataException(keyOrValue() + " can't be null: " + record);
5049
}
5150

5251
final Object fieldValue;
53-
if (record.value() instanceof Struct) {
54-
final Struct struct = (Struct) record.value();
52+
if (schemaAndValue.value() instanceof Struct) {
53+
final Struct struct = (Struct) schemaAndValue.value();
5554
if (struct.schema().field(config.fieldName()) == null) {
5655
throw new DataException(config.fieldName() + " field must be present and its value can't be null: "
5756
+ record);
5857
}
5958
fieldValue = struct.get(config.fieldName());
60-
} else if (record.value() instanceof Map) {
61-
final Map map = (Map) record.value();
59+
} else if (schemaAndValue.value() instanceof Map) {
60+
final Map<?, ?> map = (Map<?, ?>) schemaAndValue.value();
6261
fieldValue = map.get(config.fieldName());
6362
} else {
64-
throw new DataException("Value type must be STRUCT or MAP: " + record);
63+
throw new DataException(keyOrValue() + " type must be STRUCT or MAP: " + record);
6564
}
6665

6766
if (fieldValue == null) {
@@ -91,13 +90,36 @@ public R apply(final R record) {
9190
);
9291
}
9392

94-
public static final class Value<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {
95-
// There's an implementation only for value, not for key.
96-
// We provide $Value class anyway for the consistency sake
97-
// and in case we need a $Key version in the future as well.
98-
}
99-
10093
@Override
10194
public void close() {
10295
}
96+
97+
protected abstract String keyOrValue();
98+
99+
protected abstract SchemaAndValue getSchemaAndValue(final R record);
100+
101+
public static final class Key<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {
102+
@Override
103+
protected SchemaAndValue getSchemaAndValue(final R record) {
104+
return new SchemaAndValue(record.keySchema(), record.key());
105+
}
106+
107+
@Override
108+
protected String keyOrValue() {
109+
return "key";
110+
}
111+
}
112+
113+
public static final class Value<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {
114+
@Override
115+
protected SchemaAndValue getSchemaAndValue(final R record) {
116+
return new SchemaAndValue(record.valueSchema(), record.value());
117+
}
118+
119+
@Override
120+
protected String keyOrValue() {
121+
return "value";
122+
}
123+
}
124+
103125
}

src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 Aiven Oy
2+
* Copyright 2020 Aiven Oy
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import org.apache.kafka.connect.data.Schema;
20+
import org.apache.kafka.connect.sink.SinkRecord;
21+
22+
public class ExtractTimestampKeyTest extends ExtractTimestampTest {
23+
24+
@Override
25+
protected String keyOrValue() {
26+
return "key";
27+
}
28+
29+
@Override
30+
protected ExtractTimestamp<SinkRecord> createTransformationObject() {
31+
return new ExtractTimestamp.Key<>();
32+
}
33+
34+
@Override
35+
protected SinkRecord record(final Schema schema, final Object data) {
36+
return record(schema, data, null, null);
37+
}
38+
}
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright 2020 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Date;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import org.apache.kafka.common.record.TimestampType;
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaBuilder;
26+
import org.apache.kafka.connect.data.Struct;
27+
import org.apache.kafka.connect.data.Timestamp;
28+
import org.apache.kafka.connect.errors.DataException;
29+
import org.apache.kafka.connect.sink.SinkRecord;
30+
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.ValueSource;
34+
35+
import static org.junit.jupiter.api.Assertions.assertEquals;
36+
import static org.junit.jupiter.api.Assertions.assertThrows;
37+
38+
abstract class ExtractTimestampTest {
39+
private static final String FIELD = "test_field";
40+
41+
@Test
42+
void recordNotStructOrMap() {
43+
final SinkRecord originalRecord = record(SchemaBuilder.INT8_SCHEMA, (byte) 123);
44+
final Throwable e = assertThrows(DataException.class,
45+
() -> transformation().apply(originalRecord));
46+
assertEquals(keyOrValue() + " type must be STRUCT or MAP: " + originalRecord,
47+
e.getMessage());
48+
}
49+
50+
@Test
51+
void recordStructNull() {
52+
final Schema schema = SchemaBuilder.struct().schema();
53+
final SinkRecord originalRecord = record(schema, null);
54+
final Throwable e = assertThrows(DataException.class,
55+
() -> transformation().apply(originalRecord));
56+
assertEquals(keyOrValue() + " can't be null: " + originalRecord,
57+
e.getMessage());
58+
}
59+
60+
@Test
61+
void recordMapNull() {
62+
final SinkRecord originalRecord = record(null, null);
63+
final Throwable e = assertThrows(DataException.class,
64+
() -> transformation().apply(originalRecord));
65+
assertEquals(keyOrValue() + " can't be null: " + originalRecord,
66+
e.getMessage());
67+
}
68+
69+
@Test
70+
void structWithMissingField() {
71+
final Schema schema = SchemaBuilder.struct()
72+
.field(FIELD, Schema.INT64_SCHEMA)
73+
.build();
74+
final SinkRecord originalRecord = record(null, new Struct(schema));
75+
final Throwable e = assertThrows(DataException.class,
76+
() -> transformation().apply(originalRecord));
77+
assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
78+
e.getMessage());
79+
}
80+
81+
@Test
82+
void mapWithMissingField() {
83+
final SinkRecord originalRecord = record(null, new HashMap<>());
84+
final Throwable e = assertThrows(DataException.class,
85+
() -> transformation().apply(originalRecord));
86+
assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
87+
e.getMessage());
88+
}
89+
90+
@Test
91+
void structWithNullField() {
92+
final Schema schema = SchemaBuilder.struct()
93+
.field(FIELD, Schema.OPTIONAL_INT64_SCHEMA)
94+
.build();
95+
final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, null));
96+
final Throwable e = assertThrows(DataException.class,
97+
() -> transformation().apply(originalRecord));
98+
assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
99+
e.getMessage());
100+
}
101+
102+
@Test
103+
void mapWithNullField() {
104+
final HashMap<Object, Object> valueMap = new HashMap<>();
105+
valueMap.put(FIELD, null);
106+
final SinkRecord originalRecord = record(null, valueMap);
107+
final Throwable e = assertThrows(DataException.class,
108+
() -> transformation().apply(originalRecord));
109+
assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
110+
e.getMessage());
111+
}
112+
113+
@Test
114+
void structWithFieldOfIncorrectType() {
115+
final Schema schema = SchemaBuilder.struct()
116+
.field(FIELD, Schema.STRING_SCHEMA)
117+
.build();
118+
final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, "aaa"));
119+
final Throwable e = assertThrows(DataException.class,
120+
() -> transformation().apply(originalRecord));
121+
assertEquals(FIELD + " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
122+
+ originalRecord,
123+
e.getMessage());
124+
}
125+
126+
@Test
127+
void mapWithFieldOfIncorrectType() {
128+
final HashMap<Object, Object> valueMap = new HashMap<>();
129+
valueMap.put(FIELD, "aaa");
130+
final SinkRecord originalRecord = record(null, valueMap);
131+
final Throwable e = assertThrows(DataException.class,
132+
() -> transformation().apply(originalRecord));
133+
assertEquals(FIELD + " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
134+
+ originalRecord,
135+
e.getMessage());
136+
}
137+
138+
@ParameterizedTest
139+
@ValueSource(booleans = { true, false })
140+
void structWithIntField(final boolean optional) {
141+
final Schema schema;
142+
if (optional) {
143+
schema = SchemaBuilder.struct()
144+
.field(FIELD, Schema.OPTIONAL_INT64_SCHEMA)
145+
.build();
146+
} else {
147+
schema = SchemaBuilder.struct()
148+
.field(FIELD, Schema.INT64_SCHEMA)
149+
.build();
150+
}
151+
final long timestamp = 11363151277L;
152+
final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, timestamp));
153+
final SinkRecord transformedRecord = transformation().apply(originalRecord);
154+
assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
155+
}
156+
157+
@Test
158+
void mapWithIntField() {
159+
final long timestamp = 11363151277L;
160+
final HashMap<Object, Object> valueMap = new HashMap<>();
161+
valueMap.put(FIELD, timestamp);
162+
final SinkRecord originalRecord = record(null, valueMap);
163+
final SinkRecord transformedRecord = transformation().apply(originalRecord);
164+
assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
165+
}
166+
167+
@Test
168+
void structWithTimestampField() {
169+
final Schema schema = SchemaBuilder.struct()
170+
.field(FIELD, Timestamp.SCHEMA)
171+
.build();
172+
final long timestamp = 11363151277L;
173+
final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, new Date(timestamp)));
174+
final SinkRecord transformedRecord = transformation().apply(originalRecord);
175+
assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
176+
}
177+
178+
@Test
179+
void mapWithTimestampField() {
180+
final long timestamp = 11363151277L;
181+
final HashMap<Object, Object> valueMap = new HashMap<>();
182+
valueMap.put(FIELD, new Date(timestamp));
183+
final SinkRecord originalRecord = record(null, valueMap);
184+
final SinkRecord transformedRecord = transformation().apply(originalRecord);
185+
assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
186+
}
187+
188+
private ExtractTimestamp<SinkRecord> transformation() {
189+
final Map<String, String> props = new HashMap<>();
190+
props.put("field.name", FIELD);
191+
final ExtractTimestamp<SinkRecord> transform = createTransformationObject();
192+
transform.configure(props);
193+
return transform;
194+
}
195+
196+
protected abstract SinkRecord record(final Schema schema, final Object data);
197+
198+
protected SinkRecord record(final Schema keySchema,
199+
final Object key,
200+
final Schema valueSchema,
201+
final Object value) {
202+
return new SinkRecord("original_topic", 0,
203+
keySchema, key,
204+
valueSchema, value,
205+
123L,
206+
456L, TimestampType.CREATE_TIME);
207+
}
208+
209+
private SinkRecord setNewTimestamp(final SinkRecord record, final long newTimestamp) {
210+
return record.newRecord(record.topic(),
211+
record.kafkaPartition(),
212+
record.keySchema(),
213+
record.key(),
214+
record.valueSchema(),
215+
record.value(),
216+
newTimestamp,
217+
record.headers()
218+
);
219+
}
220+
221+
protected abstract String keyOrValue();
222+
223+
protected abstract ExtractTimestamp<SinkRecord> createTransformationObject();
224+
}

0 commit comments

Comments
 (0)