Skip to content

Commit 64a8671

Browse files
authored
Add values stats function with UDAF (#4276)
* Add stats function Signed-off-by: ps48 <[email protected]> * add settings for max values Signed-off-by: ps48 <[email protected]> * update functiontypetest IT Signed-off-by: ps48 <[email protected]> * update documentation for values settings Signed-off-by: ps48 <[email protected]> * update the rst docs, remove settingsholder Signed-off-by: ps48 <[email protected]> * update AST additions Signed-off-by: ps48 <[email protected]> * updated the IT testValuesFunctionGroupBy Signed-off-by: ps48 <[email protected]> --------- Signed-off-by: ps48 <[email protected]>
1 parent 7088e08 commit 64a8671

File tree

20 files changed

+646
-24
lines changed

20 files changed

+646
-24
lines changed

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public enum Key {
3030
PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"),
3131
PATTERN_BUFFER_LIMIT("plugins.ppl.pattern.buffer.limit"),
3232
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
33+
PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"),
3334
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
3435

3536
/** Enable Calcite as execution engine */

core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ListAggFunction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
* <li>Order of values in the result is non-deterministic
2222
* </ul>
2323
*
24-
* <p>Note: Similar to the TAKE function, LIST does not guarantee any specific order of values in
25-
* the result array. The order may vary between executions and depends on the underlying query
26-
* execution plan and optimizations.
24+
* <p>LIST does not guarantee any specific order of values in the result array. The order may vary
25+
* between executions and depends on the underlying query execution plan and optimizations.
2726
*/
2827
public class ListAggFunction implements UserDefinedAggFunction<ListAggFunction.ListAccumulator> {
2928

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.udf.udaf;
7+
8+
import java.util.ArrayList;
9+
import java.util.Set;
10+
import java.util.TreeSet;
11+
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;
12+
13+
/**
14+
* VALUES aggregate function implementation. Returns distinct values from a field in lexicographical
15+
* order as a multivalue field.
16+
*
17+
* <p>Behavior:
18+
*
19+
* <ul>
20+
* <li>Returns unique values only (no duplicates)
21+
* <li>Values are sorted in lexicographical order
22+
* <li>Processes field values as strings (casts all inputs to strings)
23+
* <li>Configurable limit via plugins.ppl.values.max.limit setting (0 = unlimited)
24+
* <li>Supports only scalar data types (rejects STRUCT/ARRAY types)
25+
* <li>Implementation uses TreeSet for automatic sorting and deduplication
26+
* </ul>
27+
*/
28+
public class ValuesAggFunction
29+
implements UserDefinedAggFunction<ValuesAggFunction.ValuesAccumulator> {
30+
31+
@Override
32+
public ValuesAccumulator init() {
33+
return new ValuesAccumulator();
34+
}
35+
36+
@Override
37+
public Object result(ValuesAccumulator accumulator) {
38+
return accumulator.value();
39+
}
40+
41+
@Override
42+
public ValuesAccumulator add(ValuesAccumulator acc, Object... values) {
43+
// Handle case where no values are passed
44+
if (values == null || values.length == 0) {
45+
return acc;
46+
}
47+
48+
Object value = values[0];
49+
50+
// Get limit from second argument (passed from AST)
51+
int limit = 0; // Default to unlimited
52+
if (values.length > 1 && values[1] != null) {
53+
limit = (Integer) values[1];
54+
}
55+
56+
// Filter out null values and check limit
57+
if (value != null && (limit == 0 || acc.size() < limit)) {
58+
// Convert value to string
59+
String stringValue = String.valueOf(value);
60+
acc.add(stringValue, limit);
61+
}
62+
63+
return acc;
64+
}
65+
66+
public static class ValuesAccumulator implements Accumulator {
67+
private final Set<String> values;
68+
69+
public ValuesAccumulator() {
70+
this.values = new TreeSet<>(); // TreeSet maintains sorted order and uniqueness
71+
}
72+
73+
@Override
74+
public Object value(Object... argList) {
75+
return new ArrayList<>(values); // Return List<String> to match expected type
76+
}
77+
78+
public void add(String value, int limit) {
79+
if (limit == 0 || values.size() < limit) {
80+
values.add(value);
81+
}
82+
}
83+
84+
public int size() {
85+
return values.size();
86+
}
87+
}
88+
}

core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,45 @@ public class PPLOperandTypes {
2222
// This class is not meant to be instantiated.
2323
private PPLOperandTypes() {}
2424

25+
/** List of all scalar type signatures (single parameter each) */
26+
private static final java.util.List<java.util.List<org.opensearch.sql.data.type.ExprType>>
27+
SCALAR_TYPES =
28+
java.util.List.of(
29+
// Numeric types
30+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BYTE),
31+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.SHORT),
32+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.INTEGER),
33+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.LONG),
34+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.FLOAT),
35+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DOUBLE),
36+
// String type
37+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.STRING),
38+
// Boolean type
39+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BOOLEAN),
40+
// Temporal types
41+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DATE),
42+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIME),
43+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP),
44+
// Special scalar types
45+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.IP),
46+
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BINARY));
47+
48+
/** Helper method to create scalar types with optional integer parameter */
49+
private static java.util.List<java.util.List<org.opensearch.sql.data.type.ExprType>>
50+
createScalarWithOptionalInteger() {
51+
java.util.List<java.util.List<org.opensearch.sql.data.type.ExprType>> result =
52+
new java.util.ArrayList<>(SCALAR_TYPES);
53+
54+
// Add scalar + integer combinations
55+
SCALAR_TYPES.forEach(
56+
scalarType ->
57+
result.add(
58+
java.util.List.of(
59+
scalarType.get(0), org.opensearch.sql.data.type.ExprCoreType.INTEGER)));
60+
61+
return result;
62+
}
63+
2564
public static final UDFOperandMetadata NONE = UDFOperandMetadata.wrap(OperandTypes.family());
2665
public static final UDFOperandMetadata OPTIONAL_ANY =
2766
UDFOperandMetadata.wrap(
@@ -200,25 +239,12 @@ private PPLOperandTypes() {}
200239
* booleans, datetime types, and special scalar types like IP and BINARY. Excludes complex types
201240
* like arrays, structs, and maps.
202241
*/
203-
public static final UDFOperandMetadata ANY_SCALAR =
204-
UDFOperandMetadata.wrapUDT(
205-
java.util.List.of(
206-
// Numeric types
207-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BYTE),
208-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.SHORT),
209-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.INTEGER),
210-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.LONG),
211-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.FLOAT),
212-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DOUBLE),
213-
// String type
214-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.STRING),
215-
// Boolean type
216-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BOOLEAN),
217-
// Temporal types
218-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DATE),
219-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIME),
220-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP),
221-
// Special scalar types
222-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.IP),
223-
java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BINARY)));
242+
public static final UDFOperandMetadata ANY_SCALAR = UDFOperandMetadata.wrapUDT(SCALAR_TYPES);
243+
244+
/**
245+
* Operand type checker that accepts any scalar type with an optional integer argument. This is
246+
* used for aggregation functions that take a field and an optional limit/size parameter.
247+
*/
248+
public static final UDFOperandMetadata ANY_SCALAR_OPTIONAL_INTEGER =
249+
UDFOperandMetadata.wrapUDT(createScalarWithOptionalInteger());
224250
}

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ public enum BuiltinFunctionName {
207207

208208
// Multivalue aggregation function
209209
LIST(FunctionName.of("list")),
210+
VALUES(FunctionName.of("values")),
210211
// Not always an aggregation query
211212
NESTED(FunctionName.of("nested")),
212213
// Document order aggregation functions
@@ -364,6 +365,7 @@ public enum BuiltinFunctionName {
364365
.put("latest", BuiltinFunctionName.LATEST)
365366
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
366367
.put("list", BuiltinFunctionName.LIST)
368+
.put("values", BuiltinFunctionName.VALUES)
367369
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
368370
.put("first", BuiltinFunctionName.FIRST)
369371
.put("last", BuiltinFunctionName.LAST)

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.opensearch.sql.calcite.udf.udaf.NullableSqlAvgAggFunction;
3737
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
3838
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
39+
import org.opensearch.sql.calcite.udf.udaf.ValuesAggFunction;
3940
import org.opensearch.sql.calcite.utils.PPLOperandTypes;
4041
import org.opensearch.sql.calcite.utils.PPLReturnTypes;
4142
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
@@ -450,6 +451,12 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
450451
public static final SqlAggFunction LIST =
451452
createUserDefinedAggFunction(
452453
ListAggFunction.class, "LIST", PPLReturnTypes.STRING_ARRAY, PPLOperandTypes.ANY_SCALAR);
454+
public static final SqlAggFunction VALUES =
455+
createUserDefinedAggFunction(
456+
ValuesAggFunction.class,
457+
"VALUES",
458+
PPLReturnTypes.STRING_ARRAY,
459+
PPLOperandTypes.ANY_SCALAR_OPTIONAL_INTEGER);
453460

454461
public static final SqlOperator ENHANCED_COALESCE =
455462
new EnhancedCoalesceFunction().toUDF("COALESCE");

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@
217217
import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_DATE;
218218
import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_TIME;
219219
import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_TIMESTAMP;
220+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.VALUES;
220221
import static org.opensearch.sql.expression.function.BuiltinFunctionName.VARPOP;
221222
import static org.opensearch.sql.expression.function.BuiltinFunctionName.VARSAMP;
222223
import static org.opensearch.sql.expression.function.BuiltinFunctionName.WEEK;
@@ -1120,6 +1121,7 @@ void populate() {
11201121
registerOperator(TAKE, PPLBuiltinOperators.TAKE);
11211122
registerOperator(INTERNAL_PATTERN, PPLBuiltinOperators.INTERNAL_PATTERN);
11221123
registerOperator(LIST, PPLBuiltinOperators.LIST);
1124+
registerOperator(VALUES, PPLBuiltinOperators.VALUES);
11231125

11241126
register(
11251127
AVG,

docs/user/ppl/admin/settings.rst

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,3 +226,85 @@ PPL query::
226226
}
227227
}
228228
}
229+
230+
plugins.ppl.values.max.limit
231+
============================
232+
233+
Description
234+
-----------
235+
236+
This setting controls the maximum number of unique values that the ``VALUES`` aggregation function can return. When set to 0 (the default), there is no limit on the number of unique values returned. When set to a positive integer, the function will return at most that many unique values.
237+
238+
1. The default value is 0 (unlimited).
239+
2. This setting is node scope.
240+
3. This setting can be updated dynamically.
241+
242+
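The ``VALUES`` function collects the unique values of a field and returns them in lexicographical order. This setting helps manage memory usage by limiting the number of values collected. When a positive limit is set, collection stops once that many unique values have been gathered, so the result contains the first unique values encountered (returned in sorted order), which are not necessarily the lexicographically smallest values in the field.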
243+
244+
Example 1
245+
---------
246+
247+
Set the limit to 1000 unique values:
248+
249+
PPL query::
250+
251+
sh$ curl -sS -H 'Content-Type: application/json' \
252+
... -X PUT localhost:9200/_plugins/_query/settings \
253+
... -d '{"transient" : {"plugins.ppl.values.max.limit" : "1000"}}'
254+
{
255+
"acknowledged": true,
256+
"persistent": {},
257+
"transient": {
258+
"plugins": {
259+
"ppl": {
260+
"values": {
261+
"max": {
262+
"limit": "1000"
263+
}
264+
}
265+
}
266+
}
267+
}
268+
}
269+
270+
Example 2
271+
---------
272+
273+
Reset to default (unlimited) by setting to null:
274+
275+
PPL query::
276+
277+
sh$ curl -sS -H 'Content-Type: application/json' \
278+
... -X PUT localhost:9200/_plugins/_query/settings \
279+
... -d '{"transient" : {"plugins.ppl.values.max.limit" : null}}'
280+
{
281+
"acknowledged": true,
282+
"persistent": {},
283+
"transient": {}
284+
}
285+
286+
Example 3
287+
---------
288+
289+
Set to 0 explicitly for unlimited values:
290+
291+
PPL query::
292+
293+
sh$ curl -sS -H 'Content-Type: application/json' \
294+
... -X PUT localhost:9200/_plugins/_query/settings \
295+
... -d '{"transient" : {"plugins.ppl.values.max.limit" : "0"}}'
296+
{
297+
"acknowledged": true,
298+
"persistent": {},
299+
"transient": {
300+
"plugins": {
301+
"ppl": {
302+
"values": {
303+
"max": {
304+
"limit": "0"
305+
}
306+
}
307+
}
308+
}
309+
}
310+
}

0 commit comments

Comments
 (0)