Skip to content

Commit 3a1746f

Browse files
authored
Merge pull request #26 from aiven/timestamp_extractor_configuration
Add support for configuration of timestamp extractor
2 parents dfd7044 + c3f1928 commit 3a1746f

File tree

6 files changed

+270
-88
lines changed

6 files changed

+270
-88
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ Exists in two variants:
2222
- `io.aiven.kafka.connect.transforms.ExtractTimestamp$Value` - works on values.
2323

2424
The transformation defines the following configurations:
25-
2625
- `field.name` - The name of the field which should be used as the new timestamp. Cannot be `null` or empty.
26+
- `timestamp.resolution` - The timestamp resolution for key or value
27+
There are two possible values:
28+
- `milliseconds` - key or value timestamp in milliseconds
29+
- `seconds` - key or value timestamp in seconds and will be converted in milliseconds,
30+
the default is `milliseconds`.
2731

2832
Here's an example of this transformation configuration:
2933

src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ static void setUpAll() throws IOException, InterruptedException {
9090
final File integrationTestClassesPath = new File(System.getProperty("integration-test.classes.path"));
9191
assert integrationTestClassesPath.exists();
9292

93-
final Class[] testConnectorClasses = new Class[]{
93+
final Class<?>[] testConnectorClasses = new Class[]{
9494
TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
9595
};
96-
for (final Class clazz : testConnectorClasses) {
96+
for (final Class<?> clazz : testConnectorClasses) {
9797
final String packageName = clazz.getPackage().getName();
9898
final String packagePrefix = packageName + ".";
9999
assert clazz.getCanonicalName().startsWith(packagePrefix);

src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Date;
2020
import java.util.Map;
21+
import java.util.concurrent.TimeUnit;
2122

2223
import org.apache.kafka.common.config.ConfigDef;
2324
import org.apache.kafka.connect.connector.ConnectRecord;
@@ -70,9 +71,15 @@ public R apply(final R record) {
7071

7172
final long newTimestamp;
7273
if (fieldValue instanceof Long) {
73-
newTimestamp = (long) fieldValue;
74+
final var longFieldValue = (long) fieldValue;
75+
if (config.timestampResolution() == ExtractTimestampConfig.TimestampResolution.SECONDS) {
76+
newTimestamp = TimeUnit.SECONDS.toMillis(longFieldValue);
77+
} else {
78+
newTimestamp = longFieldValue;
79+
}
7480
} else if (fieldValue instanceof Date) {
75-
newTimestamp = ((Date) fieldValue).getTime();
81+
final var dateFieldValue = (Date) fieldValue;
82+
newTimestamp = dateFieldValue.getTime();
7683
} else {
7784
throw new DataException(config.fieldName()
7885
+ " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "

src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,96 @@
1616

1717
package io.aiven.kafka.connect.transforms;
1818

19+
import java.util.Arrays;
1920
import java.util.Map;
21+
import java.util.stream.Collectors;
2022

2123
import org.apache.kafka.common.config.AbstractConfig;
2224
import org.apache.kafka.common.config.ConfigDef;
25+
import org.apache.kafka.common.config.ConfigException;
2326

2427
final class ExtractTimestampConfig extends AbstractConfig {
28+
2529
public static final String FIELD_NAME_CONFIG = "field.name";
26-
private static final String FIELD_NAME_DOC =
27-
"The name of the field is to be used as the source of timestamp. "
30+
private static final String FIELD_NAME_DOC = "The name of the field is to be used as the source of timestamp. "
2831
+ "The field must have INT64 or org.apache.kafka.connect.data.Timestamp type "
2932
+ "and must mot be null.";
3033

34+
public static final String EPOCH_RESOLUTION_CONFIG = "timestamp.resolution";
35+
private static final String EPOCH_RESOLUTION_DOC = "Time resolution used for INT64 type field. "
36+
+ "Valid values are \"seconds\" for seconds since epoch and \"milliseconds\" for "
37+
+ "milliseconds since epoch. Default is \"milliseconds\" and ignored for "
38+
+ "org.apache.kafka.connect.data.Timestamp type.";
39+
40+
41+
public enum TimestampResolution {
42+
43+
MILLISECONDS("milliseconds"),
44+
SECONDS("seconds");
45+
46+
final String resolution;
47+
48+
private static final String RESOLUTIONS =
49+
Arrays.stream(values()).map(TimestampResolution::resolution).collect(Collectors.joining(", "));
50+
51+
private TimestampResolution(final String resolution) {
52+
this.resolution = resolution;
53+
}
54+
55+
public String resolution() {
56+
return resolution;
57+
}
58+
59+
public static TimestampResolution fromString(final String value) {
60+
for (final var r : values()) {
61+
if (r.resolution.equals(value)) {
62+
return r;
63+
}
64+
}
65+
throw new IllegalArgumentException(
66+
"Unsupported resolution type '" + value + "'. Supported are: " + RESOLUTIONS);
67+
}
68+
69+
}
70+
3171
ExtractTimestampConfig(final Map<?, ?> originals) {
3272
super(config(), originals);
3373
}
3474

3575
static ConfigDef config() {
3676
return new ConfigDef()
37-
.define(
38-
FIELD_NAME_CONFIG,
39-
ConfigDef.Type.STRING,
40-
ConfigDef.NO_DEFAULT_VALUE,
41-
new ConfigDef.NonEmptyString(),
42-
ConfigDef.Importance.HIGH,
43-
FIELD_NAME_DOC);
77+
.define(
78+
FIELD_NAME_CONFIG,
79+
ConfigDef.Type.STRING,
80+
ConfigDef.NO_DEFAULT_VALUE,
81+
new ConfigDef.NonEmptyString(),
82+
ConfigDef.Importance.HIGH,
83+
FIELD_NAME_DOC)
84+
.define(
85+
EPOCH_RESOLUTION_CONFIG,
86+
ConfigDef.Type.STRING,
87+
TimestampResolution.MILLISECONDS.resolution,
88+
new ConfigDef.Validator() {
89+
@Override
90+
public void ensureValid(final String name, final Object value) {
91+
assert value instanceof String;
92+
try {
93+
TimestampResolution.fromString((String) value);
94+
} catch (final IllegalArgumentException e) {
95+
throw new ConfigException(EPOCH_RESOLUTION_CONFIG, value, e.getMessage());
96+
}
97+
}
98+
},
99+
ConfigDef.Importance.LOW,
100+
EPOCH_RESOLUTION_DOC);
44101
}
45102

46103
final String fieldName() {
47104
return getString(FIELD_NAME_CONFIG);
48105
}
106+
107+
final TimestampResolution timestampResolution() {
108+
return TimestampResolution.fromString(getString(EPOCH_RESOLUTION_CONFIG));
109+
}
110+
49111
}

src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,52 @@ void definedFieldName() {
5151
final ExtractTimestampConfig config = new ExtractTimestampConfig(props);
5252
assertEquals("test", config.fieldName());
5353
}
54+
55+
@Test
56+
void emptyTimestampResolution() {
57+
final var props = new HashMap<>();
58+
props.put("field.name", "test");
59+
final var config = new ExtractTimestampConfig(props);
60+
assertEquals(ExtractTimestampConfig.TimestampResolution.MILLISECONDS, config.timestampResolution());
61+
}
62+
63+
@Test
64+
void definedTimestampResolutionInSeconds() {
65+
final var props = new HashMap<>();
66+
props.put("field.name", "test");
67+
props.put(
68+
ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
69+
ExtractTimestampConfig.TimestampResolution.SECONDS.resolution
70+
);
71+
final var config = new ExtractTimestampConfig(props);
72+
assertEquals(ExtractTimestampConfig.TimestampResolution.SECONDS, config.timestampResolution());
73+
}
74+
75+
@Test
76+
void definedTimestampResolutionInMillis() {
77+
final var props = new HashMap<>();
78+
props.put("field.name", "test");
79+
props.put(
80+
ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
81+
ExtractTimestampConfig.TimestampResolution.MILLISECONDS.resolution
82+
);
83+
final var config = new ExtractTimestampConfig(props);
84+
assertEquals(ExtractTimestampConfig.TimestampResolution.MILLISECONDS, config.timestampResolution());
85+
}
86+
87+
@Test
88+
void wrongTimestampResolution() {
89+
final var props = new HashMap<>();
90+
props.put("field.name", "test");
91+
props.put(
92+
ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
93+
"foo"
94+
);
95+
final var e = assertThrows(ConfigException.class, () -> new ExtractTimestampConfig(props));
96+
assertEquals(
97+
"Invalid value foo for configuration timestamp.resolution: "
98+
+ "Unsupported resolution type 'foo'. Supported are: milliseconds, seconds",
99+
e.getMessage());
100+
}
101+
54102
}

0 commit comments

Comments
 (0)