Skip to content

Commit e61a922

Browse files
committed
[FLINK-38887-2] Transform supports handling nested types (ARRAY, MAP, ROW, VARIANT)
1 parent 8e27374 commit e61a922

File tree

36 files changed

+990
-1395
lines changed

36 files changed

+990
-1395
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,16 @@
1717

1818
package org.apache.flink.cdc.common.utils;
1919

20-
import org.apache.flink.cdc.common.data.DateData;
21-
import org.apache.flink.cdc.common.data.TimeData;
22-
2320
import org.slf4j.Logger;
2421
import org.slf4j.LoggerFactory;
2522

2623
import java.text.ParseException;
2724
import java.text.SimpleDateFormat;
2825
import java.time.Instant;
29-
import java.time.ZoneId;
30-
import java.time.ZonedDateTime;
26+
import java.time.LocalDate;
27+
import java.time.LocalDateTime;
28+
import java.time.LocalTime;
29+
import java.time.format.DateTimeFormatter;
3130
import java.util.Calendar;
3231
import java.util.Date;
3332
import java.util.TimeZone;
@@ -64,6 +63,9 @@ public class DateTimeUtils {
6463
private static final ThreadLocalCache<String, SimpleDateFormat> FORMATTER_CACHE =
6564
ThreadLocalCache.of(SimpleDateFormat::new);
6665

66+
private static final ThreadLocalCache<String, DateTimeFormatter> DATE_TIME_FORMATTER_CACHE =
67+
ThreadLocalCache.of(DateTimeFormatter::ofPattern);
68+
6769
// --------------------------------------------------------------------------------------------
6870
// TIMESTAMP to DATE/TIME utils
6971
// --------------------------------------------------------------------------------------------
@@ -74,12 +76,12 @@ public class DateTimeUtils {
7476
* @param ts the timestamp in milliseconds.
7577
* @return the date as a {@link LocalDate}, built from the epoch-day count of the timestamp.
7678
*/
77-
public static DateData timestampMillisToDate(long ts) {
79+
public static LocalDate timestampMillisToDate(long ts) {
7880
long days = ts / MILLIS_PER_DAY;
7981
if (days < 0) {
8082
days = days - 1;
8183
}
82-
return DateData.fromEpochDay((int) days);
84+
return LocalDate.ofEpochDay(days);
8385
}
8486

8587
/**
@@ -88,30 +90,19 @@ public static DateData timestampMillisToDate(long ts) {
8890
* @param ts the timestamp in milliseconds.
8991
* @return the time of day as a {@link LocalTime}.
9092
*/
91-
public static TimeData timestampMillisToTime(long ts) {
92-
return TimeData.fromMillisOfDay((int) (ts % MILLIS_PER_DAY));
93+
public static LocalTime timestampMillisToTime(long ts) {
94+
return LocalTime.ofNanoOfDay(ts * 1_000_000);
9395
}
9496

9597
// --------------------------------------------------------------------------------------------
9698
// Parsing functions
9799
// --------------------------------------------------------------------------------------------
98-
/** Returns the epoch days since 1970-01-01. */
99-
public static int parseDate(String dateStr, String fromFormat) {
100-
// It is OK to use UTC, we just want get the epoch days
101-
// TODO use offset, better performance
102-
long ts = internalParseTimestampMillis(dateStr, fromFormat, TimeZone.getTimeZone("UTC"));
103-
ZoneId zoneId = ZoneId.of("UTC");
104-
Instant instant = Instant.ofEpochMilli(ts);
105-
ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, zoneId);
106-
return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), zdt.getDayOfMonth());
107-
}
108-
109-
public static DateData parseDate(String dateStr, String fromFormat, String timezone) {
110-
long ts = internalParseTimestampMillis(dateStr, fromFormat, TimeZone.getTimeZone(timezone));
111-
ZoneId zoneId = ZoneId.of(timezone);
112-
Instant instant = Instant.ofEpochMilli(ts);
113-
ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, zoneId);
114-
return DateData.fromLocalDate(zdt.toLocalDate());
100+
public static LocalDate parseDate(String dateStr, String fromFormat) {
101+
try {
102+
return LocalDate.parse(dateStr, DateTimeFormatter.ofPattern(fromFormat));
103+
} catch (Exception e) {
104+
return null;
105+
}
115106
}
116107

117108
private static long internalParseTimestampMillis(String dateStr, String format, TimeZone tz) {
@@ -122,9 +113,9 @@ private static long internalParseTimestampMillis(String dateStr, String format,
122113
return date.getTime();
123114
} catch (ParseException e) {
124115
LOG.error(
125-
String.format(
126-
"Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
127-
dateStr, format),
116+
"Exception when parsing datetime string '{}' in format '{}', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
117+
dateStr,
118+
format,
128119
e);
129120
return Long.MIN_VALUE;
130121
}
@@ -196,27 +187,31 @@ public static long unixTimestamp(String dateStr, String format, TimeZone timeZon
196187
// Format
197188
// --------------------------------------------------------------------------------------------
198189

199-
public static String formatTimestampMillis(long ts, String format, TimeZone timeZone) {
190+
public static String formatInstant(Instant ts, String format, TimeZone timeZone) {
200191
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
201192
formatter.setTimeZone(timeZone);
202-
Date dateTime = new Date(ts);
193+
Date dateTime = Date.from(ts);
203194
return formatter.format(dateTime);
204195
}
205196

197+
public static String formatLocalDateTime(LocalDateTime ts, String format) {
198+
return ts.format(DATE_TIME_FORMATTER_CACHE.get(format));
199+
}
200+
206201
// --------------------------------------------------------------------------------------------
207202
// Compare
208203
// --------------------------------------------------------------------------------------------
209204

210205
public static Integer timestampDiff(
211206
String timeIntervalUnit,
212-
long fromDate,
207+
Instant fromTimestamp,
213208
String fromTimezone,
214-
long toDate,
209+
Instant toTimestamp,
215210
String toTimezone) {
216211
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
217-
from.setTime(new Date(fromDate));
212+
from.setTime(Date.from(fromTimestamp));
218213
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
219-
to.setTime(new Date(toDate));
214+
to.setTime(Date.from(toTimestamp));
220215
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
221216
switch (timeIntervalUnit) {
222217
case "SECOND":
@@ -257,11 +252,11 @@ public static Integer timestampDiff(
257252
// Add
258253
// --------------------------------------------------------------------------------------------
259254

260-
public static long timestampAdd(
261-
String timeIntervalUnit, int interval, long timePoint, String timezone) {
255+
public static Instant timestampAdd(
256+
String timeIntervalUnit, int interval, Instant timePoint, String timezone) {
262257
Calendar calendar = Calendar.getInstance();
263258
calendar.setTimeZone(TimeZone.getTimeZone(timezone));
264-
calendar.setTime(new Date(timePoint));
259+
calendar.setTime(Date.from(timePoint));
265260
int field;
266261
switch (timeIntervalUnit) {
267262
case "SECOND":
@@ -289,6 +284,6 @@ public static long timestampAdd(
289284
timeIntervalUnit));
290285
}
291286
calendar.add(field, interval);
292-
return calendar.getTimeInMillis();
287+
return calendar.toInstant();
293288
}
294289
}

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ void testComplicatedUdfReturnTypes(ValuesDataSink.SinkApi sinkApi, String langua
10031003
// Check the order and content of all received events
10041004
String[] outputEvents = outCaptor.toString().trim().split("\n");
10051005
assertThat(outputEvents)
1006-
.contains(
1006+
.containsExactlyInAnyOrder(
10071007
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`char_col` STRING,`varchar_col` STRING,`binary_col` BINARY(17),`varbinary_col` VARBINARY(17),`ts_col` TIMESTAMP(2),`ts_ltz_col` TIMESTAMP_LTZ(2),`decimal_col` DECIMAL(10, 3),`non_null_col` DECIMAL(10, 3)}, primaryKeys=col1, options=({key1=value1})}",
10081008
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
10091009
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
3737
import org.apache.flink.cdc.common.schema.Schema;
3838
import org.apache.flink.cdc.common.types.DataType;
39+
import org.apache.flink.cdc.common.types.variant.Variant;
40+
import org.apache.flink.cdc.common.types.variant.VariantBuilder;
3941
import org.apache.flink.cdc.composer.PipelineExecution;
4042
import org.apache.flink.cdc.composer.definition.PipelineDef;
4143
import org.apache.flink.cdc.composer.definition.SinkDef;
@@ -106,6 +108,7 @@
106108
import static org.apache.flink.cdc.common.types.DataTypes.TINYINT;
107109
import static org.apache.flink.cdc.common.types.DataTypes.VARBINARY;
108110
import static org.apache.flink.cdc.common.types.DataTypes.VARCHAR;
111+
import static org.apache.flink.cdc.common.types.DataTypes.VARIANT;
109112
import static org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory.IDENTIFIER;
110113
import static org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions.PRINT_ENABLED;
111114
import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS;
@@ -154,6 +157,7 @@ class TransformSpecsITCase {
154157
.physicalColumn("map_string_array_string_", MAP(STRING(), ARRAY(STRING())))
155158
.physicalColumn(
156159
"complex_row_", ROW(FIELD("name", STRING()), FIELD("length", INT())))
160+
.physicalColumn("variant_", VARIANT())
157161
.build();
158162

159163
private static final BinaryRecordDataGenerator generator =
@@ -165,6 +169,26 @@ class TransformSpecsITCase {
165169
static {
166170
BinaryRecordDataGenerator nestedGenerator =
167171
new BinaryRecordDataGenerator(new DataType[] {STRING(), INT()});
172+
VariantBuilder builder = Variant.newBuilder();
173+
174+
Variant objectVariant =
175+
builder.object()
176+
.add("k", builder.of(1))
177+
.add("object", builder.object().add("k", builder.of("hello")).build())
178+
.add(
179+
"array",
180+
builder.array()
181+
.add(builder.of(1))
182+
.add(builder.of(2))
183+
.add(builder.object().add("kk", builder.of(1.123f)).build())
184+
.build())
185+
.build();
186+
Variant arrayVariant =
187+
builder.array()
188+
.add(builder.object().add("k", builder.of(1)).build())
189+
.add(builder.of("hello"))
190+
.add(builder.object().add("k", builder.of(2)).build())
191+
.build();
168192

169193
BinaryRecordData record1 =
170194
generator.generate(
@@ -213,7 +237,8 @@ class TransformSpecsITCase {
213237
s("three"),
214238
new GenericArrayData(
215239
new Object[] {s("T"), s("H"), s("R"), s("E"), s("E")})),
216-
nestedGenerator.generate(new Object[] {s("Alice"), 5})
240+
nestedGenerator.generate(new Object[] {s("Alice"), 5}),
241+
objectVariant,
217242
});
218243

219244
BinaryRecordData record2 =
@@ -260,15 +285,16 @@ class TransformSpecsITCase {
260285
new GenericArrayData(new Object[] {s("E"), s("R")}),
261286
s("三"),
262287
new GenericArrayData(new Object[] {s("S"), s("A"), s("N")})),
263-
nestedGenerator.generate(new Object[] {s("Derrida"), 7})
288+
nestedGenerator.generate(new Object[] {s("Derrida"), 7}),
289+
arrayVariant
264290
});
265291

266292
BinaryRecordData record3 =
267293
generator.generate(
268294
new Object[] {
269295
0L, null, null, null, null, null, null, null, null, null, null, null,
270296
null, null, null, null, null, null, null, null, null, null, null, null,
271-
null, null, null, null, null, null, null, null, null, null
297+
null, null, null, null, null, null, null, null, null, null, null
272298
});
273299

274300
testInputSuites.add(new CreateTableEvent(testTableId, testInputSchema));

0 commit comments

Comments
 (0)