Skip to content

Commit d352b5a

Browse files
committed
Improved timeseries usability
Added conversion tool for non date (String, longs) time data KAFKA-232
1 parent 2f1b034 commit d352b5a

File tree

7 files changed

+394
-2
lines changed

7 files changed

+394
-2
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/sink/MongoSinkTaskIntegrationTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,41 @@ void testSinkProcessesSinkRecords() {
8181
}
8282
}
8383

84+
@Test
85+
@DisplayName("Ensure sink processes timeseries data from Kafka")
86+
void testSinkProcessesTimeseriesData() {
87+
assumeTrue(isGreaterThanFourDotFour());
88+
Map<String, String> cfg = createSettings();
89+
cfg.put(MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG, "ts");
90+
cfg.put(MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG, "meta");
91+
cfg.put(MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG, "true");
92+
93+
try (AutoCloseableSinkTask task = createSinkTask()) {
94+
task.start(cfg);
95+
96+
List<Document> documents =
97+
rangeClosed(1, 10)
98+
.mapToObj(
99+
i -> {
100+
Date now = new Date();
101+
Document doc = new Document("_id", i);
102+
if (i == 0) {
103+
doc.put("ts", "1970T01:01:01.000001Z");
104+
} else if (i == 1) {
105+
doc.put("ts", 3600000);
106+
} else {
107+
doc.put("ts", now);
108+
}
109+
doc.put("meta", i);
110+
return doc;
111+
})
112+
.collect(toList());
113+
List<SinkRecord> sinkRecords = createRecords(documents);
114+
task.put(sinkRecords);
115+
assertEquals(getCollection().countDocuments(), 10);
116+
}
117+
}
118+
84119
@Test
85120
@DisplayName("Ensure sink can handle Tombstone null events")
86121
void testSinkCanHandleTombstoneNullEvents() {

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.CONNECTION_URI_CONFIG;
2222
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPICS_CONFIG;
2323
import static com.mongodb.kafka.connect.util.ClassHelper.createInstance;
24+
import static com.mongodb.kafka.connect.util.FlexibleDateTimeParser.DEFAULT_DATE_TIME_FORMATTER_PATTERN;
25+
import static com.mongodb.kafka.connect.util.Validators.emptyString;
2426
import static com.mongodb.kafka.connect.util.Validators.errorCheckingValueValidator;
2527
import static java.lang.String.format;
2628
import static java.util.Arrays.asList;
2729
import static java.util.Collections.emptyList;
2830
import static java.util.Collections.singletonList;
2931
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
3032

33+
import java.time.format.DateTimeFormatter;
3134
import java.util.HashMap;
3235
import java.util.List;
3336
import java.util.Locale;
@@ -374,7 +377,34 @@ public String value() {
374377
private static final String TIMESERIES_GRANULARITY_DOC =
375378
"Describes the expected interval between subsequent measurements for a "
376379
+ "time series. Possible values: \"seconds\" \"minutes\" \"hours\".";
377-
public static final String TIMESERIES_GRANULARITY_DEFAULT = "";
380+
381+
public static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG =
382+
"timeseries.timefield.auto.convert";
383+
private static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DISPLAY =
384+
"Convert the field to a BSON datetime type.";
385+
private static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DOC =
386+
"Converts the timeseries field to a BSON datetime type. "
387+
+ "If the value is a numeric value it will use it as the milliseconds from epoch. Note any fractional parts are discarded. "
388+
+ "If the value is a String it will use `timeseries.timefield.auto.convert.date.format` configuration to parse the date.";
389+
390+
public static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG =
391+
"timeseries.timefield.auto.convert.date.format";
392+
private static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_DISPLAY =
393+
"The DateTimeFormatter pattern for the date.";
394+
private static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_DOC =
395+
"The DateTimeFormatter pattern to use when converting String dates. Defaults to supporting ISO style date times. "
396+
+ "Note: A string representation is expected to contain both date and time information. If the string only contains "
397+
+ "date information then the time since epoch will be taken from the start of that day. "
398+
+ "If a string representation does not contain time-zone offset, then the extracted date and time is interpreted as UTC.";
399+
private static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_DEFAULT =
400+
DEFAULT_DATE_TIME_FORMATTER_PATTERN;
401+
402+
public static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_CONFIG =
403+
"timeseries.timefield.auto.convert.locale.language.tag";
404+
private static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_DISPLAY =
405+
"The DateTimeFormatter locale language tag to use: Defaults to using the neutral Locale.ROOT.";
406+
private static final String TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_DOC =
407+
"The DateTimeFormatter locale language tag to use with the date pattern: Defaults to Locale.ROOT.";
378408

379409
private static final Pattern CLASS_NAME =
380410
Pattern.compile("\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*");
@@ -1117,6 +1147,42 @@ private static ConfigDef createConfigDef() {
11171147
++orderInGroup,
11181148
ConfigDef.Width.MEDIUM,
11191149
TIMESERIES_GRANULARITY_DISPLAY);
1150+
configDef.define(
1151+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG,
1152+
ConfigDef.Type.BOOLEAN,
1153+
false,
1154+
ConfigDef.Importance.LOW,
1155+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DOC,
1156+
group,
1157+
++orderInGroup,
1158+
ConfigDef.Width.MEDIUM,
1159+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DISPLAY);
1160+
configDef.define(
1161+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG,
1162+
ConfigDef.Type.STRING,
1163+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_DEFAULT,
1164+
errorCheckingValueValidator(
1165+
"A valid DateTimeFormatter format", DateTimeFormatter::ofPattern),
1166+
ConfigDef.Importance.LOW,
1167+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_DOC,
1168+
group,
1169+
++orderInGroup,
1170+
ConfigDef.Width.MEDIUM,
1171+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_DISPLAY);
1172+
configDef.define(
1173+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_CONFIG,
1174+
ConfigDef.Type.STRING,
1175+
EMPTY_STRING,
1176+
emptyString()
1177+
.or(
1178+
errorCheckingValueValidator(
1179+
"A valid Locale language tag format", Locale::forLanguageTag)),
1180+
ConfigDef.Importance.LOW,
1181+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_DOC,
1182+
group,
1183+
++orderInGroup,
1184+
ConfigDef.Width.MEDIUM,
1185+
TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_DISPLAY);
11201186
return configDef;
11211187
}
11221188
}

src/main/java/com/mongodb/kafka/connect/sink/processor/PostProcessors.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.mongodb.kafka.connect.sink.processor;
1919

2020
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.POST_PROCESSOR_CHAIN_CONFIG;
21+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG;
2122
import static com.mongodb.kafka.connect.util.ClassHelper.createInstance;
2223
import static java.util.Collections.singletonList;
2324
import static java.util.Collections.unmodifiableList;
@@ -56,6 +57,9 @@ public PostProcessors(final MongoSinkTopicConfig config, final List<String> clas
5657
if (!hasDocumentIdAdder) {
5758
postProcessors.add(0, new DocumentIdAdder(config));
5859
}
60+
if (config.isTimeseries() && config.getBoolean(TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG)) {
61+
postProcessors.add(new TimeseriesTimeFieldAutoConversion(config));
62+
}
5963

6064
this.postProcessorList = unmodifiableList(postProcessors);
6165
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.kafka.connect.sink.processor;
17+
18+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG;
19+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_CONFIG;
20+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG;
21+
import static java.lang.String.format;
22+
23+
import java.util.Optional;
24+
import java.util.function.Supplier;
25+
26+
import org.apache.kafka.connect.sink.SinkRecord;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import org.bson.BsonDateTime;
31+
import org.bson.BsonValue;
32+
33+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
34+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
35+
import com.mongodb.kafka.connect.util.FlexibleDateTimeParser;
36+
37+
class TimeseriesTimeFieldAutoConversion extends PostProcessor {
38+
39+
private static final Logger LOGGER = LoggerFactory.getLogger(PostProcessor.class);
40+
41+
private final String fieldName;
42+
private final FlexibleDateTimeParser flexibleDateTimeParser;
43+
44+
TimeseriesTimeFieldAutoConversion(final MongoSinkTopicConfig config) {
45+
super(config);
46+
fieldName = config.getString(TIMESERIES_TIMEFIELD_CONFIG);
47+
flexibleDateTimeParser =
48+
new FlexibleDateTimeParser(
49+
config.getString(TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG),
50+
config.getString(TIMESERIES_TIMEFIELD_AUTO_CONVERSION_LOCALE_LANGUAGE_TAG_CONFIG));
51+
}
52+
53+
@Override
54+
public void process(final SinkDocument doc, final SinkRecord orig) {
55+
doc.getValueDoc()
56+
.filter(d -> d.containsKey(fieldName) && (d.isNumber(fieldName) || d.isString(fieldName)))
57+
.ifPresent(
58+
d -> {
59+
BsonValue timeField = d.get(fieldName);
60+
Optional<BsonDateTime> convertedValue;
61+
if (timeField.isNumber()) {
62+
convertedValue =
63+
tryToConvert(() -> new BsonDateTime(timeField.asNumber().longValue()));
64+
} else {
65+
convertedValue =
66+
tryToConvert(
67+
() ->
68+
new BsonDateTime(
69+
flexibleDateTimeParser.toEpochMilli(
70+
timeField.asString().toString())));
71+
}
72+
convertedValue.map(bsonDateTime -> d.put(fieldName, bsonDateTime));
73+
});
74+
}
75+
76+
Optional<BsonDateTime> tryToConvert(final Supplier<BsonDateTime> supplier) {
77+
try {
78+
return Optional.of(supplier.get());
79+
} catch (Exception e) {
80+
LOGGER.info(
81+
format(
82+
"Failed to convert field `%s` to a valid date time, so leaving as is: `%s`",
83+
fieldName, e.getMessage()));
84+
return Optional.empty();
85+
}
86+
}
87+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.kafka.connect.util;
17+
18+
import static java.lang.String.format;
19+
20+
import java.time.Instant;
21+
import java.time.LocalDate;
22+
import java.time.LocalDateTime;
23+
import java.time.ZoneOffset;
24+
import java.time.format.DateTimeFormatter;
25+
import java.time.format.DateTimeParseException;
26+
import java.time.temporal.ChronoField;
27+
import java.time.temporal.TemporalAccessor;
28+
import java.util.Locale;
29+
30+
public class FlexibleDateTimeParser {
31+
32+
private static final String TIME = "HH:mm:ss";
33+
private static final String OPTIONAL_NANO_SECONDS = "[[.][SSSSSS][SSS]]";
34+
private static final String OPTIONAL_TIME_WITH_TIMEZONE =
35+
format("[%s%s[ ]VV[ ]'['VV']']", TIME, OPTIONAL_NANO_SECONDS);
36+
private static final String OPTIONAL_TIME_WITH_OFFSET =
37+
format("[%s%s[ ]X]", TIME, OPTIONAL_NANO_SECONDS);
38+
private static final String OPTIONAL_TIME = format("[%s%s]", TIME, OPTIONAL_NANO_SECONDS);
39+
40+
public static final String DEFAULT_DATE_TIME_FORMATTER_PATTERN =
41+
"yyyy-MM-dd" // Year month day
42+
+ "[['T'][ ]]" // optional T or space
43+
+ OPTIONAL_TIME_WITH_TIMEZONE
44+
+ OPTIONAL_TIME_WITH_OFFSET
45+
+ OPTIONAL_TIME;
46+
47+
private final DateTimeFormatter formatter;
48+
49+
public FlexibleDateTimeParser(final String dateTimePattern, final String languageTag) {
50+
Locale locale = languageTag.isEmpty() ? Locale.ROOT : Locale.forLanguageTag(languageTag);
51+
this.formatter = DateTimeFormatter.ofPattern(dateTimePattern, locale);
52+
}
53+
54+
public long toEpochMilli(final String dateTimeString) {
55+
TemporalAccessor parsed = formatter.parse(dateTimeString);
56+
if (parsed.isSupported(ChronoField.INSTANT_SECONDS)) {
57+
return Instant.from(parsed).toEpochMilli();
58+
} else if (parsed.isSupported(ChronoField.SECOND_OF_MINUTE)) {
59+
return LocalDateTime.from(parsed).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
60+
} else if (parsed.isSupported(ChronoField.DAY_OF_MONTH)) {
61+
return LocalDate.from(parsed).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
62+
}
63+
throw new DateTimeParseException("Unsupported date time string", dateTimeString, 0);
64+
}
65+
}

src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG;
3636
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_GRANULARITY_CONFIG;
3737
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG;
38+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG;
39+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG;
3840
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG;
3941
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_LIST_CONFIG;
4042
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_TYPE_CONFIG;
@@ -44,6 +46,7 @@
4446
import static com.mongodb.kafka.connect.sink.SinkTestHelper.TEST_TOPIC;
4547
import static com.mongodb.kafka.connect.sink.SinkTestHelper.createConfigMap;
4648
import static com.mongodb.kafka.connect.sink.SinkTestHelper.createSinkConfig;
49+
import static com.mongodb.kafka.connect.util.FlexibleDateTimeParser.DEFAULT_DATE_TIME_FORMATTER_PATTERN;
4750
import static java.lang.String.format;
4851
import static java.util.Arrays.asList;
4952
import static java.util.Collections.emptyList;
@@ -782,9 +785,18 @@ void testTimeseries() {
782785
() -> assertEquals("", createSinkConfig().getString(TIMESERIES_TIMEFIELD_CONFIG)),
783786
() -> assertEquals("", createSinkConfig().getString(TIMESERIES_METAFIELD_CONFIG)),
784787
() -> assertEquals(0, createSinkConfig().getLong(TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG)),
788+
() ->
789+
assertFalse(createSinkConfig().getBoolean(TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG)),
790+
() ->
791+
assertEquals(
792+
DEFAULT_DATE_TIME_FORMATTER_PATTERN,
793+
createSinkConfig()
794+
.getString(TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG)),
795+
() -> assertEquals(0, createSinkConfig().getLong(TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG)),
785796
() -> assertInvalid(TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG, "-1"),
786797
() -> assertEquals("", createSinkConfig().getString(TIMESERIES_GRANULARITY_CONFIG)),
787-
() -> assertInvalid(TIMESERIES_GRANULARITY_CONFIG, "invalid granularity"));
798+
() -> assertInvalid(TIMESERIES_GRANULARITY_CONFIG, "invalid granularity"),
799+
() -> assertInvalid(TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG, "J"));
788800
}
789801

790802
private Exception assertInvalid(final String key, final String value) {

0 commit comments

Comments
 (0)