Skip to content

Commit 0e2dc7c

Browse files
authored
Speed up aggregation pushdown for single group-by expression (#3550)
* Speed up aggregation pushdown for single group-by expression Signed-off-by: Lantao Jin <[email protected]> * Add configs nullable_bucket Signed-off-by: Lantao Jin <[email protected]> * Fix IT Signed-off-by: Lantao Jin <[email protected]> * revert typo Signed-off-by: Lantao Jin <[email protected]> * Fix conflicts error Signed-off-by: Lantao Jin <[email protected]> * fix unit tests Signed-off-by: Lantao Jin <[email protected]> * Fix order Signed-off-by: Lantao Jin <[email protected]> * Fix UT Signed-off-by: Lantao Jin <[email protected]> * Fix UT in windows Signed-off-by: Lantao Jin <[email protected]> * fix compile error of conflicts Signed-off-by: Lantao Jin <[email protected]> * Add more ITs after merging push down limit to agg buckets Signed-off-by: Lantao Jin <[email protected]> * fix IT Signed-off-by: Lantao Jin <[email protected]> * address comments Signed-off-by: Lantao Jin <[email protected]> * Clear sorts in source builder for aggregation pushdown Signed-off-by: Lantao Jin <[email protected]> * Delete the TODO of v2, it's resolved now Signed-off-by: Lantao Jin <[email protected]> * fix doctest Signed-off-by: Lantao Jin <[email protected]> --------- Signed-off-by: Lantao Jin <[email protected]>
1 parent 7de8545 commit 0e2dc7c

File tree

63 files changed

+1757
-535
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1757
-535
lines changed

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public enum Key {
3030
PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"),
3131
PATTERN_BUFFER_LIMIT("plugins.ppl.pattern.buffer.limit"),
3232
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
33+
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
3334

3435
/** Enable Calcite as execution engine */
3536
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -880,7 +880,11 @@ private LogicalAggregation analyzeAggregation(
880880
groupBys.forEach(
881881
group ->
882882
newEnv.define(new Symbol(Namespace.FIELD_NAME, group.getNameOrAlias()), group.type()));
883-
return new LogicalAggregation(child, aggregators, groupBys);
883+
884+
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
885+
boolean bucketNullable =
886+
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
887+
return new LogicalAggregation(child, aggregators, groupBys, bucketNullable);
884888
}
885889

886890
private Aggregation analyzePatternsAgg(Patterns node) {

core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ public static List<Argument> defaultStatsArgs() {
449449
argument("partitions", intLiteral(1)),
450450
argument("allnum", booleanLiteral(false)),
451451
argument("delim", stringLiteral(" ")),
452+
argument(Argument.BUCKET_NULLABLE, booleanLiteral(true)),
452453
argument("dedupsplit", booleanLiteral(false)));
453454
}
454455

core/src/main/java/org/opensearch/sql/ast/expression/Argument.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
@RequiredArgsConstructor
2121
@EqualsAndHashCode(callSuper = false)
2222
public class Argument extends UnresolvedExpression {
23+
public static final String BUCKET_NULLABLE = "bucket_nullable";
24+
2325
private final String argName;
2426
private final Literal value;
2527

@@ -66,5 +68,9 @@ public static ArgumentMap empty() {
6668
public Literal get(String name) {
6769
return map.get(name);
6870
}
71+
72+
public Literal getOrDefault(String name, Literal literal) {
73+
return map.getOrDefault(name, literal);
74+
}
6975
}
7076
}

core/src/main/java/org/opensearch/sql/ast/expression/Literal.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,6 @@ public String toString() {
4949

5050
public static Literal TRUE = new Literal(true, DataType.BOOLEAN);
5151
public static Literal FALSE = new Literal(false, DataType.BOOLEAN);
52-
public static Literal ZERO = new Literal(Integer.valueOf("0"), DataType.INTEGER);
52+
public static Literal ZERO = new Literal(0, DataType.INTEGER);
53+
public static Literal ONE = new Literal(1, DataType.INTEGER);
5354
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@
4141
import org.apache.calcite.rel.RelNode;
4242
import org.apache.calcite.rel.core.Aggregate;
4343
import org.apache.calcite.rel.core.JoinRelType;
44+
import org.apache.calcite.rel.hint.HintStrategyTable;
45+
import org.apache.calcite.rel.hint.RelHint;
46+
import org.apache.calcite.rel.logical.LogicalAggregate;
4447
import org.apache.calcite.rel.logical.LogicalValues;
4548
import org.apache.calcite.rel.type.RelDataTypeField;
4649
import org.apache.calcite.rex.RexCall;
@@ -880,6 +883,40 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
880883
groupExprList.addAll(node.getGroupExprList());
881884
Pair<List<RexNode>, List<AggCall>> aggregationAttributes =
882885
aggregateWithTrimming(groupExprList, aggExprList, context);
886+
// Add group by columns
887+
List<RexNode> aliasedGroupByList =
888+
aggregationAttributes.getLeft().stream()
889+
.map(this::extractAliasLiteral)
890+
.flatMap(Optional::stream)
891+
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
892+
.map(context.relBuilder::field)
893+
.map(f -> (RexNode) f)
894+
.toList();
895+
896+
// add stats hint to LogicalAggregation
897+
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
898+
Boolean bucketNullable =
899+
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
900+
if (!bucketNullable && !aliasedGroupByList.isEmpty()) {
901+
final RelHint statHits =
902+
RelHint.builder("stats_args").hintOption(Argument.BUCKET_NULLABLE, "false").build();
903+
assert context.relBuilder.peek() instanceof LogicalAggregate
904+
: "Stats hits should be added to LogicalAggregate";
905+
context.relBuilder.hints(statHits);
906+
context
907+
.relBuilder
908+
.getCluster()
909+
.setHintStrategies(
910+
HintStrategyTable.builder()
911+
.hintStrategy(
912+
"stats_args",
913+
(hint, rel) -> {
914+
return rel instanceof LogicalAggregate;
915+
})
916+
.build());
917+
context.relBuilder.filter(
918+
aliasedGroupByList.stream().map(context.relBuilder::isNotNull).toList());
919+
}
883920

884921
// schema reordering
885922
// As an example, in command `stats count() by colA, colB`,
@@ -892,15 +929,6 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
892929
List<RexNode> aggRexList =
893930
outputFields.subList(numOfOutputFields - numOfAggList, numOfOutputFields);
894931
reordered.addAll(aggRexList);
895-
// Add group by columns
896-
List<RexNode> aliasedGroupByList =
897-
aggregationAttributes.getLeft().stream()
898-
.map(this::extractAliasLiteral)
899-
.flatMap(Optional::stream)
900-
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
901-
.map(context.relBuilder::field)
902-
.map(f -> (RexNode) f)
903-
.toList();
904932
reordered.addAll(aliasedGroupByList);
905933
context.relBuilder.project(reordered);
906934

core/src/main/java/org/opensearch/sql/planner/logical/LogicalAggregation.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,27 @@
1414
import org.opensearch.sql.expression.aggregation.NamedAggregator;
1515

1616
/** Logical Aggregation. */
17+
@Getter
1718
@ToString
1819
@EqualsAndHashCode(callSuper = true)
1920
public class LogicalAggregation extends LogicalPlan {
2021

21-
@Getter private final List<NamedAggregator> aggregatorList;
22+
private final List<NamedAggregator> aggregatorList;
2223

23-
@Getter private final List<NamedExpression> groupByList;
24+
private final List<NamedExpression> groupByList;
25+
26+
private final boolean bucketNullable;
2427

2528
/** Constructor of LogicalAggregation. */
2629
public LogicalAggregation(
27-
LogicalPlan child, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
30+
LogicalPlan child,
31+
List<NamedAggregator> aggregatorList,
32+
List<NamedExpression> groupByList,
33+
boolean bucketNullable) {
2834
super(Collections.singletonList(child));
2935
this.aggregatorList = aggregatorList;
3036
this.groupByList = groupByList;
37+
this.bucketNullable = bucketNullable;
3138
}
3239

3340
@Override

core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,19 @@ public static LogicalPlan write(LogicalPlan input, Table table, List<String> col
3838
return new LogicalWrite(input, table, columns);
3939
}
4040

41+
/** Build a logical aggregation with nullable bucket always true. */
4142
public static LogicalPlan aggregation(
4243
LogicalPlan input, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
43-
return new LogicalAggregation(input, aggregatorList, groupByList);
44+
return new LogicalAggregation(input, aggregatorList, groupByList, true);
45+
}
46+
47+
/** Build a logical aggregation with nullable bucket parameter */
48+
public static LogicalPlan aggregation(
49+
LogicalPlan input,
50+
List<NamedAggregator> aggregatorList,
51+
List<NamedExpression> groupByList,
52+
boolean bucketNullable) {
53+
return new LogicalAggregation(input, aggregatorList, groupByList, bucketNullable);
4454
}
4555

4656
public static LogicalPlan filter(LogicalPlan input, Expression expression) {

core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral;
2020
import static org.opensearch.sql.ast.dsl.AstDSL.compare;
2121
import static org.opensearch.sql.ast.dsl.AstDSL.computation;
22+
import static org.opensearch.sql.ast.dsl.AstDSL.exprList;
2223
import static org.opensearch.sql.ast.dsl.AstDSL.field;
2324
import static org.opensearch.sql.ast.dsl.AstDSL.filter;
2425
import static org.opensearch.sql.ast.dsl.AstDSL.filteredAggregate;
@@ -434,7 +435,7 @@ public void stats_source() {
434435
ImmutableList.of(DSL.named("string_value", DSL.ref("string_value", STRING)))),
435436
AstDSL.agg(
436437
AstDSL.relation("schema"),
437-
AstDSL.exprList(
438+
exprList(
438439
AstDSL.alias(
439440
"avg(integer_value)", AstDSL.aggregate("avg", field("integer_value")))),
440441
null,
@@ -486,7 +487,7 @@ public void rename_to_invalid_expression() {
486487
AstDSL.rename(
487488
AstDSL.agg(
488489
AstDSL.relation("schema"),
489-
AstDSL.exprList(
490+
exprList(
490491
AstDSL.alias(
491492
"avg(integer_value)",
492493
AstDSL.aggregate("avg", field("integer_value")))),
@@ -1956,4 +1957,28 @@ public void rex_command_throws_unsupported_operation_exception_in_legacy_engine(
19561957
.attach(relation("schema"))));
19571958
assertEquals("Rex is supported only when plugins.calcite.enabled=true", exception.getMessage());
19581959
}
1960+
1961+
@Test
1962+
public void stats_non_bucket_nullable_test() {
1963+
assertAnalyzeEqual(
1964+
LogicalPlanDSL.aggregation(
1965+
LogicalPlanDSL.relation("schema", table),
1966+
ImmutableList.of(
1967+
DSL.named("avg(integer_value)", DSL.avg(DSL.ref("integer_value", INTEGER)))),
1968+
ImmutableList.of(DSL.named("string_value", DSL.ref("string_value", STRING))),
1969+
false),
1970+
AstDSL.agg(
1971+
AstDSL.relation("schema"),
1972+
exprList(
1973+
AstDSL.alias(
1974+
"avg(integer_value)", AstDSL.aggregate("avg", field("integer_value")))),
1975+
null,
1976+
ImmutableList.of(AstDSL.alias("string_value", field("string_value"))),
1977+
exprList(
1978+
argument("partitions", intLiteral(1)),
1979+
argument("allnum", booleanLiteral(false)),
1980+
argument("delim", stringLiteral(" ")),
1981+
argument(Argument.BUCKET_NULLABLE, booleanLiteral(false)),
1982+
argument("dedupsplit", booleanLiteral(false)))));
1983+
}
19591984
}

docs/user/ppl/admin/settings.rst

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,40 @@ PPL query::
189189
}
190190
}
191191
}
192+
193+
plugins.ppl.syntax.legacy.preferred
194+
===================================
195+
196+
Description
197+
-----------
198+
199+
This configuration is introduced since 3.3.0 which is used to switch some behaviours in PPL syntax. The current default value is ``true``.
200+
The behaviours it controlled includes:
201+
202+
- The default value of argument ``bucket_nullable`` in ``stats`` command. Check `stats command <../cmd/stats.rst>`_ for details.
203+
204+
Example
205+
-------
206+
207+
You can update the setting with a new value like this.
208+
209+
PPL query::
210+
211+
sh$ curl -sS -H 'Content-Type: application/json' \
212+
... -X PUT localhost:9200/_plugins/_query/settings \
213+
... -d '{"transient" : {"plugins.ppl.syntax.legacy.preferred" : "false"}}'
214+
{
215+
"acknowledged": true,
216+
"persistent": {},
217+
"transient": {
218+
"plugins": {
219+
"ppl": {
220+
"syntax": {
221+
"legacy": {
222+
"preferred": "false"
223+
}
224+
}
225+
}
226+
}
227+
}
228+
}

0 commit comments

Comments
 (0)