
Commit 4a40676

opensearch-project#991 Support Relative Date Times (opensearch-project#1006)
* Unrelated typos
* Initial implementation of relative date time logic and unit tests
* Minor cleanup
* Update to make relative time case-insensitive, and add corresponding unit tests
* Add a few more unit test cases
* Split pattern into smaller strings
* Add `relativeDateTimeFunction` to `SerializableUdf`, along with corresponding unit tests in `SerializableTimeUdfTest`; add `mockito-inline` to dependencies for `ppl-spark-integration` to allow mocking of the current datetime
* Fix dangling Javadoc
* Initial implementation of `relative_timestamp` UDF
* Add integration tests; refactor to use `Timestamp`
* Add documentation
* Review comments: add more documentation, add ignored tests for earliest and latest
* Minor clean up
* Update to use `Instant` and `ZoneId` (a timestamp can be returned, but the output from `$CurrentTimestamp` is an `Instant`)
* Remove unused import
* Address review comments
* Address review comments
* Update unit tests as per review comments
* Remove duplicate documentation
* Fix failing IT

Signed-off-by: currantw <taylor.curran@improving.com>
1 parent f054022 commit 4a40676

File tree

14 files changed: +683 −11 lines changed


docs/ppl-lang/PPL-Example-Commands.md

Lines changed: 5 additions & 0 deletions
@@ -499,4 +499,9 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols`
 - `source = table | eval cdate = CAST('2012-08-07' as date), ctime = cast('2012-08-07T08:07:06' as timestamp) | fields cdate, ctime`
 - `source = table | eval chained_cast = cast(cast("true" as boolean) as integer) | fields chained_cast`
+
+#### **relative_timestamp**
+[See additional function details](functions/ppl-datetime#RELATIVE_TIMESTAMP)
+- `source = table | eval one_hour_ago = relative_timestamp("-1h") | where timestamp < one_hour_ago`
+- `source = table | eval start_of_today = relative_timestamp("@d") | where timestamp > start_of_today`
+- `source = table | eval last_saturday = relative_timestamp("-1d@w6") | where timestamp >= last_saturday`
 ---

docs/ppl-lang/functions/ppl-datetime.md

Lines changed: 87 additions & 0 deletions
@@ -733,6 +733,93 @@ Example:
 | 3 |
 +-------------------------------+

+### `RELATIVE_TIMESTAMP`
+
+**Description:**
+
+**Usage:** relative_timestamp(str) returns a relative timestamp corresponding to the given relative string and the
+current timestamp at the time of query execution.
+
+The relative timestamp string has syntax `[+|-]<offset_time_integer><offset_time_unit>@<snap_time_unit>`, and is
+made up of two optional components:
+* An offset from the current timestamp, composed of a sign (`+` or `-`), an optional `offset_time_integer`, and an
+  `offset_time_unit`. If the offset time integer is not specified, it defaults to `1`. For example, `+2hr` is two
+  hours after the current timestamp, while `-mon` is one month ago.
+* A snap-to time using the `@` symbol followed by `snap_time_unit`. The snap-to time is applied after the offset (if
+  specified), and rounds the time <i>down</i> to the start of the specified time unit. For example, `@wk` is the start
+  of the current week (Sunday is considered to be the first day of the week).
+
+The special relative timestamp string `now`, corresponding to the current timestamp, is also supported. The current
+timestamp is determined once at the start of query execution, and is used for all relative timestamp calculations in
+that query.
+
+The relative timestamp string is case-insensitive.
+
+The following values are supported for `offset_time_unit`:
+
+| Time Unit | Supported Keywords                        |
+|-----------|-------------------------------------------|
+| Seconds   | `s`, `sec`, `secs`, `second`, `seconds`   |
+| Minutes   | `m`, `min`, `mins`, `minute`, `minutes`   |
+| Hours     | `h`, `hr`, `hrs`, `hour`, `hours`         |
+| Days      | `d`, `day`, `days`                        |
+| Weeks     | `w`, `wk`, `wks`, `week`, `weeks`         |
+| Months    | `mon`, `month`, `months`                  |
+| Quarters  | `q`, `qtr`, `qtrs`, `quarter`, `quarters` |
+| Years     | `y`, `yr`, `yrs`, `year`, `years`         |
+
+All of the time units above are supported for `snap_time_unit`, as well as the following day-of-the-week time units:
+
+| Time Unit | Supported Keywords |
+|-----------|--------------------|
+| Sunday    | `w0`, `w7`         |
+| Monday    | `w1`               |
+| Tuesday   | `w2`               |
+| Wednesday | `w3`               |
+| Thursday  | `w4`               |
+| Friday    | `w5`               |
+| Saturday  | `w6`               |
+
+For example, if the current timestamp is Monday, January 03, 2000 at 01:01:01 am:
+
+| Relative String | Description                                                  | Resulting Relative Time                     |
+|-----------------|--------------------------------------------------------------|---------------------------------------------|
+| `-60m`          | Sixty minutes ago                                            | Monday, January 03, 2000 at 00:01:01 am     |
+| `-1H`           | One hour ago                                                 | Monday, January 03, 2000 at 00:01:01 am     |
+| `+2wk`          | Two weeks from now                                           | Monday, January 17, 2000 at 01:01:01 am     |
+| `-1h@W3`        | One hour ago, rounded to the start of the previous Wednesday | Wednesday, December 29, 1999 at 00:00:00 am |
+| `@d`            | Start of the current day                                     | Monday, January 03, 2000 at 00:00:00 am     |
+| `now`           | Now                                                          | Monday, January 03, 2000 at 01:01:01 am     |
+
+Argument type: STRING
+
+Return type: TIMESTAMP
+
+Example:
+
+    os> source=people | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now")) | fields seconds_diff | head 1
+    fetched rows / total rows = 1/1
+    +--------------+
+    | seconds_diff |
+    |--------------|
+    | 0            |
+    +--------------+
+
+    os> source=people | eval hours_diff = timestampdiff(HOUR, now(), relative_timestamp("+1h")) | fields hours_diff | head 1
+    fetched rows / total rows = 1/1
+    +------------+
+    | hours_diff |
+    |------------|
+    | 1          |
+    +------------+
+
+    os> source=people | eval day = day_of_week(relative_timestamp("@w0")) | fields day | head 1
+    fetched rows / total rows = 1/1
+    +-----+
+    | day |
+    |-----|
+    | 1   |
+    +-----+

 ### `SECOND`
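The offset-then-snap semantics above can be sketched with `java.time`. The class below is a hypothetical standalone helper (not the PR's `TimeUtils`) that hard-codes the `-1h@W3` row of the example table: apply the offset first, then snap down to the start of the previous Wednesday.

```java
import java.time.DayOfWeek;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;

public class RelativeSketch {

    // "-1h@w3": apply the offset first, then snap down to the start of the
    // previous (or same) Wednesday, as the documentation describes.
    static ZonedDateTime minusOneHourSnapWednesday(ZonedDateTime now) {
        return now.minusHours(1)                                             // offset: -1h
                .with(TemporalAdjusters.previousOrSame(DayOfWeek.WEDNESDAY)) // snap: @w3
                .truncatedTo(ChronoUnit.DAYS);                               // start of that day
    }

    public static void main(String[] args) {
        // The doc's example current timestamp: Monday, January 03, 2000 at 01:01:01 am.
        ZonedDateTime now = ZonedDateTime.of(2000, 1, 3, 1, 1, 1, 0, ZoneId.of("UTC"));
        // Prints 1999-12-29T00:00Z[UTC], the start of the previous Wednesday,
        // matching the `-1h@W3` row in the table above.
        System.out.println(minusOneHourSnapWednesday(now));
    }
}
```

Note the ordering: snapping before offsetting would give a different answer, which is why the syntax applies the snap only after the offset.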

docs/ppl-lang/ppl-where-command.md

Lines changed: 1 addition & 0 deletions
@@ -61,3 +61,4 @@ PPL query:
     | eval factor = case(a > 15, a - 14, isnull(b), a - 7, a < 3, a + 1 else 1)
     | where case(factor = 2, 'even', factor = 4, 'even', factor = 6, 'even', factor = 8, 'even' else 'odd') = 'even'
     | stats count() by factor`
+- `source = table | where timestamp >= relative_timestamp("-1d@w6")`

integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltInDateTimeFunctionITSuite.scala

Lines changed: 64 additions & 0 deletions
@@ -368,6 +368,70 @@ class FlintSparkPPLBuiltInDateTimeFunctionITSuite
     assertSameRows(Seq(Row(3)), frame)
   }

+  test("test RELATIVE_TIMESTAMP") {
+    var frame = sql(s"""
+         | source = $testTable
+         | | eval seconds_diff = timestampdiff(SECOND, now(), relative_timestamp("now"))
+         | | fields seconds_diff
+         | | head 1
+         | """.stripMargin)
+    assertSameRows(Seq(Row(0)), frame)
+
+    frame = sql(s"""
+         | source = $testTable
+         | | eval hours_diff = timestampdiff(HOUR, relative_timestamp("+1h"), relative_timestamp("+1d"))
+         | | fields hours_diff
+         | | head 1
+         | """.stripMargin)
+    assertSameRows(Seq(Row(23)), frame)
+
+    frame = sql(s"""
+         | source = $testTable
+         | | eval day = day_of_week(relative_timestamp("@w0"))
+         | | fields day
+         | | head 1
+         | """.stripMargin)
+    assertSameRows(Seq(Row(1)), frame)
+
+    frame = sql(s"""
+         | source = $testTable
+         | | eval last_wednesday = relative_timestamp("-1d@w3")
+         | | eval actual_days_ago = timestampdiff(DAY, last_wednesday, now())
+         | | eval day_of_week = day_of_week(now())
+         | | eval expected_days_ago = case(day_of_week > 4, day_of_week - 4 else day_of_week + 3)
+         | | eval test_result = (expected_days_ago = actual_days_ago)
+         | | fields test_result
+         | | head 1
+         | """.stripMargin)
+    assertSameRows(Seq(Row(true)), frame)
+  }
+
+  // TODO #957: Support earliest
+  ignore("test EARLIEST") {
+    val frame = sql(s"""
+         | source = $testTable
+         | | eval earliest_hour_before = earliest(now(), "-1h")
+         | | eval earliest_now = earliest(now(), "now")
+         | | eval earliest_hour_after = earliest(now(), "+1h")
+         | | fields earliest_hour_before, earliest_now, earliest_hour_after
+         | | head 1
+         | """.stripMargin)
+    assertSameRows(Seq(Row(true), Row(true), Row(false)), frame)
+  }
+
+  // TODO #957: Support latest
+  ignore("test LATEST") {
+    val frame = sql(s"""
+         | source = $testTable
+         | | eval latest_hour_before = latest(now(), "-1h")
+         | | eval latest_now = latest(now(), "now")
+         | | eval latest_hour_after = latest(now(), "+1h")
+         | | fields latest_hour_before, latest_now, latest_hour_after
+         | | head 1
+         | """.stripMargin)
+    assertSameRows(Seq(Row(false), Row(true), Row(true)), frame)
+  }
+
   test("test CURRENT_TIME is not supported") {
     val ex = intercept[UnsupportedOperationException](sql(s"""
          | source = $testTable
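The `expected_days_ago` arithmetic in the last `RELATIVE_TIMESTAMP` query can be cross-checked against a direct `java.time` computation of `-1d@w3` for every weekday. This is an illustrative standalone check (class name and structure are mine, not part of the suite), using PPL's `day_of_week` convention of Sunday = 1 through Saturday = 7.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;

public class DaysAgoCheck {
    public static void main(String[] args) {
        // Verify the test's case() formula against a direct computation of
        // "-1d@w3" (one day back, then snap down to Wednesday) for all 7 days.
        LocalDate monday = LocalDate.of(2000, 1, 3); // a Monday
        for (int i = 0; i < 7; i++) {
            LocalDate today = monday.plusDays(i);
            LocalDate lastWednesday = today.minusDays(1)
                    .with(TemporalAdjusters.previousOrSame(DayOfWeek.WEDNESDAY));
            long actual = ChronoUnit.DAYS.between(lastWednesday, today);
            int dow = today.getDayOfWeek().getValue() % 7 + 1; // PPL: Sunday=1 .. Saturday=7
            long expected = dow > 4 ? dow - 4 : dow + 3;       // the test's case() formula
            System.out.println(today.getDayOfWeek() + ": " + actual + " == " + expected);
            if (actual != expected) throw new AssertionError();
        }
    }
}
```

The two branches of the formula correspond to whether today is after Wednesday (`dow > 4`, so the snap lands in the current week) or not (the snap lands in the previous week).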

ppl-spark-integration/README.md

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@ translation between PPL's logical plan to Spark's Catalyst logical plan.
 ### Context
 The next concepts are the main purpose of introduction this functionality:
 - Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals)
-- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
+- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
 - Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution.
 - Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins
 - Improve and promote PPL to become extensible and general purpose query language to be adopted by the community

@@ -37,7 +37,7 @@ In Apache Spark, the DataFrame API serves as a programmatic interface for data m

 For instance, if you have a PPL query and a translator, you can convert it into DataFrame operations to generate an optimized execution plan. Spark's underlying Catalyst optimizer will convert these DataFrame transformations and actions into an optimized physical plan executed over RDDs or Datasets.

-The following section describes the two main options for translating the PPL query (using the logical plan) into the spark corespondent component (either dataframe API or spark logical plan)
+The following section describes the two main options for translating the PPL query (using the logical plan) into the spark correspondent component (either dataframe API or spark logical plan)

 ### Translation Process

ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4

Lines changed: 1 addition & 0 deletions
@@ -338,6 +338,7 @@ MONTHNAME: 'MONTHNAME';
 NOW: 'NOW';
 PERIOD_ADD: 'PERIOD_ADD';
 PERIOD_DIFF: 'PERIOD_DIFF';
+RELATIVE_TIMESTAMP: 'RELATIVE_TIMESTAMP';
 SEC_TO_TIME: 'SEC_TO_TIME';
 STR_TO_DATE: 'STR_TO_DATE';
 SUBDATE: 'SUBDATE';

ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4

Lines changed: 1 addition & 0 deletions
@@ -755,6 +755,7 @@ dateTimeFunctionName
     | NOW
     | PERIOD_ADD
     | PERIOD_DIFF
+    | RELATIVE_TIMESTAMP
     | QUARTER
     | SECOND
     | SECOND_OF_MINUTE

ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/DataType.java

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 import lombok.RequiredArgsConstructor;
 import org.opensearch.sql.data.type.ExprCoreType;

-/** The DataType defintion in AST. Question, could we use {@link ExprCoreType} directly in AST? */
+/** The DataType definition in AST. Question, could we use {@link ExprCoreType} directly in AST? */

 @RequiredArgsConstructor
 public enum DataType {

ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 3 additions & 0 deletions
@@ -133,6 +133,9 @@ public enum BuiltinFunctionName {
     LOCALTIMESTAMP(FunctionName.of("localtimestamp")),
     SYSDATE(FunctionName.of("sysdate")),

+    // Relative timestamp functions
+    RELATIVE_TIMESTAMP(FunctionName.of("relative_timestamp")),
+
     /** Text Functions. */
     TOSTRING(FunctionName.of("tostring")),

ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java

Lines changed: 55 additions & 6 deletions
@@ -13,16 +13,23 @@
 import org.apache.spark.sql.types.DataTypes;
 import scala.Function1;
 import scala.Function2;
+import scala.Function3;
 import scala.Option;
 import scala.Serializable;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.AbstractFunction2;
+import scala.runtime.AbstractFunction3;
 import scala.collection.JavaConverters;
 import scala.collection.mutable.WrappedArray;

+import java.lang.Boolean;
 import java.math.BigInteger;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;

@@ -35,11 +42,18 @@

 public interface SerializableUdf {

+    abstract class SerializableAbstractFunction1<T1, R> extends AbstractFunction1<T1, R>
+            implements Serializable {
+    }

     abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction2<T1, T2, R>
             implements Serializable {
     }

+    abstract class SerializableAbstractFunction3<T1, T2, T3, R> extends AbstractFunction3<T1, T2, T3, R>
+            implements Serializable {
+    }
+
     /**
      * Remove specified keys from a JSON string.
      *

@@ -109,7 +123,7 @@ public String apply(String jsonStr, WrappedArray<String> elements) {
             }
         }
     };
-
+
     Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {

         IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()

@@ -197,14 +211,40 @@ public BigInteger apply(String ipAddress) {
         };
     }

-    abstract class SerializableAbstractFunction1<T1,R> extends AbstractFunction1<T1,R>
-            implements Serializable {
-    }
+    /**
+     * Returns the {@link Instant} corresponding to the given relative string, current timestamp, and current time zone ID.
+     * Throws {@link RuntimeException} if the relative string is not supported.
+     */
+    Function3<String, Object, String, Instant> relativeTimestampFunction = new SerializableAbstractFunction3<String, Object, String, Instant>() {
+
+        @Override
+        public Instant apply(String relativeString, Object currentTimestamp, String zoneIdString) {
+
+            /// If `spark.sql.datetime.java8API.enabled` is set to `true`, [org.apache.spark.sql.types.TimestampType]
+            /// is converted to [Instant] by Catalyst; otherwise, [Timestamp] is used instead.
+            Instant currentInstant =
+                    currentTimestamp instanceof Timestamp
+                            ? ((Timestamp) currentTimestamp).toInstant()
+                            : (Instant) currentTimestamp;
+
+            /// The Spark session time zone (`spark.sql.session.timeZone`)
+            /// is used, which may be different from the system time zone.
+            ZoneId zoneId = ZoneId.of(zoneIdString);
+
+            /// Relative time calculations are performed using [ZonedDateTime] because offsets (e.g. one hour ago)
+            /// need to account for changes in the time zone offset (e.g. daylight saving time), while snaps (e.g.
+            /// start of the previous Wednesday) need to account for the local date time.
+            ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(currentInstant, zoneId);
+            ZonedDateTime relativeDateTime = TimeUtils.getRelativeZonedDateTime(relativeString, currentDateTime);
+
+            return relativeDateTime.toInstant();
+        }
+    };

     /**
      * Get the function reference according to its name
      *
-     * @param funcName string representing function to retrieve.
+     * @param funcName string representing function to retrieve.
      * @return relevant ScalaUDF for given function name.
      */
     static ScalaUDF visit(String funcName, List<Expression> expressions) {

@@ -247,13 +287,22 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
                     true);
             case "ip_to_int":
                 return new ScalaUDF(geoIpUtils.ipToInt,
-                        DataTypes.createDecimalType(38,0),
+                        DataTypes.createDecimalType(38, 0),
                         seq(expressions),
                         seq(),
                         Option.empty(),
                         Option.apply("ip_to_int"),
                         false,
                         true);
+            case "relative_timestamp":
+                return new ScalaUDF(relativeTimestampFunction,
+                        DataTypes.TimestampType,
+                        seq(expressions),
+                        seq(),
+                        Option.empty(),
+                        Option.apply("relative_timestamp"),
+                        false,
+                        true);
             default:
                 return null;
         }