Skip to content

Commit 2ab5002

Browse files
authored
Date/Time based Span aggregation should never present a null bucket (#4327)
1 parent 38b4295 commit 2ab5002

32 files changed

+552
-230
lines changed

core/src/main/java/org/opensearch/sql/ast/expression/SpanUnit.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public enum SpanUnit {
4242
SPAN_UNITS = builder.add(SpanUnit.values()).build();
4343
}
4444

45+
/** Util method to check if the unit is a time unit. */
46+
public static boolean isTimeUnit(SpanUnit unit) {
47+
return unit != UNKNOWN && unit != NONE;
48+
}
49+
4550
/** Util method to get span unit given the unit name. */
4651
public static SpanUnit of(String unit) {
4752
switch (unit) {

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@
8080
import org.opensearch.sql.ast.expression.ParseMethod;
8181
import org.opensearch.sql.ast.expression.PatternMethod;
8282
import org.opensearch.sql.ast.expression.PatternMode;
83+
import org.opensearch.sql.ast.expression.Span;
84+
import org.opensearch.sql.ast.expression.SpanUnit;
8385
import org.opensearch.sql.ast.expression.UnresolvedExpression;
8486
import org.opensearch.sql.ast.expression.WindowFrame;
8587
import org.opensearch.sql.ast.expression.WindowFrame.FrameType;
@@ -893,27 +895,42 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
893895
// The span column is always the first column in result whatever
894896
// the order of span in query is first or last one
895897
UnresolvedExpression span = node.getSpan();
896-
if (!Objects.isNull(span)) {
898+
if (Objects.nonNull(span)) {
897899
groupExprList.add(span);
900+
List<RexNode> timeSpanFilters =
901+
getTimeSpanField(span).stream()
902+
.map(f -> rexVisitor.analyze(f, context))
903+
.map(context.relBuilder::isNotNull)
904+
.toList();
905+
if (!timeSpanFilters.isEmpty()) {
906+
// add isNotNull filter before aggregation for time span
907+
context.relBuilder.filter(timeSpanFilters);
908+
}
898909
}
899910
groupExprList.addAll(node.getGroupExprList());
900-
Pair<List<RexNode>, List<AggCall>> aggregationAttributes =
901-
aggregateWithTrimming(groupExprList, aggExprList, context);
902-
// Add group by columns
903-
List<RexNode> aliasedGroupByList =
904-
aggregationAttributes.getLeft().stream()
905-
.map(this::extractAliasLiteral)
906-
.flatMap(Optional::stream)
907-
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
908-
.map(context.relBuilder::field)
909-
.map(f -> (RexNode) f)
910-
.toList();
911911

912912
// add stats hint to LogicalAggregation
913913
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
914914
Boolean bucketNullable =
915915
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
916-
if (!bucketNullable && !aliasedGroupByList.isEmpty()) {
916+
boolean toAddHintsOnAggregate = false;
917+
if (!bucketNullable
918+
&& !groupExprList.isEmpty()
919+
&& !(groupExprList.size() == 1 && getTimeSpanField(span).isPresent())) {
920+
toAddHintsOnAggregate = true;
921+
// add isNotNull filter before aggregation for non-nullable buckets
922+
List<RexNode> groupByList =
923+
groupExprList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
924+
context.relBuilder.filter(
925+
PlanUtils.getSelectColumns(groupByList).stream()
926+
.map(context.relBuilder::field)
927+
.map(context.relBuilder::isNotNull)
928+
.toList());
929+
}
930+
931+
Pair<List<RexNode>, List<AggCall>> aggregationAttributes =
932+
aggregateWithTrimming(groupExprList, aggExprList, context);
933+
if (toAddHintsOnAggregate) {
917934
final RelHint statHits =
918935
RelHint.builder("stats_args").hintOption(Argument.BUCKET_NULLABLE, "false").build();
919936
assert context.relBuilder.peek() instanceof LogicalAggregate
@@ -930,8 +947,6 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
930947
return rel instanceof LogicalAggregate;
931948
})
932949
.build());
933-
context.relBuilder.filter(
934-
aliasedGroupByList.stream().map(context.relBuilder::isNotNull).toList());
935950
}
936951

937952
// schema reordering
@@ -945,12 +960,32 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
945960
List<RexNode> aggRexList =
946961
outputFields.subList(numOfOutputFields - numOfAggList, numOfOutputFields);
947962
reordered.addAll(aggRexList);
963+
// Add group by columns
964+
List<RexNode> aliasedGroupByList =
965+
aggregationAttributes.getLeft().stream()
966+
.map(this::extractAliasLiteral)
967+
.flatMap(Optional::stream)
968+
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
969+
.map(context.relBuilder::field)
970+
.map(f -> (RexNode) f)
971+
.toList();
948972
reordered.addAll(aliasedGroupByList);
949973
context.relBuilder.project(reordered);
950974

951975
return context.relBuilder.peek();
952976
}
953977

978+
private Optional<UnresolvedExpression> getTimeSpanField(UnresolvedExpression expr) {
979+
if (Objects.isNull(expr)) return Optional.empty();
980+
if (expr instanceof Span span && SpanUnit.isTimeUnit(span.getUnit())) {
981+
return Optional.of(span.getField());
982+
}
983+
if (expr instanceof Alias alias) {
984+
return getTimeSpanField(alias.getDelegated());
985+
}
986+
return Optional.empty();
987+
}
988+
954989
/** extract the RexLiteral of Alias from a node */
955990
private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
956991
if (node == null) {

docs/user/ppl/cmd/stats.rst

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ stats [bucket_nullable=bool] <aggregation>... [by-clause]
5959
* span-expression: optional, at most one.
6060

6161
* Syntax: span(field_expr, interval_expr)
62-
* Description: The unit of the interval expression is the natural unit by default. If the field is a date and time type field, and the interval is in date/time units, you will need to specify the unit in the interval expression. For example, to split the field ``age`` into buckets by 10 years, it looks like ``span(age, 10)``. And here is another example of time span, the span to split a ``timestamp`` field into hourly intervals, it looks like ``span(timestamp, 1h)``.
63-
62+
* Description: The unit of the interval expression is the natural unit by default. **If the field is a date/time type field, the aggregation results always ignore the null bucket.** If the interval is in date/time units, you will need to specify the unit in the interval expression. For example, to split the field ``age`` into buckets by 10 years, it looks like ``span(age, 10)``. And here is another example of time span, the span to split a ``timestamp`` field into hourly intervals, it looks like ``span(timestamp, 1h)``.
6463
* Available time unit:
6564

6665
+----------------------------+
@@ -923,3 +922,58 @@ PPL query::
923922
|-------------------------------------|
924923
| ["Amber","Dale","Hattie","Nanette"] |
925924
+-------------------------------------+
925+
926+
927+
Example 17: Span on a date/time field always ignores the null bucket
928+
====================================================================
929+
930+
Index example data:
931+
932+
+-------+--------+------------+
933+
| Name | DEPTNO | birthday |
934+
+=======+========+============+
935+
| Alice | 1 | 2024-04-21 |
936+
+-------+--------+------------+
937+
| Bob | 2 | 2025-08-21 |
938+
+-------+--------+------------+
939+
| Jeff | null | 2025-04-22 |
940+
+-------+--------+------------+
941+
| Adam | 2 | null |
942+
+-------+--------+------------+
943+
944+
PPL query::
945+
946+
PPL> source=example | stats count() as cnt by span(birthday, 1y) as year;
947+
fetched rows / total rows = 3/3
948+
+-----+------------+
949+
| cnt | year |
950+
|-----+------------|
951+
| 1 | 2024-01-01 |
952+
| 2 | 2025-01-01 |
953+
+-----+------------+
954+
955+
956+
PPL query::
957+
958+
PPL> source=example | stats count() as cnt by span(birthday, 1y) as year, DEPTNO;
959+
fetched rows / total rows = 3/3
960+
+-----+------------+--------+
961+
| cnt | year | DEPTNO |
962+
|-----+------------+--------|
963+
| 1 | 2024-01-01 | 1 |
964+
| 1 | 2025-01-01 | 2 |
965+
| 1 | 2025-01-01 | null |
966+
+-----+------------+--------+
967+
968+
969+
PPL query::
970+
971+
PPL> source=example | stats bucket_nullable=false count() as cnt by span(birthday, 1y) as year, DEPTNO;
972+
fetched rows / total rows = 3/3
973+
+-----+------------+--------+
974+
| cnt | year | DEPTNO |
975+
|-----+------------+--------|
976+
| 1 | 2024-01-01 | 1 |
977+
| 1 | 2025-01-01 | 2 |
978+
+-----+------------+--------+
979+

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8-
import static org.junit.Assert.assertTrue;
9-
import static org.opensearch.sql.legacy.TestUtils.*;
108
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
119
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
1210
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -518,15 +518,20 @@ public void testCountByNullableTimeSpan() throws IOException {
518518
JSONObject actual =
519519
executeQuery(
520520
String.format(
521-
"source=%s | head 5 | stats count(datetime0), count(datetime1) by span(datetime1,"
522-
+ " 15 minute) as datetime_span",
521+
"source=%s | head 5 | stats count(datetime0), count(datetime1) by span(time1,"
522+
+ " 15 minute) as time_span",
523523
TEST_INDEX_CALCS));
524524
verifySchema(
525525
actual,
526-
schema("datetime_span", "timestamp"),
526+
schema("time_span", "time"),
527527
schema("count(datetime0)", "bigint"),
528528
schema("count(datetime1)", "bigint"));
529-
verifyDataRows(actual, rows(5, 0, null));
529+
verifyDataRows(
530+
actual,
531+
rows(1, 0, "19:30:00"),
532+
rows(1, 0, "02:00:00"),
533+
rows(1, 0, "09:30:00"),
534+
rows(1, 0, "22:45:00"));
530535
}
531536

532537
@Test

integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,11 @@ public enum Index {
872872
"time_data",
873873
getMappingFile("time_test_data_index_mapping.json"),
874874
"src/test/resources/time_test_data.json"),
875+
TIME_TEST_DATA_WITH_NULL(
876+
TestsConstants.TEST_INDEX_TIME_DATE_NULL,
877+
"time_data_with_null",
878+
getMappingFile("time_test_data_index_mapping.json"),
879+
"src/test/resources/time_test_data_with_null.json"),
875880
EVENTS(
876881
"events",
877882
"events",

integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public class TestsConstants {
8282
public static final String TEST_INDEX_HDFS_LOGS = TEST_INDEX + "_hdfs_logs";
8383
public static final String TEST_INDEX_LOGS = TEST_INDEX + "_logs";
8484
public static final String TEST_INDEX_OTEL_LOGS = TEST_INDEX + "_otel_logs";
85+
public static final String TEST_INDEX_TIME_DATE_NULL = TEST_INDEX + "_time_date_null";
8586

8687
public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
8788
public static final String TS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,6 @@ public void testStatsBySpan() throws IOException {
455455

456456
@Test
457457
public void testStatsBySpanNonBucketNullable() throws IOException {
458-
// TODO isNotNull(Span) pushdown to script, can be optimized to exist()
459458
String expected = loadExpectedPlan("explain_stats_by_span_non_bucket_nullable.json");
460459
assertJsonEqualsIgnoreId(
461460
expected,
@@ -478,6 +477,14 @@ public void testStatsByTimeSpan() throws IOException {
478477
expected,
479478
explainQueryToString(
480479
String.format("source=%s | stats count() by span(birthdate,1M)", TEST_INDEX_BANK)));
480+
481+
// bucket_nullable does not affect time-based span aggregation
482+
assertJsonEqualsIgnoreId(
483+
expected,
484+
explainQueryToString(
485+
String.format(
486+
"source=%s | stats bucket_nullable=false count() by span(birthdate,1M)",
487+
TEST_INDEX_BANK)));
481488
}
482489

483490
@Ignore("https://github.com/opensearch-project/OpenSearch/issues/3725")

integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
99
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
1010
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_WITH_NULL_VALUES;
11+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_TIME_DATE_NULL;
1112
import static org.opensearch.sql.util.MatcherUtils.rows;
1213
import static org.opensearch.sql.util.MatcherUtils.schema;
1314
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
@@ -27,6 +28,7 @@ public void init() throws Exception {
2728
loadIndex(Index.ACCOUNT);
2829
loadIndex(Index.BANK_WITH_NULL_VALUES);
2930
loadIndex(Index.BANK);
31+
loadIndex(Index.TIME_TEST_DATA_WITH_NULL);
3032
}
3133

3234
@Test
@@ -749,4 +751,25 @@ public void testDisableLegacyPreferred() throws IOException {
749751
rows(null, 36));
750752
});
751753
}
754+
755+
@Test
756+
public void testStatsBySpanTimeWithNullBucket() throws IOException {
757+
JSONObject response =
758+
executeQuery(
759+
String.format(
760+
"source=%s | stats percentile(value, 50) as p50 by span(@timestamp, 12h) as"
761+
+ " half_day",
762+
TEST_INDEX_TIME_DATE_NULL));
763+
verifySchema(response, schema("p50", null, "int"), schema("half_day", null, "timestamp"));
764+
verifyDataRows(
765+
response,
766+
rows(8523, "2025-07-28 00:00:00"),
767+
rows(8094, "2025-07-28 12:00:00"),
768+
rows(8429, "2025-07-29 00:00:00"),
769+
rows(8216, "2025-07-29 12:00:00"),
770+
rows(8493, "2025-07-30 00:00:00"),
771+
rows(8426, "2025-07-30 12:00:00"),
772+
rows(8213, "2025-07-31 00:00:00"),
773+
rows(8490, "2025-07-31 12:00:00"));
774+
}
752775
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])\n LogicalProject(count()=[$1], state=[$0])\n LogicalFilter(condition=[IS NOT NULL($0)])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(state=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[FILTER->IS NOT NULL($7), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), state]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"exists\":{\"field\":\"state\",\"boost\":1.0}},\"sort\":[],\"aggregations\":{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"size\":1000,\"min_doc_count\":1,\"shard_min_doc_count\":0,\"show_term_doc_count_error\":false,\"order\":{\"_key\":\"asc\"}},\"aggregations\":{\"count()\":{\"value_count\":{\"field\":\"_index\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])\n LogicalProject(count()=[$1], state=[$0])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(state=[$7])\n LogicalFilter(condition=[IS NOT NULL($7)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[FILTER->IS NOT NULL($7), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), state]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"exists\":{\"field\":\"state\",\"boost\":1.0}},\"sort\":[],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":false,\"order\":\"asc\"}}}]},\"aggregations\":{\"count()\":{\"value_count\":{\"field\":\"_index\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}

0 commit comments

Comments
 (0)