Skip to content

Commit a33f093

Browse files
committed
Address review comments
Signed-off-by: currantw <taylor.curran@improving.com>
1 parent f5992c9 commit a33f093

File tree

5 files changed

+220
-190
lines changed

5 files changed

+220
-190
lines changed

integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,14 +396,14 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
396396
frame = sql(s"""
397397
| source=$testTable
398398
| | eval last_wednesday = relative_timestamp("-1d@w3")
399-
| | eval actual_days_ago = timestampdiff(DAY, last_wednesday, now() )
399+
| | eval actual_days_ago = timestampdiff(DAY, last_wednesday, now())
400400
| | eval day_of_week = day_of_week(now())
401401
| | eval expected_days_ago = case(day_of_week >= 4, day_of_week - 4 else day_of_week + 3)
402-
| | eval test_result = (expected_weeks_ago = actual_weeks_ago)
402+
| | eval test_result = (expected_days_ago = actual_days_ago)
403403
| | fields test_result
404404
| | head 1
405405
| """.stripMargin)
406-
assertSameRows(Seq(Row(1)), frame)
406+
assertSameRows(Seq(Row(true)), frame)
407407
}
408408

409409
// TODO #957: Support earliest

ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import java.net.UnknownHostException;
2929
import java.sql.Timestamp;
3030
import java.time.Instant;
31-
import java.time.LocalDateTime;
3231
import java.time.ZoneId;
32+
import java.time.ZonedDateTime;
3333
import java.util.Collection;
3434
import java.util.List;
3535
import java.util.Map;
@@ -42,7 +42,7 @@
4242

4343
public interface SerializableUdf {
4444

45-
abstract class SerializableAbstractFunction1<T1,R> extends AbstractFunction1<T1,R>
45+
abstract class SerializableAbstractFunction1<T1, R> extends AbstractFunction1<T1, R>
4646
implements Serializable {
4747
}
4848

@@ -212,22 +212,39 @@ public BigInteger apply(String ipAddress) {
212212
}
213213

214214
/**
215-
* Returns the {@link Timestamp} corresponding to the given relative string and current timestamp.
215+
* Returns the {@link Instant} corresponding to the given relative string, current timestamp, and current time zone ID.
216216
* Throws {@link RuntimeException} if the relative string is not supported.
217217
*/
218-
Function3<String, Instant, String, Timestamp> relativeTimestampFunction = new SerializableAbstractFunction3<String, Instant, String, Timestamp>() {
218+
Function3<String, Object, String, Instant> relativeTimestampFunction = new SerializableAbstractFunction3<String, Object, String, Instant>() {
219+
219220
@Override
220-
public Timestamp apply(String relativeString, Instant currentInstant, String zoneId) {
221-
LocalDateTime currentLocalDateTime = LocalDateTime.ofInstant(currentInstant, ZoneId.of(zoneId));
222-
LocalDateTime relativeLocalDateTime = TimeUtils.getRelativeLocalDateTime(relativeString, currentLocalDateTime);
223-
return Timestamp.valueOf(relativeLocalDateTime);
221+
public Instant apply(String relativeString, Object currentTimestamp, String zoneIdString) {
222+
223+
/// If `spark.sql.datetime.java8API.enabled` is set to `true`, [org.apache.spark.sql.types.TimestampType]
224+
/// is converted to [Instant] by Catalyst; otherwise, [Timestamp] is used instead.
225+
Instant currentInstant =
226+
currentTimestamp instanceof Timestamp
227+
? ((Timestamp) currentTimestamp).toInstant()
228+
: (Instant) currentTimestamp;
229+
230+
/// The Spark session time zone (`spark.sql.session.timeZone`)
231+
/// is used, which may be different from the system time zone.
232+
ZoneId zoneId = ZoneId.of(zoneIdString);
233+
234+
/// Relative time calculations are performed using [ZonedDateTime] because offsets (e.g. one hour ago)
235+
/// need to account for changes in the time zone offset (e.g. daylight savings time), while snaps (e.g.
236+
/// start of previous Wednesday) need to account for the local date time.
237+
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(currentInstant, zoneId);
238+
ZonedDateTime relativeDateTime = TimeUtils.getRelativeZonedDateTime(relativeString, currentDateTime);
239+
240+
return relativeDateTime.toInstant();
224241
}
225242
};
226243

227244
/**
228245
* Get the function reference according to its name
229246
*
230-
* @param funcName string representing function to retrieve.
247+
* @param funcName string representing function to retrieve.
231248
* @return relevant ScalaUDF for given function name.
232249
*/
233250
static ScalaUDF visit(String funcName, List<Expression> expressions) {
@@ -270,7 +287,7 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
270287
true);
271288
case "ip_to_int":
272289
return new ScalaUDF(geoIpUtils.ipToInt,
273-
DataTypes.createDecimalType(38,0),
290+
DataTypes.createDecimalType(38, 0),
274291
seq(expressions),
275292
seq(),
276293
Option.empty(),

ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/TimeUtils.java

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010

1111
import java.time.DayOfWeek;
1212
import java.time.Duration;
13-
import java.time.LocalDateTime;
1413
import java.time.Month;
1514
import java.time.Period;
15+
import java.time.ZonedDateTime;
1616
import java.time.temporal.ChronoUnit;
1717
import java.util.HashMap;
1818
import java.util.Map;
@@ -81,7 +81,7 @@ public class TimeUtils {
8181
Map.entry("w6", DayOfWeek.SATURDAY));
8282

8383
/**
84-
* Returns the relative {@link LocalDateTime} corresponding to the given relative string and local date time.
84+
* Returns the relative {@link ZonedDateTime} corresponding to the given relative string and zoned date time.
8585
* <p>
8686
* The relative time string has syntax {@code [+|-]<offset_time_integer><offset_time_unit>@<snap_time_unit>}, and
8787
* is made up of two optional components:
@@ -219,12 +219,12 @@ public class TimeUtils {
219219
* </body>
220220
* </table>
221221
*/
222-
public static LocalDateTime getRelativeLocalDateTime(String relativeString, LocalDateTime localDateTime) {
222+
public static ZonedDateTime getRelativeZonedDateTime(String relativeString, ZonedDateTime zonedDateTime) {
223223

224-
LocalDateTime relativeLocalDateTime = localDateTime;
224+
ZonedDateTime relativeZonedDateTime = zonedDateTime;
225225

226226
if (relativeString.equalsIgnoreCase(NOW)) {
227-
return localDateTime;
227+
return zonedDateTime;
228228
}
229229

230230
Matcher matcher = RELATIVE_PATTERN.matcher(relativeString);
@@ -235,27 +235,27 @@ public static LocalDateTime getRelativeLocalDateTime(String relativeString, Loca
235235

236236

237237
if (matcher.group("offset") != null) {
238-
relativeLocalDateTime = applyOffset(
239-
relativeLocalDateTime,
238+
relativeZonedDateTime = applyOffset(
239+
relativeZonedDateTime,
240240
matcher.group("offsetSign"),
241241
matcher.group("offsetValue"),
242242
matcher.group("offsetUnit"));
243243
}
244244

245245
if (matcher.group("snap") != null) {
246-
relativeLocalDateTime = applySnap(
247-
relativeLocalDateTime,
246+
relativeZonedDateTime = applySnap(
247+
relativeZonedDateTime,
248248
matcher.group("snapUnit"));
249249
}
250250

251-
return relativeLocalDateTime;
251+
return relativeZonedDateTime;
252252
}
253253

254254
/**
255255
* Applies the offset specified by the offset sign, value,
256-
* and unit to the given local date time, and returns the result.
256+
* and unit to the given zoned date time, and returns the result.
257257
*/
258-
private LocalDateTime applyOffset(LocalDateTime localDateTime, String offsetSign, String offsetValue, String offsetUnit) {
258+
private ZonedDateTime applyOffset(ZonedDateTime zonedDateTime, String offsetSign, String offsetValue, String offsetUnit) {
259259

260260
int offsetValueInt = Optional.ofNullable(offsetValue).map(Integer::parseInt).orElse(1);
261261
if (offsetSign.equals(NEGATIVE_SIGN)) {
@@ -271,46 +271,46 @@ private LocalDateTime applyOffset(LocalDateTime localDateTime, String offsetSign
271271

272272
if (DURATION_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) {
273273
Duration offsetDuration = DURATION_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt);
274-
return localDateTime.plus(offsetDuration);
274+
return zonedDateTime.plus(offsetDuration);
275275
}
276276

277277
if (PERIOD_FOR_TIME_UNIT_MAP.containsKey(offsetUnitLowerCase)) {
278278
Period offsetPeriod = PERIOD_FOR_TIME_UNIT_MAP.get(offsetUnitLowerCase).multipliedBy(offsetValueInt);
279-
return localDateTime.plus(offsetPeriod);
279+
return zonedDateTime.plus(offsetPeriod);
280280
}
281281

282282
String message = String.format("The relative date time unit '%s' is not supported.", offsetUnit);
283283
throw new IllegalArgumentException(message);
284284
}
285285

286286
/**
287-
* Snaps the given local date time to the start of the previous time
287+
* Snaps the given zoned date time to the start of the previous time
288288
* period specified by the given snap unit, and returns the result.
289289
*/
290-
private LocalDateTime applySnap(LocalDateTime localDateTime, String snapUnit) {
290+
private ZonedDateTime applySnap(ZonedDateTime zonedDateTime, String snapUnit) {
291291

292292
// Convert to lower case to make case-insensitive.
293293
String snapUnitLowerCase = snapUnit.toLowerCase();
294294

295295
if (SECOND_UNITS_SET.contains(snapUnitLowerCase)) {
296-
return localDateTime.truncatedTo(ChronoUnit.SECONDS);
296+
return zonedDateTime.truncatedTo(ChronoUnit.SECONDS);
297297
} else if (MINUTE_UNITS_SET.contains(snapUnitLowerCase)) {
298-
return localDateTime.truncatedTo(ChronoUnit.MINUTES);
298+
return zonedDateTime.truncatedTo(ChronoUnit.MINUTES);
299299
} else if (HOUR_UNITS_SET.contains(snapUnitLowerCase)) {
300-
return localDateTime.truncatedTo(ChronoUnit.HOURS);
300+
return zonedDateTime.truncatedTo(ChronoUnit.HOURS);
301301
} else if (DAY_UNITS_SET.contains(snapUnitLowerCase)) {
302-
return localDateTime.truncatedTo(ChronoUnit.DAYS);
302+
return zonedDateTime.truncatedTo(ChronoUnit.DAYS);
303303
} else if (WEEK_UNITS_SET.contains(snapUnitLowerCase)) {
304-
return applySnapToDayOfWeek(localDateTime, DayOfWeek.SUNDAY);
304+
return applySnapToDayOfWeek(zonedDateTime, DayOfWeek.SUNDAY);
305305
} else if (MONTH_UNITS_SET.contains(snapUnitLowerCase)) {
306-
return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
306+
return zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
307307
} else if (QUARTER_UNITS_SET.contains(snapUnitLowerCase)) {
308-
Month snapMonth = localDateTime.getMonth().firstMonthOfQuarter();
309-
return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).withMonth(snapMonth.getValue());
308+
Month snapMonth = zonedDateTime.getMonth().firstMonthOfQuarter();
309+
return zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).withMonth(snapMonth.getValue());
310310
} else if (YEAR_UNITS_SET.contains(snapUnitLowerCase)) {
311-
return localDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
311+
return zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
312312
} else if (DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.containsKey(snapUnitLowerCase)) {
313-
return applySnapToDayOfWeek(localDateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnitLowerCase));
313+
return applySnapToDayOfWeek(zonedDateTime, DAY_OF_THE_WEEK_FOR_SNAP_UNIT_MAP.get(snapUnitLowerCase));
314314
}
315315

316316
String message = String.format("The relative date time unit '%s' is not supported.", snapUnit);
@@ -321,10 +321,10 @@ private LocalDateTime applySnap(LocalDateTime localDateTime, String snapUnit) {
321321
* Snaps the given date time to the start of the previous
322322
* specified day of the week, and returns the result.
323323
*/
324-
private LocalDateTime applySnapToDayOfWeek(LocalDateTime dateTime, DayOfWeek snapDayOfWeek) {
325-
LocalDateTime snappedDateTime = dateTime.truncatedTo(ChronoUnit.DAYS);
324+
private ZonedDateTime applySnapToDayOfWeek(ZonedDateTime zonedDateTime, DayOfWeek snapDayOfWeek) {
325+
ZonedDateTime snappedDateTime = zonedDateTime.truncatedTo(ChronoUnit.DAYS);
326326

327-
int daysToSnap = dateTime.getDayOfWeek().getValue() - snapDayOfWeek.getValue();
327+
int daysToSnap = zonedDateTime.getDayOfWeek().getValue() - snapDayOfWeek.getValue();
328328
if (daysToSnap < 0) daysToSnap += DayOfWeek.values().length;
329329

330330
return snappedDateTime.minusDays(daysToSnap);

ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableTimeUdfTest.java

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,49 +13,62 @@
1313

1414
import static org.junit.Assert.assertEquals;
1515
import static org.junit.Assert.assertThrows;
16-
import static org.opensearch.sql.expression.function.SerializableUdf.relativeTimestampFunction;
1716

1817
public class SerializableTimeUdfTest {
1918

20-
// Monday, Jan 03, 2000 @ 01:01:01.100
21-
private final Instant MOCK_INSTANT = Instant.parse("2000-01-03T01:01:01.100Z");
19+
// Monday, Jan 03, 2000 @ 01:01:01.100 UTC
2220
private final ZoneId MOCK_ZONE_ID = ZoneId.of("UTC");
21+
private final Instant MOCK_INSTANT = Instant.parse("2000-01-03T01:01:01.100Z");
22+
private final Timestamp MOCK_TIMESTAMP = Timestamp.from(MOCK_INSTANT);
2323

2424
@Test
2525
public void relativeTimestampTest() {
2626

27-
/* These are only basic tests of the relative date time functionality.
28-
For more comprehensive tests, see {@link TimeUtilsTest}.
29-
*/
30-
31-
testValid("-60m", "2000-01-03 00:01:01.100");
32-
testValid("-H", "2000-01-03 00:01:01.100");
33-
testValid("+2wk", "2000-01-17 01:01:01.100");
34-
testValid("-1h@W3", "1999-12-29 00:00:00");
35-
testValid("@d", "2000-01-03 00:00:00");
36-
testValid("now", "2000-01-03 01:01:01.100");
37-
38-
testInvalid("invalid", "The relative date time 'invalid' is not supported.");
39-
testInvalid("INVALID", "The relative date time 'INVALID' is not supported.");
40-
testInvalid("~h", "The relative date time '~h' is not supported.");
41-
testInvalid("+1.1h", "The relative date time '+1.1h' is not supported.");
42-
testInvalid("+ms", "The relative date time unit 'ms' is not supported.");
43-
testInvalid("+1INVALID", "The relative date time unit 'INVALID' is not supported.");
44-
testInvalid("@INVALID", "The relative date time unit 'INVALID' is not supported.");
45-
testInvalid("@ms", "The relative date time unit 'ms' is not supported.");
46-
testInvalid("@w8", "The relative date time unit 'w8' is not supported.");
27+
/// These are only basic tests of the relative time functionality.
28+
/// For more comprehensive tests, see [TimeUtilsTest].
29+
30+
testValidInstant("-60m", "2000-01-03T00:01:01.100Z");
31+
testValidInstant("-H", "2000-01-03T00:01:01.100Z");
32+
testValidInstant("+2wk", "2000-01-17T01:01:01.100Z");
33+
testValidInstant("-1h@W3", "1999-12-29T00:00:00Z");
34+
testValidInstant("@d", "2000-01-03T00:00:00Z");
35+
testValidInstant("now", "2000-01-03T01:01:01.100Z");
36+
37+
testValidTimestamp("-60m", "2000-01-03T00:01:01.100Z");
38+
testValidTimestamp("-H", "2000-01-03T00:01:01.100Z");
39+
testValidTimestamp("+2wk", "2000-01-17T01:01:01.100Z");
40+
testValidTimestamp("-1h@W3", "1999-12-29T00:00:00Z");
41+
testValidTimestamp("@d", "2000-01-03T00:00:00Z");
42+
testValidTimestamp("now", "2000-01-03T01:01:01.100Z");
43+
44+
testInvalidString("invalid", "The relative date time 'invalid' is not supported.");
45+
testInvalidString("INVALID", "The relative date time 'INVALID' is not supported.");
46+
testInvalidString("~h", "The relative date time '~h' is not supported.");
47+
testInvalidString("+1.1h", "The relative date time '+1.1h' is not supported.");
48+
testInvalidString("+ms", "The relative date time unit 'ms' is not supported.");
49+
testInvalidString("+1INVALID", "The relative date time unit 'INVALID' is not supported.");
50+
testInvalidString("@INVALID", "The relative date time unit 'INVALID' is not supported.");
51+
testInvalidString("@ms", "The relative date time unit 'ms' is not supported.");
52+
testInvalidString("@w8", "The relative date time unit 'w8' is not supported.");
53+
}
54+
55+
private void testValidInstant(String relativeString, String expectedInstantString) {
56+
String testMessage = String.format("\"%s\"", relativeString);
57+
Instant expectedInstant = Instant.parse(expectedInstantString);
58+
Instant actualTimestamp = SerializableUdf.relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString());
59+
assertEquals(testMessage, expectedInstant, actualTimestamp);
4760
}
4861

49-
private void testValid(String relativeString, String expectedTimestampString) {
62+
private void testValidTimestamp(String relativeString, String expectedInstantString) {
5063
String testMessage = String.format("\"%s\"", relativeString);
51-
Timestamp expectedTimestamp = Timestamp.valueOf(expectedTimestampString);
52-
Timestamp actualTimestamp = relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString());
53-
assertEquals(testMessage, expectedTimestamp, actualTimestamp);
64+
Instant expectedInstant = Instant.parse(expectedInstantString);
65+
Instant actualTimestamp = SerializableUdf.relativeTimestampFunction.apply(relativeString, MOCK_TIMESTAMP, MOCK_ZONE_ID.toString());
66+
assertEquals(testMessage, expectedInstant, actualTimestamp);
5467
}
5568

56-
private void testInvalid(String relativeString, String expectedExceptionMessage) {
69+
private void testInvalidString(String relativeString, String expectedExceptionMessage) {
5770
String testMessage = String.format("\"%s\"", relativeString);
58-
String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, () -> relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString())).getMessage();
71+
String actualExceptionMessage = assertThrows(testMessage, RuntimeException.class, () -> SerializableUdf.relativeTimestampFunction.apply(relativeString, MOCK_INSTANT, MOCK_ZONE_ID.toString())).getMessage();
5972
assertEquals(expectedExceptionMessage, actualExceptionMessage);
6073
}
6174
}

0 commit comments

Comments
 (0)