diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/RecordData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/RecordData.java index c3aa97b65bb..1be00883b6d 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/RecordData.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/RecordData.java @@ -126,6 +126,14 @@ public interface RecordData { */ DecimalData getDecimal(int pos, int precision, int scale); + /** + * Returns the zone time value at the given position. + * + *

The precision is required to determine whether the time value was stored in a compact + * representation (see {@link ZoneTimeData}). + */ + ZoneTimeData getZoneTime(int pos, int precision); + /** * Returns the timestamp value at the given position. * @@ -201,6 +209,9 @@ static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos case TIME_WITHOUT_TIME_ZONE: fieldGetter = record -> record.getInt(fieldPos); break; + case TIME_WITH_TIME_ZONE: + fieldGetter = record -> record.getZoneTime(fieldPos, getPrecision(fieldType)); + break; case BIGINT: fieldGetter = record -> record.getLong(fieldPos); break; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/ZoneTimeData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/ZoneTimeData.java new file mode 100644 index 00000000000..677ab157be6 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/ZoneTimeData.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.data; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Objects; + +/** + * An internal data structure representing data of {@link ZonedTimeType}. It aims to converting + * various Java time representations into the date-time in a particular time zone. + * + *

The ISO time format is used by default, it includes the date, time (including fractional + * parts), and offset from UTC, such as '10:15:30+01:00'. + */ +@PublicEvolving +public class ZoneTimeData implements Comparable { + + private static final long MILLIS_TO_NANO = 1_000L; + /** + * The ISO time format includes the date, time (including fractional parts), and offset from + * UTC, such as '10:15:30.030431+01:00'. + */ + public static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_OFFSET_TIME; + + private final long nanoOfDay; + // this field holds time zone id + private final String zoneId; + + public ZoneTimeData(long nanoOfDay, String zoneId) { + this.nanoOfDay = nanoOfDay; + this.zoneId = zoneId; + } + + public static ZoneTimeData fromLocalTime(LocalTime localTime, ZoneId zoneId) { + return new ZoneTimeData(localTime.toNanoOfDay(), zoneId.getId()); + } + + public static ZoneTimeData fromNanoOfDay(long nanoOfDay, ZoneId zoneId) { + return new ZoneTimeData(nanoOfDay, zoneId.getId()); + } + + public static ZoneTimeData fromMillsOfDay(long millsOfDay, ZoneId zoneId) { + return new ZoneTimeData(millsOfDay * MILLIS_TO_NANO, zoneId.getId()); + } + + public static ZoneTimeData fromIsoLocalTimeString(String timeString, ZoneId zoneId) { + return fromLocalTime(LocalTime.parse(timeString), zoneId); + } + + public ZonedDateTime toZoneLocalTime() { + return ZonedDateTime.ofInstant( + Instant.from(LocalDateTime.of(LocalDate.now(), LocalTime.ofNanoOfDay(nanoOfDay))), + ZoneId.of(zoneId)); + } + + public String toString() { + return toZoneLocalTime().format(ISO_FORMATTER); + } + + @Override + public final boolean equals(Object o) { + if (!(o instanceof ZoneTimeData)) { + return false; + } + + ZoneTimeData timeData = (ZoneTimeData) o; + return nanoOfDay == timeData.nanoOfDay; + } + + @Override + public int compareTo(ZoneTimeData other) { + return Long.compare(nanoOfDay, other.nanoOfDay); + } + + @Override + public int hashCode() { + return Objects.hash(nanoOfDay, zoneId); + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java index 2cd4bc1f54b..58412586e81 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java @@ -25,12 +25,14 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.StringData; import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZoneTimeData; import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import java.nio.ByteOrder; +import java.time.ZoneId; /** * An implementation of {@link RecordData} which is backed by {@link MemorySegment} instead of @@ -164,6 +166,12 @@ public DecimalData getDecimal(int pos, int precision, int scale) { segments, offset, offsetAndSize, precision, scale); } + @Override + public ZoneTimeData getZoneTime(int pos, int precision) { + String[] parts = getString(pos).toString().split(TIMESTAMP_DELIMITER); + return ZoneTimeData.fromNanoOfDay(Long.parseLong(parts[0]), ZoneId.of(parts[2])); + } + @Override public TimestampData getTimestamp(int pos, int precision) { assertIndexIsValid(pos); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeDefaultVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeDefaultVisitor.java index 31036eeb673..ce981b3ebda 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeDefaultVisitor.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeDefaultVisitor.java @@ -95,6 +95,11 @@ public R visit(TimeType timeType) { return defaultMethod(timeType); } + @Override + public R visit(ZoneTimeType timeType) { + return defaultMethod(timeType); + } + @Override public R visit(TimestampType timestampType) { return defaultMethod(timestampType); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeRoot.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeRoot.java index 357fea64f03..02cf79d5784 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeRoot.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeRoot.java @@ -88,6 +88,8 @@ public enum DataTypeRoot { TIME_WITHOUT_TIME_ZONE(DataTypeFamily.PREDEFINED, DataTypeFamily.DATETIME, DataTypeFamily.TIME), + TIME_WITH_TIME_ZONE(DataTypeFamily.PREDEFINED, DataTypeFamily.DATETIME, DataTypeFamily.TIME), + TIMESTAMP_WITHOUT_TIME_ZONE( DataTypeFamily.PREDEFINED, DataTypeFamily.DATETIME, DataTypeFamily.TIMESTAMP), diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeVisitor.java index fe38fa8eee5..315053f247c 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeVisitor.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypeVisitor.java @@ -56,6 +56,8 @@ public interface DataTypeVisitor { R visit(TimeType timeType); + R visit(ZoneTimeType zoneTimeType); + R visit(TimestampType timestampType); R visit(ZonedTimestampType zonedTimestampType); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypes.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypes.java index 4c5b6cb4dce..cb256c83516 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypes.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataTypes.java @@ -221,6 +221,44 @@ public static TimeType TIME(int precision) { return new TimeType(precision); } + /** + * Data type of a time WITHOUT time zone {@code TIME(p)} where {@code p} is the number of digits + * of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both + * inclusive). + * + *

An instance consists of {@code hour:minute:second[.fractional]} with up to nanosecond + * precision and values ranging from {@code 00:00:00.000000000} to {@code 23:59:59.999999999}. + * + *

Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as + * the semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not + * provided. + * + * @see #TIME() + * @see TimeType + */ + public static ZoneTimeType TIME_TZ() { + return new ZoneTimeType(); + } + + /** + * Data type of a time WITHOUT time zone {@code TIME(p)} where {@code p} is the number of digits + * of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both + * inclusive). + * + *

An instance consists of {@code hour:minute:second[.fractional]} with up to nanosecond + * precision and values ranging from {@code 00:00:00.000000000} to {@code 23:59:59.999999999}. + * + *

Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as + * the semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not + * provided. + * + * @see #TIME() + * @see TimeType + */ + public static ZoneTimeType TIME_TZ(int precision) { + return new ZoneTimeType(precision); + } + /** * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP} with 6 digits of fractional * seconds by default. @@ -447,6 +485,11 @@ public OptionalInt visit(TimeType timeType) { return OptionalInt.of(timeType.getPrecision()); } + @Override + public OptionalInt visit(ZoneTimeType zoneTimeType) { + return OptionalInt.of(zoneTimeType.getPrecision()); + } + @Override public OptionalInt visit(TimestampType timestampType) { return OptionalInt.of(timestampType.getPrecision()); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/ZoneTimeType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/ZoneTimeType.java new file mode 100644 index 00000000000..2f6899804bb --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/ZoneTimeType.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.types; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Data type of a time with a time-zone in the ISO-8601 calendar system. + * + *

The zone time type is used to represent a time with a specific time zone, such as + * '10:15:30+01:00'. + */ +@PublicEvolving +public class ZoneTimeType extends DataType { + + private static final long serialVersionUID = 1L; + + public static final int MIN_PRECISION = TimeType.MIN_PRECISION; + + public static final int MAX_PRECISION = TimeType.MAX_PRECISION; + + public static final int DEFAULT_PRECISION = TimeType.DEFAULT_PRECISION; + + private static final String FORMAT = "TIME(%d) WITH TIME ZONE"; + + private final int precision; + + /** Creates a {@link ZoneTimeType} with default precision. */ + public ZoneTimeType(boolean isNullable, int precision) { + super(isNullable, DataTypeRoot.TIME_WITH_TIME_ZONE); + if (precision < MIN_PRECISION || precision > MAX_PRECISION) { + throw new IllegalArgumentException( + String.format( + "Time with time zone precision must be between %d and %d (both inclusive).", + MIN_PRECISION, MAX_PRECISION)); + } + this.precision = precision; + } + + public ZoneTimeType(int precision) { + this(true, precision); + } + + public ZoneTimeType() { + this(DEFAULT_PRECISION); + } + + public int getPrecision() { + return precision; + } + + @Override + public DataType copy(boolean isNullable) { + return new ZoneTimeType(isNullable, precision); + } + + @Override + public String asSerializableString() { + return withNullability(FORMAT, precision); + } + + @Override + public String asSummaryString() { + return asSerializableString(); + } + + @Override + public String toString() { + return "ZONETIME"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return true; // No additional fields to compare + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(DataTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), precision); + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java index 4db049baf34..9c6c9e6c90b 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.StringData; import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZoneTimeData; import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.types.DataField; import org.apache.flink.cdc.common.types.DataType; @@ -61,6 +62,8 @@ public static Class toInternalConversionClass(DataType type) { case DATE: case TIME_WITHOUT_TIME_ZONE: return Integer.class; + case TIME_WITH_TIME_ZONE: + return ZoneTimeData.class; case BIGINT: return Long.class; case FLOAT: diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java index cbd22c4c176..e4a1a478c2f 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java @@ -57,6 +57,7 @@ import org.apache.flink.cdc.common.types.TinyIntType; import org.apache.flink.cdc.common.types.VarBinaryType; import org.apache.flink.cdc.common.types.VarCharType; +import org.apache.flink.cdc.common.types.ZoneTimeType; import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap; @@ -436,6 +437,8 @@ private static DataType strictlyMergeDataTypes(List dataTypes) { return DataTypes.BINARY(BinaryType.MAX_LENGTH); } else if (type.is(DataTypeRoot.VARBINARY)) { return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH); + } else if (type.is(DataTypeRoot.TIME_WITH_TIME_ZONE)) { + return DataTypes.TIME_TZ(ZoneTimeType.MAX_PRECISION); } else if (type.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION); } else if (type.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) { @@ -678,6 +681,11 @@ static Object coerceObject( } } + if (destinationType.is(DataTypeRoot.TIME_WITH_TIME_ZONE) + && originalType.is(DataTypeRoot.TIME_WITH_TIME_ZONE)) { + return originalField; + } + if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { // For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData has no diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java index 19bb0353844..a27a5de412a 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java @@ -72,6 +72,8 @@ class DataTypeUtilsTest { DataTypes.DATE(), DataTypes.TIME(), DataTypes.TIME(6), + DataTypes.TIME_TZ(), + DataTypes.TIME_TZ(6), DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP_LTZ(), @@ -105,6 +107,8 @@ class DataTypeUtilsTest { DATE(), TIME(), TIME(6), + TIME(), + TIME(6), TIMESTAMP(), TIMESTAMP(6), TIMESTAMP_LTZ(), diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java index 818412af276..8fc33311cea 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZoneTimeData; import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; @@ -37,6 +38,7 @@ import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.types.LocalZonedTimestampType; import org.apache.flink.cdc.common.types.TimestampType; +import org.apache.flink.cdc.common.types.ZoneTimeType; import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; @@ -60,6 +62,7 @@ import java.util.stream.Stream; import static org.apache.flink.cdc.common.types.DataTypes.DECIMAL; +import static org.apache.flink.cdc.common.types.DataTypes.TIME_TZ; import static org.apache.flink.cdc.common.types.DataTypes.VARCHAR; import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceObject; import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow; @@ -98,6 +101,8 @@ class SchemaMergingUtilsTest { private static final DataType DATE = DataTypes.DATE(); private static final DataType TIME = DataTypes.TIME(); + private static final DataType TIME_TZ = DataTypes.TIME_TZ(ZoneTimeType.MAX_PRECISION); + private static final DataType ROW = DataTypes.ROW(INT, STRING); private static final DataType ARRAY = DataTypes.ARRAY(STRING); private static final DataType MAP = DataTypes.MAP(INT, STRING); @@ -121,6 +126,7 @@ class SchemaMergingUtilsTest { DOUBLE, // Date and time types TIMESTAMP, + TIME_TZ, TIMESTAMP_LTZ, TIMESTAMP_TZ, TIME, @@ -631,6 +637,7 @@ void testIsDataTypeCompatible() { Tuple2.of(TIMESTAMP, STRING), Tuple2.of(DATE, STRING), Tuple2.of(TIME, STRING), + Tuple2.of(TIME_TZ, TIME_TZ), Tuple2.of(ROW, STRING), Tuple2.of(ARRAY, STRING), Tuple2.of(MAP, STRING), @@ -764,7 +771,7 @@ BINARY, binOf("notre dame"), STRING, binStrOf("bm90cmUgZGFtZQ==")), Tuple4.of( DATE, dateOf(2020, 4, 4), TIMESTAMP_TZ, zTsOf("2020", "04", "04")), Tuple4.of(DATE, dateOf(2021, 5, 5), STRING, binStrOf("2021-05-05")), - + Tuple4.of(TIME_TZ, timeOf(10, 10, 10), TIME_TZ, binStrOf("10:10:10")), // From TIMESTAMP Tuple4.of( TIMESTAMP, @@ -1171,6 +1178,12 @@ private static long dateOf(int year, int month, int dayOfMonth) { return LocalDate.of(year, month, dayOfMonth).toEpochDay(); } + private static ZoneTimeData timeOf(int hour, int minute, int second) { + return ZoneTimeData.fromMillsOfDay( + Instant.parse(String.format("%s:%s:%s", hour, minute, second)).toEpochMilli(), + ZoneId.of("UTC")); + } + private static TimestampData tsOf(String year, String month, String dayOfMonth) { return TimestampData.fromTimestamp( Timestamp.valueOf(String.format("%s-%s-%s 00:00:00", year, month, dayOfMonth)));