Skip to content

Commit 25320f9

Browse files
qianheng-awspenghuo
authored andcommitted
Calcite enable pushdown aggregation (opensearch-project#3389)
* Change push down to logical index scan Signed-off-by: Heng Qian <qianheng@amazon.com> * Support Aggregate Push Down Signed-off-by: Heng Qian <qianheng@amazon.com> * Rebase and resolve conflict Signed-off-by: Heng Qian <qianheng@amazon.com> * Add TODO Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 9254bfe commit 25320f9

File tree

16 files changed

+804
-320
lines changed

16 files changed

+804
-320
lines changed

core/src/main/java/org/opensearch/sql/calcite/utils/AggregateUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ static RelBuilder.AggCall translate(
4747
// case STDDEV:
4848
// return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV,
4949
// field);
50+
case VARSAMP:
51+
return context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_SAMP, field);
52+
case VARPOP:
53+
return context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, field);
5054
case STDDEV_POP:
5155
return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, field);
5256
case STDDEV_SAMP:

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLAggregationIT.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,21 @@ public void testApproxCountDistinct() {
178178
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
179179
}
180180

181+
@Test
182+
public void testVarSampVarPop() {
183+
JSONObject actual =
184+
executeQuery(
185+
String.format(
186+
"source=%s | stats var_samp(balance) as vs, var_pop(balance) as vp by gender",
187+
TEST_INDEX_BANK));
188+
verifySchema(
189+
actual, schema("gender", "string"), schema("vs", "double"), schema("vp", "double"));
190+
verifyDataRows(
191+
actual,
192+
rows("F", 58127404, 38751602.666666664),
193+
rows("M", 261699024.91666666, 196274268.6875));
194+
}
195+
181196
@Test
182197
public void testStddevSampStddevPop() {
183198
JSONObject actual =

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/EnumerableIndexScanRule.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@
1010
import org.apache.calcite.plan.RelOptRuleCall;
1111
import org.apache.calcite.rel.RelNode;
1212
import org.apache.calcite.rel.convert.ConverterRule;
13-
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalTableScan;
14-
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;
13+
import org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan;
14+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
1515

16-
/** Rule to convert a {@link CalciteLogicalTableScan} to a {@link CalciteOpenSearchIndexScan}. */
16+
/** Rule to convert a {@link CalciteLogicalIndexScan} to a {@link CalciteEnumerableIndexScan}. */
1717
public class EnumerableIndexScanRule extends ConverterRule {
1818
/** Default configuration. */
1919
public static final Config DEFAULT_CONFIG =
2020
Config.INSTANCE
2121
.as(Config.class)
2222
.withConversion(
23-
CalciteLogicalTableScan.class,
23+
CalciteLogicalIndexScan.class,
2424
s -> s.getOsIndex() != null,
2525
Convention.NONE,
2626
EnumerableConvention.INSTANCE,
@@ -34,13 +34,19 @@ protected EnumerableIndexScanRule(Config config) {
3434

3535
@Override
3636
public boolean matches(RelOptRuleCall call) {
37-
CalciteLogicalTableScan scan = call.rel(0);
37+
CalciteLogicalIndexScan scan = call.rel(0);
3838
return scan.getVariablesSet().isEmpty();
3939
}
4040

4141
@Override
4242
public RelNode convert(RelNode rel) {
43-
final CalciteLogicalTableScan scan = (CalciteLogicalTableScan) rel;
44-
return new CalciteOpenSearchIndexScan(scan.getCluster(), scan.getTable(), scan.getOsIndex());
43+
final CalciteLogicalIndexScan scan = (CalciteLogicalIndexScan) rel;
44+
return new CalciteEnumerableIndexScan(
45+
scan.getCluster(),
46+
scan.getHints(),
47+
scan.getTable(),
48+
scan.getOsIndex(),
49+
scan.getSchema(),
50+
scan.getPushDownContext());
4551
}
4652
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.sql.opensearch.planner.physical;
6+
7+
import org.apache.calcite.plan.RelOptRuleCall;
8+
import org.apache.calcite.plan.RelRule;
9+
import org.apache.calcite.rel.logical.LogicalAggregate;
10+
import org.immutables.value.Value;
11+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
12+
13+
/** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */
14+
@Value.Enclosing
15+
public class OpenSearchAggregateIndexScanRule
16+
extends RelRule<OpenSearchAggregateIndexScanRule.Config> {
17+
18+
/** Creates a OpenSearchAggregateIndexScanRule. */
19+
protected OpenSearchAggregateIndexScanRule(Config config) {
20+
super(config);
21+
}
22+
23+
@Override
24+
public void onMatch(RelOptRuleCall call) {
25+
if (call.rels.length == 2) {
26+
// the ordinary variant
27+
final LogicalAggregate aggregate = call.rel(0);
28+
final CalciteLogicalIndexScan scan = call.rel(1);
29+
apply(call, aggregate, scan);
30+
} else {
31+
throw new AssertionError(
32+
String.format(
33+
"The length of rels should be %s but got %s",
34+
this.operands.size(), call.rels.length));
35+
}
36+
}
37+
38+
protected void apply(
39+
RelOptRuleCall call, LogicalAggregate aggregate, CalciteLogicalIndexScan scan) {
40+
CalciteLogicalIndexScan newScan = scan.pushDownAggregate(aggregate);
41+
if (newScan != null) {
42+
call.transformTo(newScan);
43+
}
44+
}
45+
46+
/** Rule configuration. */
47+
@Value.Immutable
48+
public interface Config extends RelRule.Config {
49+
/** Config that matches Aggregate on OpenSearchProjectIndexScanRule. */
50+
Config DEFAULT =
51+
ImmutableOpenSearchAggregateIndexScanRule.Config.builder()
52+
.build()
53+
.withOperandSupplier(
54+
b0 ->
55+
b0.operand(LogicalAggregate.class)
56+
.oneInput(
57+
b1 ->
58+
b1.operand(CalciteLogicalIndexScan.class)
59+
.predicate(OpenSearchIndexScanRule::test)
60+
.noInputs()));
61+
62+
@Override
63+
default OpenSearchAggregateIndexScanRule toRule() {
64+
return new OpenSearchAggregateIndexScanRule(this);
65+
}
66+
}
67+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@
55
package org.opensearch.sql.opensearch.planner.physical;
66

77
import org.apache.calcite.plan.RelOptRuleCall;
8-
import org.apache.calcite.plan.RelOptTable;
98
import org.apache.calcite.plan.RelRule;
109
import org.apache.calcite.rel.core.Filter;
10+
import org.apache.calcite.rel.logical.LogicalFilter;
1111
import org.immutables.value.Value;
12-
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
13-
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;
12+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
1413

15-
/** Planner rule that push a {@link Filter} down to {@link CalciteOpenSearchIndexScan} */
14+
/** Planner rule that push a {@link LogicalFilter} down to {@link CalciteLogicalIndexScan} */
1615
@Value.Enclosing
1716
public class OpenSearchFilterIndexScanRule extends RelRule<OpenSearchFilterIndexScanRule.Config> {
1817

@@ -21,17 +20,12 @@ protected OpenSearchFilterIndexScanRule(Config config) {
2120
super(config);
2221
}
2322

24-
protected static boolean test(CalciteOpenSearchIndexScan scan) {
25-
final RelOptTable table = scan.getTable();
26-
return table.unwrap(OpenSearchIndex.class) != null;
27-
}
28-
2923
@Override
3024
public void onMatch(RelOptRuleCall call) {
3125
if (call.rels.length == 2) {
3226
// the ordinary variant
33-
final Filter filter = call.rel(0);
34-
final CalciteOpenSearchIndexScan scan = call.rel(1);
27+
final LogicalFilter filter = call.rel(0);
28+
final CalciteLogicalIndexScan scan = call.rel(1);
3529
apply(call, filter, scan);
3630
} else {
3731
throw new AssertionError(
@@ -41,8 +35,8 @@ public void onMatch(RelOptRuleCall call) {
4135
}
4236
}
4337

44-
protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
45-
CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter);
38+
protected void apply(RelOptRuleCall call, Filter filter, CalciteLogicalIndexScan scan) {
39+
CalciteLogicalIndexScan newScan = scan.pushDownFilter(filter);
4640
if (newScan != null) {
4741
call.transformTo(newScan);
4842
}
@@ -51,17 +45,17 @@ protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexS
5145
/** Rule configuration. */
5246
@Value.Immutable
5347
public interface Config extends RelRule.Config {
54-
/** Config that matches Filter on CalciteOpenSearchIndexScan. */
48+
/** Config that matches Filter on CalciteLogicalIndexScan. */
5549
Config DEFAULT =
5650
ImmutableOpenSearchFilterIndexScanRule.Config.builder()
5751
.build()
5852
.withOperandSupplier(
5953
b0 ->
60-
b0.operand(Filter.class)
54+
b0.operand(LogicalFilter.class)
6155
.oneInput(
6256
b1 ->
63-
b1.operand(CalciteOpenSearchIndexScan.class)
64-
.predicate(OpenSearchFilterIndexScanRule::test)
57+
b1.operand(CalciteLogicalIndexScan.class)
58+
.predicate(OpenSearchIndexScanRule::test)
6559
.noInputs()));
6660

6761
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ public class OpenSearchIndexRules {
1414
OpenSearchProjectIndexScanRule.Config.DEFAULT.toRule();
1515
private static final OpenSearchFilterIndexScanRule FILTER_INDEX_SCAN =
1616
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();
17+
private static final OpenSearchAggregateIndexScanRule AGGREGATE_INDEX_SCAN =
18+
OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule();
1719

1820
public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
19-
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN);
21+
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN);
2022

2123
// prevent instantiation
2224
private OpenSearchIndexRules() {}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.opensearch.sql.opensearch.planner.physical;
2+
3+
import org.apache.calcite.plan.RelOptTable;
4+
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
5+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
6+
7+
public interface OpenSearchIndexScanRule {
8+
9+
// CalciteOpenSearchIndexScan doesn't allow push-down anymore (except Sort under some strict
10+
// condition) after Aggregate push-down.
11+
static boolean test(CalciteLogicalIndexScan scan) {
12+
if (scan.getPushDownContext().isAggregatePushed()) return false;
13+
final RelOptTable table = scan.getTable();
14+
return table.unwrap(OpenSearchIndex.class) != null;
15+
}
16+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88

99
import java.util.ArrayList;
1010
import java.util.List;
11+
import org.apache.calcite.adapter.enumerable.EnumerableProject;
1112
import org.apache.calcite.plan.RelOptRuleCall;
1213
import org.apache.calcite.plan.RelOptTable;
1314
import org.apache.calcite.plan.RelRule;
14-
import org.apache.calcite.rel.core.Project;
1515
import org.apache.calcite.rex.RexInputRef;
1616
import org.apache.calcite.rex.RexNode;
1717
import org.apache.calcite.rex.RexUtil;
@@ -20,9 +20,9 @@
2020
import org.apache.calcite.util.mapping.Mappings;
2121
import org.immutables.value.Value;
2222
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
23-
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;
23+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
2424

25-
/** Planner rule that push a {@link Project} down to {@link CalciteOpenSearchIndexScan} */
25+
/** Planner rule that push a {@link EnumerableProject} down to {@link CalciteLogicalIndexScan} */
2626
@Value.Enclosing
2727
public class OpenSearchProjectIndexScanRule extends RelRule<OpenSearchProjectIndexScanRule.Config> {
2828

@@ -31,17 +31,12 @@ protected OpenSearchProjectIndexScanRule(Config config) {
3131
super(config);
3232
}
3333

34-
protected static boolean test(CalciteOpenSearchIndexScan scan) {
35-
final RelOptTable table = scan.getTable();
36-
return table.unwrap(OpenSearchIndex.class) != null;
37-
}
38-
3934
@Override
4035
public void onMatch(RelOptRuleCall call) {
4136
if (call.rels.length == 2) {
4237
// the ordinary variant
43-
final Project project = call.rel(0);
44-
final CalciteOpenSearchIndexScan scan = call.rel(1);
38+
final EnumerableProject project = call.rel(0);
39+
final CalciteLogicalIndexScan scan = call.rel(1);
4540
apply(call, project, scan);
4641
} else {
4742
throw new AssertionError(
@@ -51,10 +46,13 @@ public void onMatch(RelOptRuleCall call) {
5146
}
5247
}
5348

54-
protected void apply(RelOptRuleCall call, Project project, CalciteOpenSearchIndexScan scan) {
49+
protected void apply(
50+
RelOptRuleCall call, EnumerableProject project, CalciteLogicalIndexScan scan) {
5551
final RelOptTable table = scan.getTable();
5652
requireNonNull(table.unwrap(OpenSearchIndex.class));
5753

54+
// TODO: support script pushdown for project instead of only reference
55+
// https://github.com/opensearch-project/sql/issues/3387
5856
final List<Integer> selectedColumns = new ArrayList<>();
5957
final RexVisitorImpl<Void> visitor =
6058
new RexVisitorImpl<Void>(true) {
@@ -70,7 +68,7 @@ public Void visitInputRef(RexInputRef inputRef) {
7068
// Only do push down when an actual projection happens
7169
if (!selectedColumns.isEmpty() && selectedColumns.size() != scan.getRowType().getFieldCount()) {
7270
Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount());
73-
CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns);
71+
CalciteLogicalIndexScan newScan = scan.pushDownProject(selectedColumns);
7472
final List<RexNode> newProjectRexNodes = RexUtil.apply(mapping, project.getProjects());
7573

7674
if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) {
@@ -90,11 +88,11 @@ public interface Config extends RelRule.Config {
9088
.build()
9189
.withOperandSupplier(
9290
b0 ->
93-
b0.operand(Project.class)
91+
b0.operand(EnumerableProject.class)
9492
.oneInput(
9593
b1 ->
96-
b1.operand(CalciteOpenSearchIndexScan.class)
97-
.predicate(OpenSearchProjectIndexScanRule::test)
94+
b1.operand(CalciteLogicalIndexScan.class)
95+
.predicate(OpenSearchIndexScanRule::test)
9896
.noInputs()));
9997

10098
@Override

0 commit comments

Comments
 (0)