Skip to content

Commit 30639c5

Browse files
qianheng-awsxinyual
authored andcommitted
[Calcite Engine] Push down project and filter operator into index scan (#3327)
* Support Filter and Project pushdown Signed-off-by: Heng Qian <qianheng@amazon.com> * Support Filter and Project pushdown v2 Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> * Add original license for PredicateAnalyzer Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com> Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 9000e06 commit 30639c5

File tree

14 files changed

+1501
-104
lines changed

14 files changed

+1501
-104
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.util.function.BiFunction;
99
import lombok.Getter;
10-
import org.apache.calcite.jdbc.CalciteConnection;
1110
import org.apache.calcite.rex.RexNode;
1211
import org.apache.calcite.tools.FrameworkConfig;
1312
import org.apache.calcite.tools.RelBuilder;
@@ -16,15 +15,13 @@
1615
public class CalcitePlanContext {
1716

1817
public FrameworkConfig config;
19-
public CalciteConnection connection;
2018
public final RelBuilder relBuilder;
2119
public final ExtendedRexBuilder rexBuilder;
2220

2321
@Getter private boolean isResolvingJoinCondition = false;
2422

25-
public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) {
23+
public CalcitePlanContext(FrameworkConfig config) {
2624
this.config = config;
27-
this.connection = connection;
2825
this.relBuilder = RelBuilder.create(config);
2926
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
3027
}
@@ -40,6 +37,6 @@ public RexNode resolveJoinCondition(
4037

4138
// for testing only
4239
public static CalcitePlanContext create(FrameworkConfig config) {
43-
return new CalcitePlanContext(config, null);
40+
return new CalcitePlanContext(config);
4441
}
4542
}

core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@
1111
import org.apache.calcite.linq4j.QueryProvider;
1212
import org.apache.calcite.linq4j.Queryable;
1313
import org.apache.calcite.linq4j.tree.Expression;
14-
import org.apache.calcite.plan.RelOptCluster;
15-
import org.apache.calcite.plan.RelOptTable;
16-
import org.apache.calcite.rel.RelNode;
1714
import org.apache.calcite.rel.type.RelDataType;
1815
import org.apache.calcite.rel.type.RelDataTypeFactory;
1916
import org.apache.calcite.schema.SchemaPlus;
@@ -33,12 +30,6 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
3330
return OpenSearchRelDataTypes.convertSchema(this);
3431
}
3532

36-
@Override
37-
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
38-
final RelOptCluster cluster = context.getCluster();
39-
return new OpenSearchTableScan(cluster, relOptTable, this);
40-
}
41-
4233
@Override
4334
public <T> Queryable<T> asQueryable(
4435
QueryProvider queryProvider, SchemaPlus schema, String tableName) {

core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTableScan.java

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,46 +5,26 @@
55

66
package org.opensearch.sql.calcite.plan;
77

8-
import static java.util.Objects.requireNonNull;
9-
108
import com.google.common.collect.ImmutableList;
11-
import java.util.List;
129
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
1310
import org.apache.calcite.adapter.enumerable.EnumerableRel;
14-
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
15-
import org.apache.calcite.adapter.enumerable.PhysType;
16-
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
17-
import org.apache.calcite.linq4j.tree.Blocks;
18-
import org.apache.calcite.linq4j.tree.Expressions;
1911
import org.apache.calcite.plan.RelOptCluster;
2012
import org.apache.calcite.plan.RelOptPlanner;
2113
import org.apache.calcite.plan.RelOptRule;
2214
import org.apache.calcite.plan.RelOptTable;
23-
import org.apache.calcite.plan.RelTraitSet;
24-
import org.apache.calcite.rel.RelNode;
2515
import org.apache.calcite.rel.core.TableScan;
2616
import org.apache.calcite.rel.rules.CoreRules;
2717

2818
/** Relational expression representing a scan of an OpenSearch type. */
29-
public class OpenSearchTableScan extends TableScan implements EnumerableRel {
30-
private final OpenSearchTable osTable;
31-
19+
public abstract class OpenSearchTableScan extends TableScan implements EnumerableRel {
3220
/**
3321
* Creates an OpenSearchTableScan.
3422
*
3523
* @param cluster Cluster
3624
* @param table Table
37-
* @param osTable OpenSearch table
3825
*/
39-
OpenSearchTableScan(RelOptCluster cluster, RelOptTable table, OpenSearchTable osTable) {
26+
protected OpenSearchTableScan(RelOptCluster cluster, RelOptTable table) {
4027
super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), ImmutableList.of(), table);
41-
this.osTable = requireNonNull(osTable, "OpenSearch table");
42-
}
43-
44-
@Override
45-
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
46-
assert inputs.isEmpty();
47-
return new OpenSearchTableScan(getCluster(), table, osTable);
4828
}
4929

5030
@Override
@@ -57,16 +37,4 @@ public void register(RelOptPlanner planner) {
5737
// it is converted to cardinality aggregation in OpenSearch
5838
planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES);
5939
}
60-
61-
@Override
62-
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
63-
PhysType physType =
64-
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
65-
66-
return implementor.result(
67-
physType,
68-
Blocks.toBlock(
69-
Expressions.call(
70-
requireNonNull(table.getExpression(OpenSearchTable.class)), "search")));
71-
}
7240
}

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@
1313
import java.util.List;
1414
import lombok.AllArgsConstructor;
1515
import lombok.RequiredArgsConstructor;
16-
import org.apache.calcite.jdbc.CalciteConnection;
17-
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
1816
import org.apache.calcite.jdbc.CalciteSchema;
19-
import org.apache.calcite.jdbc.Driver;
2017
import org.apache.calcite.plan.RelTraitDef;
2118
import org.apache.calcite.rel.RelNode;
2219
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
@@ -80,29 +77,8 @@ public void execute(
8077
AccessController.doPrivileged(
8178
(PrivilegedAction<Void>)
8279
() -> {
83-
// Use simple calcite schema since we don't compute tables in advance of the
84-
// query.
85-
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
86-
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
87-
CalciteConnection connection =
88-
factory.newConnection(
89-
new Driver(),
90-
factory,
91-
"",
92-
new java.util.Properties(),
93-
rootSchema,
94-
null);
95-
final SchemaPlus defaultSchema =
96-
connection
97-
.getRootSchema()
98-
.add(
99-
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME,
100-
new OpenSearchSchema(dataSourceService));
101-
// Set opensearch schema as the default schema in config, otherwise we need to
102-
// explicitly
103-
// add schema path 'OpenSearch' before the opensearch table name
104-
final FrameworkConfig config = buildFrameworkConfig(defaultSchema);
105-
final CalcitePlanContext context = new CalcitePlanContext(config, connection);
80+
final FrameworkConfig config = buildFrameworkConfig();
81+
final CalcitePlanContext context = new CalcitePlanContext(config);
10682
executePlanByCalcite(analyze(plan, context), context, listener);
10783
return null;
10884
});
@@ -174,10 +150,15 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) {
174150
return relNodeVisitor.analyze(plan, context);
175151
}
176152

177-
private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) {
153+
private FrameworkConfig buildFrameworkConfig() {
154+
// Use simple calcite schema since we don't compute tables in advance of the query.
155+
final SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
156+
final SchemaPlus opensearchSchema =
157+
rootSchema.add(
158+
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService));
178159
return Frameworks.newConfigBuilder()
179160
.parserConfig(SqlParser.Config.DEFAULT) // TODO check
180-
.defaultSchema(defaultSchema)
161+
.defaultSchema(opensearchSchema)
181162
.traitDefs((List<RelTraitDef>) null)
182163
.programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
183164
.typeSystem(OpenSearchTypeSystem.INSTANCE)

opensearch/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ dependencies {
4040
compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}"
4141
implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
4242

43+
annotationProcessor 'org.immutables:value:2.8.8'
44+
compileOnly 'org.immutables:value-annotations:2.8.8'
45+
4346
testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.3')
4447
testImplementation('org.junit.jupiter:junit-jupiter-params:5.9.3')
4548
testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.9.3')

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.security.AccessController;
99
import java.security.PrivilegedAction;
10-
import java.sql.Connection;
1110
import java.sql.PreparedStatement;
1211
import java.sql.ResultSet;
1312
import java.sql.ResultSetMetaData;
@@ -18,7 +17,7 @@
1817
import java.util.Map;
1918
import lombok.RequiredArgsConstructor;
2019
import org.apache.calcite.rel.RelNode;
21-
import org.apache.calcite.tools.RelRunner;
20+
import org.apache.calcite.tools.RelRunners;
2221
import org.opensearch.sql.calcite.CalcitePlanContext;
2322
import org.opensearch.sql.common.response.ResponseListener;
2423
import org.opensearch.sql.data.model.ExprTupleValue;
@@ -111,13 +110,9 @@ public void execute(
111110
AccessController.doPrivileged(
112111
(PrivilegedAction<Void>)
113112
() -> {
114-
Connection connection = context.connection;
115-
try {
116-
RelRunner relRunner = connection.unwrap(RelRunner.class);
117-
try (PreparedStatement statement = relRunner.prepareStatement(rel)) {
118-
ResultSet resultSet = statement.executeQuery();
119-
buildResultSet(resultSet, listener);
120-
}
113+
try (PreparedStatement statement = RelRunners.run(rel)) {
114+
ResultSet result = statement.executeQuery();
115+
buildResultSet(result, listener);
121116
return null;
122117
} catch (SQLException e) {
123118
throw new RuntimeException(e);
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.sql.opensearch.planner.physical;
6+
7+
import org.apache.calcite.plan.RelOptRuleCall;
8+
import org.apache.calcite.plan.RelOptTable;
9+
import org.apache.calcite.plan.RelRule;
10+
import org.apache.calcite.rel.core.Filter;
11+
import org.immutables.value.Value;
12+
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
13+
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;
14+
15+
/** Planner rule that push a {@link Filter} down to {@link CalciteOpenSearchIndexScan} */
16+
@Value.Enclosing
17+
public class OpenSearchFilterIndexScanRule extends RelRule<OpenSearchFilterIndexScanRule.Config> {
18+
19+
/** Creates a OpenSearchFilterIndexScanRule. */
20+
protected OpenSearchFilterIndexScanRule(Config config) {
21+
super(config);
22+
}
23+
24+
protected static boolean test(CalciteOpenSearchIndexScan scan) {
25+
final RelOptTable table = scan.getTable();
26+
return table.unwrap(OpenSearchIndex.class) != null;
27+
}
28+
29+
@Override
30+
public void onMatch(RelOptRuleCall call) {
31+
if (call.rels.length == 2) {
32+
// the ordinary variant
33+
final Filter filter = call.rel(0);
34+
final CalciteOpenSearchIndexScan scan = call.rel(1);
35+
apply(call, filter, scan);
36+
} else {
37+
throw new AssertionError(
38+
String.format(
39+
"The length of rels should be %s but got %s",
40+
this.operands.size(), call.rels.length));
41+
}
42+
}
43+
44+
protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
45+
if (scan.pushDownFilter(filter)) {
46+
call.transformTo(scan);
47+
}
48+
}
49+
50+
/** Rule configuration. */
51+
@Value.Immutable
52+
public interface Config extends RelRule.Config {
53+
/** Config that matches Filter on CalciteOpenSearchIndexScan. */
54+
Config DEFAULT =
55+
ImmutableOpenSearchFilterIndexScanRule.Config.builder()
56+
.build()
57+
.withOperandSupplier(
58+
b0 ->
59+
b0.operand(Filter.class)
60+
.oneInput(
61+
b1 ->
62+
b1.operand(CalciteOpenSearchIndexScan.class)
63+
.predicate(OpenSearchFilterIndexScanRule::test)
64+
.noInputs()));
65+
66+
@Override
67+
default OpenSearchFilterIndexScanRule toRule() {
68+
return new OpenSearchFilterIndexScanRule(this);
69+
}
70+
}
71+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.planner.physical;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import org.apache.calcite.plan.RelOptRule;
11+
12+
public class OpenSearchIndexRules {
13+
private static final OpenSearchProjectIndexScanRule PROJECT_INDEX_SCAN =
14+
OpenSearchProjectIndexScanRule.Config.DEFAULT.toRule();
15+
private static final OpenSearchFilterIndexScanRule FILTER_INDEX_SCAN =
16+
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();
17+
18+
public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
19+
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN);
20+
21+
// prevent instantiation
22+
private OpenSearchIndexRules() {}
23+
}

0 commit comments

Comments
 (0)