Skip to content

Commit e232f6a

Browse files
authored
Support first/last aggregate functions for PPL (#4223)
* Support first/last aggregation functions for PPL Signed-off-by: Kai Huang <[email protected]> * Support null Signed-off-by: Kai Huang <[email protected]> * remove legacy Signed-off-by: Kai Huang <[email protected]> * update doc Signed-off-by: Kai Huang <[email protected]> * fix doctest Signed-off-by: Kai Huang <[email protected]> * fix stats.rst file Signed-off-by: Kai Huang <[email protected]> * fixes Signed-off-by: Kai Huang <[email protected]> * move pushdown logic to AggregateAnalyzer Signed-off-by: Kai Huang <[email protected]> * fix IT and update null handling Signed-off-by: Kai Huang <[email protected]> * add test cases for null handling Signed-off-by: Kai Huang <[email protected]> * handle parallelism Signed-off-by: Kai Huang <[email protected]> * Simplify CalciteExplainIT and add UT for AggregateAnalyzer Signed-off-by: Kai Huang <[email protected]> # Conflicts: # opensearch/src/test/java/org/opensearch/sql/opensearch/request/AggregateAnalyzerTest.java * fixes Signed-off-by: Kai Huang <[email protected]> --------- Signed-off-by: Kai Huang <[email protected]>
1 parent d46cb4c commit e232f6a

File tree

15 files changed

+869
-9
lines changed

15 files changed

+869
-9
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.udf.udaf;
7+
8+
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;
9+
10+
/**
11+
* FIRST aggregation function - returns the first non-null value in natural document order. Returns
12+
* NULL if no records exist, or if all records have NULL values for the field.
13+
*/
14+
public class FirstAggFunction implements UserDefinedAggFunction<FirstAggFunction.FirstAccumulator> {
15+
16+
@Override
17+
public FirstAccumulator init() {
18+
return new FirstAccumulator();
19+
}
20+
21+
@Override
22+
public Object result(FirstAccumulator accumulator) {
23+
return accumulator.value();
24+
}
25+
26+
@Override
27+
public FirstAccumulator add(FirstAccumulator acc, Object... values) {
28+
Object candidateValue = values[0];
29+
// Only accept the first non-null value encountered
30+
// Skip null values to find the first actual value
31+
if (candidateValue != null) {
32+
acc.setValue(candidateValue);
33+
}
34+
return acc;
35+
}
36+
37+
public static class FirstAccumulator implements Accumulator {
38+
private volatile Object first;
39+
private volatile boolean hasValue;
40+
41+
public FirstAccumulator() {
42+
this.first = null;
43+
this.hasValue = false;
44+
}
45+
46+
public synchronized void setValue(Object value) {
47+
if (!hasValue) {
48+
this.first = value;
49+
this.hasValue = true;
50+
}
51+
}
52+
53+
@Override
54+
public Object value(Object... argList) {
55+
return first;
56+
}
57+
58+
public int size() {
59+
return hasValue ? 1 : 0;
60+
}
61+
}
62+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.udf.udaf;
7+
8+
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;
9+
10+
/**
11+
* LAST aggregation function - returns the last non-null value in natural document order. Returns
12+
* NULL if no records exist, or if all records have NULL values for the field.
13+
*/
14+
public class LastAggFunction implements UserDefinedAggFunction<LastAggFunction.LastAccumulator> {
15+
16+
@Override
17+
public LastAccumulator init() {
18+
return new LastAccumulator();
19+
}
20+
21+
@Override
22+
public Object result(LastAccumulator accumulator) {
23+
return accumulator.value();
24+
}
25+
26+
@Override
27+
public LastAccumulator add(LastAccumulator acc, Object... values) {
28+
Object candidateValue = values[0];
29+
// Only update with non-null values to keep the last non-null value
30+
// Skip null values to preserve the previous non-null value
31+
if (candidateValue != null) {
32+
acc.setValue(candidateValue);
33+
}
34+
return acc;
35+
}
36+
37+
public static class LastAccumulator implements Accumulator {
38+
private volatile Object last;
39+
40+
public LastAccumulator() {
41+
this.last = null;
42+
}
43+
44+
public synchronized void setValue(Object value) {
45+
this.last = value;
46+
}
47+
48+
@Override
49+
public Object value(Object... argList) {
50+
return last;
51+
}
52+
53+
public int size() {
54+
return last != null ? 1 : 0;
55+
}
56+
}
57+
}

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ public enum BuiltinFunctionName {
207207
LIST(FunctionName.of("list")),
208208
// Not always an aggregation query
209209
NESTED(FunctionName.of("nested")),
210+
// Document order aggregation functions
211+
FIRST(FunctionName.of("first")),
212+
LAST(FunctionName.of("last")),
210213

211214
/** Text Functions. */
212215
ASCII(FunctionName.of("ascii")),
@@ -359,6 +362,8 @@ public enum BuiltinFunctionName {
359362
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
360363
.put("list", BuiltinFunctionName.LIST)
361364
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
365+
.put("first", BuiltinFunctionName.FIRST)
366+
.put("last", BuiltinFunctionName.LAST)
362367
.build();
363368

364369
private static final Map<String, BuiltinFunctionName> WINDOW_FUNC_MAPPING =

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.calcite.sql.type.SqlTypeTransforms;
3030
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
3131
import org.apache.calcite.util.BuiltInMethod;
32+
import org.opensearch.sql.calcite.udf.udaf.FirstAggFunction;
33+
import org.opensearch.sql.calcite.udf.udaf.LastAggFunction;
3234
import org.opensearch.sql.calcite.udf.udaf.ListAggFunction;
3335
import org.opensearch.sql.calcite.udf.udaf.LogPatternAggFunction;
3436
import org.opensearch.sql.calcite.udf.udaf.NullableSqlAvgAggFunction;
@@ -423,6 +425,12 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
423425
"TAKE",
424426
PPLReturnTypes.ARG0_ARRAY,
425427
PPLOperandTypes.ANY_OPTIONAL_INTEGER);
428+
public static final SqlAggFunction FIRST =
429+
createUserDefinedAggFunction(
430+
FirstAggFunction.class, "FIRST", ReturnTypes.ARG0, PPLOperandTypes.ANY_OPTIONAL_INTEGER);
431+
public static final SqlAggFunction LAST =
432+
createUserDefinedAggFunction(
433+
LastAggFunction.class, "LAST", ReturnTypes.ARG0, PPLOperandTypes.ANY_OPTIONAL_INTEGER);
426434
public static final SqlAggFunction PERCENTILE_APPROX =
427435
createUserDefinedAggFunction(
428436
PercentileApproxFunction.class,

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import static org.opensearch.sql.expression.function.BuiltinFunctionName.EXPM1;
6767
import static org.opensearch.sql.expression.function.BuiltinFunctionName.EXTRACT;
6868
import static org.opensearch.sql.expression.function.BuiltinFunctionName.FILTER;
69+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.FIRST;
6970
import static org.opensearch.sql.expression.function.BuiltinFunctionName.FLOOR;
7071
import static org.opensearch.sql.expression.function.BuiltinFunctionName.FORALL;
7172
import static org.opensearch.sql.expression.function.BuiltinFunctionName.FROM_DAYS;
@@ -102,6 +103,7 @@
102103
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_OBJECT;
103104
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_SET;
104105
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_VALID;
106+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LAST;
105107
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LAST_DAY;
106108
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LATEST;
107109
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LEFT;
@@ -1176,6 +1178,26 @@ void populate() {
11761178
},
11771179
wrapSqlOperandTypeChecker(
11781180
SqlStdOperatorTable.ARG_MAX.getOperandTypeChecker(), LATEST.name(), false));
1181+
1182+
// Register FIRST function - uses document order
1183+
register(
1184+
FIRST,
1185+
(distinct, field, argList, ctx) -> {
1186+
// Use our custom FirstAggFunction for document order aggregation
1187+
return ctx.relBuilder.aggregateCall(PPLBuiltinOperators.FIRST, field);
1188+
},
1189+
wrapSqlOperandTypeChecker(
1190+
PPLBuiltinOperators.FIRST.getOperandTypeChecker(), FIRST.name(), false));
1191+
1192+
// Register LAST function - uses document order
1193+
register(
1194+
LAST,
1195+
(distinct, field, argList, ctx) -> {
1196+
// Use our custom LastAggFunction for document order aggregation
1197+
return ctx.relBuilder.aggregateCall(PPLBuiltinOperators.LAST, field);
1198+
},
1199+
wrapSqlOperandTypeChecker(
1200+
PPLBuiltinOperators.LAST.getOperandTypeChecker(), LAST.name(), false));
11791201
}
11801202
}
11811203

docs/user/ppl/cmd/stats.rst

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ The following table dataSources the aggregation functions and also indicates how
2828
+----------+-------------+-------------+
2929
| MIN | Ignore | Ignore |
3030
+----------+-------------+-------------+
31+
| FIRST | Ignore | Ignore |
32+
+----------+-------------+-------------+
33+
| LAST | Ignore | Ignore |
34+
+----------+-------------+-------------+
3135
| LIST | Ignore | Ignore |
3236
+----------+-------------+-------------+
3337

@@ -463,13 +467,83 @@ Example with custom time field::
463467
| inactive | users |
464468
+----------------------------+----------+
465469

470+
FIRST
471+
-----
472+
473+
Description
474+
>>>>>>>>>>>
475+
476+
Version: 3.3.0
477+
478+
Usage: FIRST(field). Return the first non-null value of a field based on natural document order. Returns NULL if no records exist, or if all records have NULL values for the field.
479+
480+
* field: mandatory. The field to return the first value for.
481+
482+
Note: This function requires Calcite to be enabled (see `Configuration`_ section above).
483+
484+
Example::
485+
486+
os> source=accounts | stats first(firstname) by gender;
487+
fetched rows / total rows = 2/2
488+
+------------------+--------+
489+
| first(firstname) | gender |
490+
|------------------+--------|
491+
| Nanette | F |
492+
| Amber | M |
493+
+------------------+--------+
494+
495+
Example with count aggregation::
496+
497+
os> source=accounts | stats first(firstname), count() by gender;
498+
fetched rows / total rows = 2/2
499+
+------------------+---------+--------+
500+
| first(firstname) | count() | gender |
501+
|------------------+---------+--------|
502+
| Nanette | 1 | F |
503+
| Amber | 3 | M |
504+
+------------------+---------+--------+
505+
506+
LAST
507+
----
508+
509+
Description
510+
>>>>>>>>>>>
511+
512+
Version: 3.3.0
513+
514+
Usage: LAST(field). Return the last non-null value of a field based on natural document order. Returns NULL if no records exist, or if all records have NULL values for the field.
515+
516+
* field: mandatory. The field to return the last value for.
517+
518+
Note: This function requires Calcite to be enabled (see `Configuration`_ section above).
519+
520+
Example::
521+
522+
os> source=accounts | stats last(firstname) by gender;
523+
fetched rows / total rows = 2/2
524+
+-----------------+--------+
525+
| last(firstname) | gender |
526+
|-----------------+--------|
527+
| Nanette | F |
528+
| Dale | M |
529+
+-----------------+--------+
530+
531+
Example with different fields::
532+
533+
os> source=accounts | stats first(account_number), last(balance), first(age);
534+
fetched rows / total rows = 1/1
535+
+-----------------------+---------------+------------+
536+
| first(account_number) | last(balance) | first(age) |
537+
|-----------------------+---------------+------------|
538+
| 1 | 4180 | 32 |
539+
+-----------------------+---------------+------------+
540+
466541
LIST
467542
----
468543

469544
Description
470545
>>>>>>>>>>>
471546

472-
=======
473547
Version: 3.3.0 (Calcite engine only)
474548

475549
Usage: LIST(expr). Collects all values from the specified expression into an array. Values are converted to strings, nulls are filtered, and duplicates are preserved.

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,19 @@ public void testExplainOnEarliestLatestWithCustomTimeField() throws IOException
438438
TEST_INDEX_LOGS)));
439439
}
440440

441+
// Only for Calcite
442+
@Test
443+
public void testExplainOnFirstLast() throws IOException {
444+
String expected = loadExpectedPlan("explain_first_last.json");
445+
assertJsonEqualsIgnoreId(
446+
expected,
447+
explainQueryToString(
448+
String.format(
449+
"source=%s | stats first(firstname) as first_name, last(firstname) as"
450+
+ " last_name by gender",
451+
TEST_INDEX_BANK)));
452+
}
453+
441454
@Test
442455
public void testListAggregationExplain() throws IOException {
443456
String expected = loadExpectedPlan("explain_list_aggregation.json");

0 commit comments

Comments
 (0)