Skip to content

Commit b428deb

Browse files
committed
add test and upgrade calcite
1 parent f97ddc8 commit b428deb

File tree

9 files changed

+1081
-207
lines changed

9 files changed

+1081
-207
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 199 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -108,124 +108,234 @@ public static TimestampData toTimestamp(String str, String format, String timezo
108108
}
109109
}
110110

111-
public static int timestampDiff(
112-
String symbol,
111+
public static Integer timestampDiff(
112+
String timeIntervalUnit,
113113
LocalZonedTimestampData fromTimestamp,
114114
LocalZonedTimestampData toTimestamp) {
115+
if (fromTimestamp == null || toTimestamp == null) {
116+
return null;
117+
}
115118
return timestampDiff(
116-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getEpochMillisecond());
119+
timeIntervalUnit,
120+
fromTimestamp.getEpochMillisecond(),
121+
ZoneId.systemDefault().getId(),
122+
toTimestamp.getEpochMillisecond(),
123+
ZoneId.systemDefault().getId());
117124
}
118125

119-
public static int timestampdiff(
120-
String symbol,
126+
public static Integer timestampdiff(
127+
String timeIntervalUnit,
121128
LocalZonedTimestampData fromTimestamp,
122129
LocalZonedTimestampData toTimestamp) {
123-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
130+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
124131
}
125132

126-
public static int timestampDiff(
127-
String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) {
128-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
133+
public static Integer timestampDiff(
134+
String timeIntervalUnit, TimestampData fromTimestamp, TimestampData toTimestamp) {
135+
if (fromTimestamp == null || toTimestamp == null) {
136+
return null;
137+
}
138+
return timestampDiff(
139+
timeIntervalUnit,
140+
fromTimestamp.getMillisecond(),
141+
null,
142+
toTimestamp.getMillisecond(),
143+
null);
129144
}
130145

131-
public static int timestampdiff(
132-
String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) {
133-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
146+
public static Integer timestampdiff(
147+
String timeIntervalUnit, TimestampData fromTimestamp, TimestampData toTimestamp) {
148+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
134149
}
135150

136-
public static int timestampDiff(
137-
String symbol, TimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
151+
public static Integer timestampDiff(
152+
String timeIntervalUnit,
153+
TimestampData fromTimestamp,
154+
LocalZonedTimestampData toTimestamp) {
155+
if (fromTimestamp == null || toTimestamp == null) {
156+
return null;
157+
}
138158
return timestampDiff(
139-
symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
159+
timeIntervalUnit,
160+
fromTimestamp.getMillisecond(),
161+
null,
162+
toTimestamp.getEpochMillisecond(),
163+
ZoneId.systemDefault().getId());
140164
}
141165

142-
public static int timestampdiff(
143-
String symbol, TimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
144-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
166+
public static Integer timestampdiff(
167+
String timeIntervalUnit,
168+
TimestampData fromTimestamp,
169+
LocalZonedTimestampData toTimestamp) {
170+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
145171
}
146172

147-
public static int timestampDiff(
148-
String symbol, LocalZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
173+
public static Integer timestampDiff(
174+
String timeIntervalUnit,
175+
LocalZonedTimestampData fromTimestamp,
176+
TimestampData toTimestamp) {
177+
if (fromTimestamp == null || toTimestamp == null) {
178+
return null;
179+
}
149180
return timestampDiff(
150-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
181+
timeIntervalUnit,
182+
fromTimestamp.getEpochMillisecond(),
183+
ZoneId.systemDefault().getId(),
184+
toTimestamp.getMillisecond(),
185+
null);
151186
}
152187

153-
public static int timestampdiff(
154-
String symbol, LocalZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
155-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
188+
public static Integer timestampdiff(
189+
String timeIntervalUnit,
190+
LocalZonedTimestampData fromTimestamp,
191+
TimestampData toTimestamp) {
192+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
156193
}
157194

158-
public static int timestampDiff(
159-
String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
160-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
195+
public static Integer timestampDiff(
196+
String timeIntervalUnit,
197+
ZonedTimestampData fromTimestamp,
198+
ZonedTimestampData toTimestamp) {
199+
if (fromTimestamp == null || toTimestamp == null) {
200+
return null;
201+
}
202+
return timestampDiff(
203+
timeIntervalUnit,
204+
fromTimestamp.getMillisecond(),
205+
fromTimestamp.getZoneId(),
206+
toTimestamp.getMillisecond(),
207+
toTimestamp.getZoneId());
161208
}
162209

163-
public static int timestampdiff(
164-
String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
165-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
210+
public static Integer timestampdiff(
211+
String timeIntervalUnit,
212+
ZonedTimestampData fromTimestamp,
213+
ZonedTimestampData toTimestamp) {
214+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
166215
}
167216

168-
public static int timestampDiff(
169-
String symbol, LocalZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
217+
public static Integer timestampDiff(
218+
String timeIntervalUnit,
219+
LocalZonedTimestampData fromTimestamp,
220+
ZonedTimestampData toTimestamp) {
221+
if (fromTimestamp == null || toTimestamp == null) {
222+
return null;
223+
}
170224
return timestampDiff(
171-
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond());
225+
timeIntervalUnit,
226+
fromTimestamp.getEpochMillisecond(),
227+
ZoneId.systemDefault().getId(),
228+
toTimestamp.getMillisecond(),
229+
toTimestamp.getZoneId());
172230
}
173231

174-
public static int timestampdiff(
175-
String symbol, LocalZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
176-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
232+
public static Integer timestampdiff(
233+
String timeIntervalUnit,
234+
LocalZonedTimestampData fromTimestamp,
235+
ZonedTimestampData toTimestamp) {
236+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
177237
}
178238

179-
public static int timestampDiff(
180-
String symbol, ZonedTimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
239+
public static Integer timestampDiff(
240+
String timeIntervalUnit,
241+
ZonedTimestampData fromTimestamp,
242+
LocalZonedTimestampData toTimestamp) {
243+
if (fromTimestamp == null || toTimestamp == null) {
244+
return null;
245+
}
181246
return timestampDiff(
182-
symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond());
247+
timeIntervalUnit,
248+
fromTimestamp.getMillisecond(),
249+
fromTimestamp.getZoneId(),
250+
toTimestamp.getEpochMillisecond(),
251+
ZoneId.systemDefault().getId());
183252
}
184253

185-
public static int timestampdiff(
186-
String symbol, ZonedTimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) {
187-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
254+
public static Integer timestampdiff(
255+
String timeIntervalUnit,
256+
ZonedTimestampData fromTimestamp,
257+
LocalZonedTimestampData toTimestamp) {
258+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
188259
}
189260

190-
public static int timestampDiff(
191-
String symbol, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
192-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
261+
public static Integer timestampDiff(
262+
String timeIntervalUnit, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
263+
if (fromTimestamp == null || toTimestamp == null) {
264+
return null;
265+
}
266+
return timestampDiff(
267+
timeIntervalUnit,
268+
fromTimestamp.getMillisecond(),
269+
null,
270+
toTimestamp.getMillisecond(),
271+
toTimestamp.getZoneId());
193272
}
194273

195-
public static int timestampdiff(
196-
String symbol, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
197-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
274+
public static Integer timestampdiff(
275+
String timeIntervalUnit, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
276+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
198277
}
199278

200-
public static int timestampDiff(
201-
String symbol, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
202-
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
279+
public static Integer timestampDiff(
280+
String timeIntervalUnit, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
281+
if (fromTimestamp == null || toTimestamp == null) {
282+
return null;
283+
}
284+
return timestampDiff(
285+
timeIntervalUnit,
286+
fromTimestamp.getMillisecond(),
287+
fromTimestamp.getZoneId(),
288+
toTimestamp.getMillisecond(),
289+
null);
203290
}
204291

205-
public static int timestampdiff(
206-
String symbol, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
207-
return timestampDiff(symbol, fromTimestamp, toTimestamp);
292+
public static Integer timestampdiff(
293+
String timeIntervalUnit, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) {
294+
return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
208295
}
209296

210-
public static int timestampDiff(String timeIntervalUnit, long fromDate, long toDate) {
211-
Calendar from = Calendar.getInstance();
297+
public static Integer timestampDiff(
298+
String timeIntervalUnit,
299+
long fromDate,
300+
String fromDateZoneId,
301+
long toDate,
302+
String toDateZoneId) {
303+
if (fromDateZoneId == null || fromDateZoneId.isEmpty()) {
304+
fromDateZoneId = "UTC";
305+
}
306+
if (toDateZoneId == null || toDateZoneId.isEmpty()) {
307+
toDateZoneId = "UTC";
308+
}
309+
Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromDateZoneId));
212310
from.setTime(new Date(fromDate));
213-
Calendar to = Calendar.getInstance();
311+
Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toDateZoneId));
214312
to.setTime(new Date(toDate));
215-
Long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
313+
long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
216314
switch (timeIntervalUnit) {
217315
case "SECOND":
218-
return second.intValue();
316+
if (second > Integer.MAX_VALUE) {
317+
return null;
318+
}
319+
return (int) second;
219320
case "MINUTE":
220-
return second.intValue() / 60;
321+
if (second > Integer.MAX_VALUE) {
322+
return null;
323+
}
324+
return (int) second / 60;
221325
case "HOUR":
222-
return second.intValue() / 3600;
326+
if (second > Integer.MAX_VALUE) {
327+
return null;
328+
}
329+
return (int) second / 3600;
223330
case "DAY":
224-
return second.intValue() / (24 * 3600);
331+
if (second > Integer.MAX_VALUE) {
332+
return null;
333+
}
334+
return (int) second / (24 * 3600);
225335
case "MONTH":
226336
return to.get(Calendar.YEAR) * 12
227-
+ to.get(Calendar.MONDAY)
228-
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONDAY));
337+
+ to.get(Calendar.MONTH)
338+
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH));
229339
case "YEAR":
230340
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
231341
default:
@@ -236,24 +346,38 @@ public static int timestampDiff(String timeIntervalUnit, long fromDate, long toD
236346
}
237347
}
238348

239-
public static TimestampData timestampadd(
240-
String timeIntervalUnit, int interval, LocalZonedTimestampData timePoint) {
241-
return timestampadd(timeIntervalUnit, interval, timePoint.getEpochMillisecond());
349+
public static LocalZonedTimestampData timestampadd(
350+
String timeIntervalUnit, Integer interval, LocalZonedTimestampData timePoint) {
351+
if (interval == null || timePoint == null) {
352+
return null;
353+
}
354+
return LocalZonedTimestampData.fromEpochMillis(
355+
timestampadd(timeIntervalUnit, interval, timePoint.getEpochMillisecond()));
242356
}
243357

244-
public static TimestampData timestampadd(
245-
String timeIntervalUnit, int interval, ZonedTimestampData timePoint) {
246-
return timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond());
358+
public static ZonedTimestampData timestampadd(
359+
String timeIntervalUnit, Integer interval, ZonedTimestampData timePoint) {
360+
if (interval == null || timePoint == null) {
361+
return null;
362+
}
363+
return ZonedTimestampData.of(
364+
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond()),
365+
0,
366+
timePoint.getZoneId());
247367
}
248368

249369
public static TimestampData timestampadd(
250-
String timeIntervalUnit, int interval, TimestampData timePoint) {
251-
return timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond());
370+
String timeIntervalUnit, Integer interval, TimestampData timePoint) {
371+
if (interval == null || timePoint == null) {
372+
return null;
373+
}
374+
return TimestampData.fromMillis(
375+
timestampadd(timeIntervalUnit, interval, timePoint.getMillisecond()));
252376
}
253377

254-
public static TimestampData timestampadd(
255-
String timeIntervalUnit, int interval, long timePoint) {
378+
private static long timestampadd(String timeIntervalUnit, int interval, long timePoint) {
256379
Calendar calendar = Calendar.getInstance();
380+
calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
257381
calendar.setTime(new Date(timePoint));
258382
int field;
259383
switch (timeIntervalUnit) {
@@ -282,7 +406,7 @@ public static TimestampData timestampadd(
282406
timeIntervalUnit));
283407
}
284408
calendar.add(field, interval);
285-
return TimestampData.fromMillis(calendar.getTimeInMillis());
409+
return calendar.getTimeInMillis();
286410
}
287411

288412
public static boolean betweenAsymmetric(String value, String minValue, String maxValue) {

0 commit comments

Comments
 (0)