Skip to content

Commit 39f0235

Browse files
aiwenmoleonardBang
authored andcommitted
[FLINK-36647][transform] Support Timestampdiff and Timestampadd function in cdc pipeline transform
This closes apache#3698
1 parent 551bdf3 commit 39f0235

File tree

11 files changed

+1234
-97
lines changed

11 files changed

+1234
-97
lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
157157
| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). |
158158
| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. |
159159
| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. |
160+
| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
160161
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161162
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162163
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |

docs/content/docs/core-concept/transform.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
157157
| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). |
158158
| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. |
159159
| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. |
160+
| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
160161
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161162
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162163
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.time.Instant;
2626
import java.time.ZoneId;
2727
import java.time.ZonedDateTime;
28+
import java.util.Calendar;
2829
import java.util.Date;
2930
import java.util.TimeZone;
3031

@@ -198,4 +199,93 @@ public static String formatTimestampMillis(long ts, String format, TimeZone time
198199
Date dateTime = new Date(ts);
199200
return formatter.format(dateTime);
200201
}
202+
203+
// --------------------------------------------------------------------------------------------
204+
// Compare
205+
// --------------------------------------------------------------------------------------------
206+
207+
public static Integer timestampDiff(
208+
String timeIntervalUnit,
209+
long fromDate,
210+
String fromTimezone,
211+
long toDate,
212+
String toTimezone) {
213+
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
214+
from.setTime(new Date(fromDate));
215+
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
216+
to.setTime(new Date(toDate));
217+
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
218+
switch (timeIntervalUnit) {
219+
case "SECOND":
220+
if (second > Integer.MAX_VALUE) {
221+
return null;
222+
}
223+
return (int) second;
224+
case "MINUTE":
225+
if (second > Integer.MAX_VALUE) {
226+
return null;
227+
}
228+
return (int) second / 60;
229+
case "HOUR":
230+
if (second > Integer.MAX_VALUE) {
231+
return null;
232+
}
233+
return (int) second / 3600;
234+
case "DAY":
235+
if (second > Integer.MAX_VALUE) {
236+
return null;
237+
}
238+
return (int) second / (24 * 3600);
239+
case "MONTH":
240+
return to.get(Calendar.YEAR) * 12
241+
+ to.get(Calendar.MONTH)
242+
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH));
243+
case "YEAR":
244+
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
245+
default:
246+
throw new RuntimeException(
247+
String.format(
248+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
249+
timeIntervalUnit));
250+
}
251+
}
252+
253+
// --------------------------------------------------------------------------------------------
254+
// Add
255+
// --------------------------------------------------------------------------------------------
256+
257+
public static long timestampAdd(
258+
String timeIntervalUnit, int interval, long timePoint, String timezone) {
259+
Calendar calendar = Calendar.getInstance();
260+
calendar.setTimeZone(TimeZone.getTimeZone(timezone));
261+
calendar.setTime(new Date(timePoint));
262+
int field;
263+
switch (timeIntervalUnit) {
264+
case "SECOND":
265+
field = Calendar.SECOND;
266+
break;
267+
case "MINUTE":
268+
field = Calendar.MINUTE;
269+
break;
270+
case "HOUR":
271+
field = Calendar.HOUR;
272+
break;
273+
case "DAY":
274+
field = Calendar.DATE;
275+
break;
276+
case "MONTH":
277+
field = Calendar.MONTH;
278+
break;
279+
case "YEAR":
280+
field = Calendar.YEAR;
281+
break;
282+
default:
283+
throw new RuntimeException(
284+
String.format(
285+
"Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
286+
timeIntervalUnit));
287+
}
288+
calendar.add(field, interval);
289+
return calendar.getTimeInMillis();
290+
}
201291
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 105 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import java.time.LocalDateTime;
3535
import java.time.ZoneId;
3636
import java.util.Arrays;
37-
import java.util.Calendar;
38-
import java.util.Date;
3937
import java.util.TimeZone;
4038
import java.util.UUID;
4139
import java.util.regex.Matcher;
@@ -129,83 +127,127 @@ public static TimestampData toTimestamp(String str, String format, String timezo
129127
}
130128
}
131129

132-
public static int timestampDiff(
133-
String symbol,
130+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
131+
public static Integer timestampDiff(
132+
String timeIntervalUnit,
134133
LocalZonedTimestampData fromTimestamp,
135-
LocalZonedTimestampData toTimestamp) {
136-
return timestampDiff(
137-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getEpochMillisecond());
138-
}
139-
140-
public static int timestampDiff(
141-
String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) {
142-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
143-
}
144-
145-
public static int timestampDiff(
146-
String symbol, TimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
147-
return timestampDiff(
148-
symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
134+
LocalZonedTimestampData toTimestamp,
135+
String timezone) {
136+
if (fromTimestamp == null || toTimestamp == null) {
137+
return null;
138+
}
139+
return DateTimeUtils.timestampDiff(
140+
timeIntervalUnit,
141+
fromTimestamp.getEpochMillisecond(),
142+
timezone,
143+
toTimestamp.getEpochMillisecond(),
144+
timezone);
145+
}
146+
147+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
148+
public static Integer timestampDiff(
149+
String timeIntervalUnit,
150+
TimestampData fromTimestamp,
151+
TimestampData toTimestamp,
152+
String timezone) {
153+
if (fromTimestamp == null || toTimestamp == null) {
154+
return null;
155+
}
156+
return DateTimeUtils.timestampDiff(
157+
timeIntervalUnit,
158+
fromTimestamp.getMillisecond(),
159+
"UTC",
160+
toTimestamp.getMillisecond(),
161+
"UTC");
162+
}
163+
164+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
165+
public static Integer timestampDiff(
166+
String timeIntervalUnit,
167+
TimestampData fromTimestamp,
168+
LocalZonedTimestampData toTimestamp,
169+
String timezone) {
170+
if (fromTimestamp == null || toTimestamp == null) {
171+
return null;
172+
}
173+
return DateTimeUtils.timestampDiff(
174+
timeIntervalUnit,
175+
fromTimestamp.getMillisecond(),
176+
"UTC",
177+
toTimestamp.getEpochMillisecond(),
178+
timezone);
149179
}
150180

151-
public static int timestampDiff(
152-
String symbol, LocalZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
153-
return timestampDiff(
154-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
181+
// Be compatible with the existing definition of Function TIMESTAMP_DIFF
182+
public static Integer timestampDiff(
183+
String timeIntervalUnit,
184+
LocalZonedTimestampData fromTimestamp,
185+
TimestampData toTimestamp,
186+
String timezone) {
187+
if (fromTimestamp == null || toTimestamp == null) {
188+
return null;
189+
}
190+
return DateTimeUtils.timestampDiff(
191+
timeIntervalUnit,
192+
fromTimestamp.getEpochMillisecond(),
193+
timezone,
194+
toTimestamp.getMillisecond(),
195+
"UTC");
155196
}
156197

157-
public static int timestampDiff(
158-
String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
159-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
198+
public static Integer timestampdiff(
199+
String timeIntervalUnit,
200+
LocalZonedTimestampData fromTimestamp,
201+
LocalZonedTimestampData toTimestamp,
202+
String timezone) {
203+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
160204
}
161205

162-
public static int timestampDiff(
163-
String symbol, LocalZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
164-
return timestampDiff(
165-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
206+
public static Integer timestampdiff(
207+
String timeIntervalUnit,
208+
TimestampData fromTimestamp,
209+
TimestampData toTimestamp,
210+
String timezone) {
211+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
166212
}
167213

168-
public static int timestampDiff(
169-
String symbol, ZonedTimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
170-
return timestampDiff(
171-
symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
214+
public static Integer timestampdiff(
215+
String timeIntervalUnit,
216+
TimestampData fromTimestamp,
217+
LocalZonedTimestampData toTimestamp,
218+
String timezone) {
219+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
172220
}
173221

174-
public static int timestampDiff(
175-
String symbol, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
176-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
222+
public static Integer timestampdiff(
223+
String timeIntervalUnit,
224+
LocalZonedTimestampData fromTimestamp,
225+
TimestampData toTimestamp,
226+
String timezone) {
227+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone);
177228
}
178229

179-
public static int timestampDiff(
180-
String symbol, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
181-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
230+
public static LocalZonedTimestampData timestampadd(
231+
String timeIntervalUnit,
232+
Integer interval,
233+
LocalZonedTimestampData timePoint,
234+
String timezone) {
235+
if (interval == null || timePoint == null) {
236+
return null;
237+
}
238+
return LocalZonedTimestampData.fromEpochMillis(
239+
DateTimeUtils.timestampAdd(
240+
timeIntervalUnit, interval, timePoint.getEpochMillisecond(), timezone));
182241
}
183242

184-
public static int timestampDiff(String symbol, long fromDate, long toDate) {
185-
Calendar from = Calendar.getInstance();
186-
from.setTime(new Date(fromDate));
187-
Calendar to = Calendar.getInstance();
188-
to.setTime(new Date(toDate));
189-
Long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
190-
switch (symbol) {
191-
case "SECOND":
192-
return second.intValue();
193-
case "MINUTE":
194-
return second.intValue() / 60;
195-
case "HOUR":
196-
return second.intValue() / 3600;
197-
case "DAY":
198-
return second.intValue() / (24 * 3600);
199-
case "MONTH":
200-
return to.get(Calendar.YEAR) * 12
201-
+ to.get(Calendar.MONDAY)
202-
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONDAY));
203-
case "YEAR":
204-
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
205-
default:
206-
LOG.error("Unsupported timestamp diff: {}", symbol);
207-
throw new RuntimeException("Unsupported timestamp diff: " + symbol);
243+
public static TimestampData timestampadd(
244+
String timeIntervalUnit, Integer interval, TimestampData timePoint, String timezone) {
245+
if (interval == null || timePoint == null) {
246+
return null;
208247
}
248+
return TimestampData.fromMillis(
249+
DateTimeUtils.timestampAdd(
250+
timeIntervalUnit, interval, timePoint.getMillisecond(), "UTC"));
209251
}
210252

211253
public static boolean betweenAsymmetric(String value, String minValue, String maxValue) {

0 commit comments

Comments
 (0)