Skip to content

Commit 6fa3d95

Browse files
LantaoJinpenghuo
authored andcommitted
[Calcite] Build integration test framework (opensearch-project#3342)
* Build integration test framework Signed-off-by: Lantao Jin <ltjin@amazon.com> * make local work Signed-off-by: Lantao Jin <ltjin@amazon.com> * Fix the timestamp issue Signed-off-by: Lantao Jin <ltjin@amazon.com> * address comments Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix java style and rename CalcitePPLTestCase back to CalcitePPLIntegTestCase Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent f136008 commit 6fa3d95

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1549
-231
lines changed

build.gradle

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,15 @@ allprojects {
130130
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.9.10"
131131
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10"
132132
resolutionStrategy.force "net.bytebuddy:byte-buddy:1.14.9"
133-
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:5.3.1"
134-
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5:5.2.5'
135-
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.5'
136-
resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-annotations:2.17.2'
137-
resolutionStrategy.force 'com.fasterxml.jackson:jackson-bom:2.17.2'
133+
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
134+
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
135+
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
136+
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
137+
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
138+
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
139+
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
140+
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
141+
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
138142
resolutionStrategy.force 'com.google.protobuf:protobuf-java:3.25.5'
139143
resolutionStrategy.force 'org.locationtech.jts:jts-core:1.19.0'
140144
resolutionStrategy.force 'com.google.errorprone:error_prone_annotations:2.28.0'

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public enum Key {
2929

3030
/** Enable Calcite as execution engine */
3131
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
32+
CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"),
3233

3334
/** Query Settings. */
3435
FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"),

core/build.gradle

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,14 @@ jacocoTestCoverageVerification {
119119
'org.opensearch.sql.datasource.model.DataSource',
120120
'org.opensearch.sql.datasource.model.DataSourceStatus',
121121
'org.opensearch.sql.datasource.model.DataSourceType',
122-
'org.opensearch.sql.executor.ExecutionEngine'
123122
]
124123
limit {
125124
counter = 'LINE'
126-
minimum = 0.5 // calcite dev only
125+
minimum = 0.0 // calcite dev only
127126
}
128127
limit {
129128
counter = 'BRANCH'
130-
minimum = 0.5 // calcite dev only
129+
minimum = 0.0 // calcite dev only
131130
}
132131
}
133132
}

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,29 @@
55

66
package org.opensearch.sql.calcite;
77

8+
import java.sql.Connection;
89
import java.util.function.BiFunction;
910
import lombok.Getter;
11+
import org.apache.calcite.adapter.java.JavaTypeFactory;
1012
import org.apache.calcite.rex.RexNode;
1113
import org.apache.calcite.tools.FrameworkConfig;
1214
import org.apache.calcite.tools.RelBuilder;
1315
import org.opensearch.sql.ast.expression.UnresolvedExpression;
16+
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
1417

1518
public class CalcitePlanContext {
1619

1720
public FrameworkConfig config;
21+
public final Connection connection;
1822
public final RelBuilder relBuilder;
1923
public final ExtendedRexBuilder rexBuilder;
2024

2125
@Getter private boolean isResolvingJoinCondition = false;
2226

23-
public CalcitePlanContext(FrameworkConfig config) {
27+
private CalcitePlanContext(FrameworkConfig config, JavaTypeFactory typeFactory) {
2428
this.config = config;
25-
this.relBuilder = RelBuilder.create(config);
29+
this.connection = CalciteToolsHelper.connect(config, typeFactory);
30+
this.relBuilder = CalciteToolsHelper.create(config, typeFactory, connection);
2631
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
2732
}
2833

@@ -35,8 +40,11 @@ public RexNode resolveJoinCondition(
3540
return result;
3641
}
3742

38-
// for testing only
3943
public static CalcitePlanContext create(FrameworkConfig config) {
40-
return new CalcitePlanContext(config);
44+
return new CalcitePlanContext(config, null);
45+
}
46+
47+
public static CalcitePlanContext create(FrameworkConfig config, JavaTypeFactory typeFactory) {
48+
return new CalcitePlanContext(config, typeFactory);
4149
}
4250
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,14 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
9292
@Override
9393
public RelNode visitProject(Project node, CalcitePlanContext context) {
9494
visitChildren(node, context);
95-
List<RexNode> projectList =
96-
node.getProjectList().stream()
97-
.filter(expr -> !(expr instanceof AllFields))
98-
.map(expr -> rexVisitor.analyze(expr, context))
99-
.collect(Collectors.toList());
100-
if (projectList.isEmpty()) {
95+
List<RexNode> projectList;
96+
if (node.getProjectList().stream().anyMatch(e -> e instanceof AllFields)) {
10197
return context.relBuilder.peek();
98+
} else {
99+
projectList =
100+
node.getProjectList().stream()
101+
.map(expr -> rexVisitor.analyze(expr, context))
102+
.collect(Collectors.toList());
102103
}
103104
if (node.isExcluded()) {
104105
context.relBuilder.projectExcept(projectList);

core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import org.apache.calcite.schema.SchemaPlus;
1717
import org.apache.calcite.schema.Schemas;
1818
import org.apache.calcite.schema.TranslatableTable;
19-
import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes;
19+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
2020

2121
public abstract class OpenSearchTable extends AbstractQueryableTable
2222
implements TranslatableTable, org.opensearch.sql.storage.Table {
@@ -27,7 +27,7 @@ protected OpenSearchTable(Type elementType) {
2727

2828
@Override
2929
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
30-
return OpenSearchRelDataTypes.convertSchema(this);
30+
return OpenSearchTypeFactory.convertSchema(this);
3131
}
3232

3333
@Override
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
/*
7+
* This file contains code from the Apache Calcite project (original license below).
8+
* It contains modifications, which are licensed as above:
9+
*/
10+
11+
/*
12+
* Licensed to the Apache Software Foundation (ASF) under one or more
13+
* contributor license agreements. See the NOTICE file distributed with
14+
* this work for additional information regarding copyright ownership.
15+
* The ASF licenses this file to you under the Apache License, Version 2.0
16+
* (the "License"); you may not use this file except in compliance with
17+
* the License. You may obtain a copy of the License at
18+
*
19+
* http://www.apache.org/licenses/LICENSE-2.0
20+
*
21+
* Unless required by applicable law or agreed to in writing, software
22+
* distributed under the License is distributed on an "AS IS" BASIS,
23+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24+
* See the License for the specific language governing permissions and
25+
* limitations under the License.
26+
*/
27+
28+
package org.opensearch.sql.calcite.utils;
29+
30+
import java.sql.Connection;
31+
import java.sql.PreparedStatement;
32+
import java.sql.SQLException;
33+
import java.util.Properties;
34+
import org.apache.calcite.adapter.java.JavaTypeFactory;
35+
import org.apache.calcite.avatica.AvaticaConnection;
36+
import org.apache.calcite.avatica.AvaticaFactory;
37+
import org.apache.calcite.avatica.UnregisteredDriver;
38+
import org.apache.calcite.config.CalciteConnectionProperty;
39+
import org.apache.calcite.interpreter.Bindables;
40+
import org.apache.calcite.jdbc.CalciteFactory;
41+
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
42+
import org.apache.calcite.jdbc.CalcitePrepare;
43+
import org.apache.calcite.jdbc.CalciteSchema;
44+
import org.apache.calcite.jdbc.Driver;
45+
import org.apache.calcite.plan.Context;
46+
import org.apache.calcite.plan.RelOptCluster;
47+
import org.apache.calcite.plan.RelOptPlanner;
48+
import org.apache.calcite.plan.RelOptSchema;
49+
import org.apache.calcite.plan.RelOptTable;
50+
import org.apache.calcite.prepare.CalciteCatalogReader;
51+
import org.apache.calcite.prepare.CalcitePrepareImpl;
52+
import org.apache.calcite.rel.RelHomogeneousShuttle;
53+
import org.apache.calcite.rel.RelNode;
54+
import org.apache.calcite.rel.RelShuttle;
55+
import org.apache.calcite.rel.core.TableScan;
56+
import org.apache.calcite.rel.logical.LogicalTableScan;
57+
import org.apache.calcite.rel.type.RelDataTypeSystem;
58+
import org.apache.calcite.rex.RexBuilder;
59+
import org.apache.calcite.schema.SchemaPlus;
60+
import org.apache.calcite.server.CalciteServerStatement;
61+
import org.apache.calcite.tools.FrameworkConfig;
62+
import org.apache.calcite.tools.Frameworks;
63+
import org.apache.calcite.tools.RelBuilder;
64+
import org.apache.calcite.tools.RelRunner;
65+
import org.apache.calcite.util.Util;
66+
import org.opensearch.sql.calcite.CalcitePlanContext;
67+
68+
/**
69+
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
70+
* 3. RelBuilder 4. RelRunner TODO delete it in future if possible.
71+
*/
72+
public class CalciteToolsHelper {
73+
74+
/** Create a RelBuilder with testing */
75+
public static RelBuilder create(FrameworkConfig config) {
76+
return RelBuilder.create(config);
77+
}
78+
79+
/** Create a RelBuilder with typeFactory */
80+
public static RelBuilder create(
81+
FrameworkConfig config, JavaTypeFactory typeFactory, Connection connection) {
82+
return withPrepare(
83+
config,
84+
typeFactory,
85+
connection,
86+
(cluster, relOptSchema, rootSchema, statement) ->
87+
new OpenSearchRelBuilder(config.getContext(), cluster, relOptSchema));
88+
}
89+
90+
public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFactory) {
91+
final Properties info = new Properties();
92+
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
93+
info.setProperty(
94+
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
95+
config.getTypeSystem().getClass().getName());
96+
}
97+
try {
98+
return new OpenSearchDriver().connect("jdbc:calcite:", info, null, typeFactory);
99+
} catch (SQLException e) {
100+
throw new RuntimeException(e);
101+
}
102+
}
103+
104+
/**
105+
* This method copied from {@link Frameworks#withPrepare(FrameworkConfig,
106+
* Frameworks.BasePrepareAction)}. The purpose is the method {@link
107+
* CalciteFactory#newConnection(UnregisteredDriver, AvaticaFactory, String, Properties)} create
108+
* connection with null instance of JavaTypeFactory. So we add a parameter JavaTypeFactory.
109+
*/
110+
private static <R> R withPrepare(
111+
FrameworkConfig config,
112+
JavaTypeFactory typeFactory,
113+
Connection connection,
114+
Frameworks.BasePrepareAction<R> action) {
115+
try {
116+
final Properties info = new Properties();
117+
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
118+
info.setProperty(
119+
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
120+
config.getTypeSystem().getClass().getName());
121+
}
122+
final CalciteServerStatement statement =
123+
connection.createStatement().unwrap(CalciteServerStatement.class);
124+
return new OpenSearchPrepareImpl().perform(statement, config, typeFactory, action);
125+
} catch (Exception e) {
126+
throw new RuntimeException(e);
127+
}
128+
}
129+
130+
public static class OpenSearchDriver extends Driver {
131+
132+
public Connection connect(
133+
String url, Properties info, CalciteSchema rootSchema, JavaTypeFactory typeFactory)
134+
throws SQLException {
135+
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
136+
AvaticaConnection connection =
137+
factory.newConnection((Driver) this, factory, url, info, rootSchema, typeFactory);
138+
this.handler.onConnectionInit(connection);
139+
return connection;
140+
}
141+
}
142+
143+
/** do nothing, just extend for a public construct for new */
144+
public static class OpenSearchRelBuilder extends RelBuilder {
145+
public OpenSearchRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
146+
super(context, cluster, relOptSchema);
147+
}
148+
}
149+
150+
public static class OpenSearchPrepareImpl extends CalcitePrepareImpl {
151+
/**
152+
* Similar to {@link CalcitePrepareImpl#perform(CalciteServerStatement, FrameworkConfig,
153+
* Frameworks.BasePrepareAction)}, but with a custom typeFactory.
154+
*/
155+
public <R> R perform(
156+
CalciteServerStatement statement,
157+
FrameworkConfig config,
158+
JavaTypeFactory typeFactory,
159+
Frameworks.BasePrepareAction<R> action) {
160+
final CalcitePrepare.Context prepareContext = statement.createPrepareContext();
161+
SchemaPlus defaultSchema = config.getDefaultSchema();
162+
final CalciteSchema schema =
163+
defaultSchema != null
164+
? CalciteSchema.from(defaultSchema)
165+
: prepareContext.getRootSchema();
166+
CalciteCatalogReader catalogReader =
167+
new CalciteCatalogReader(
168+
schema.root(), schema.path(null), typeFactory, prepareContext.config());
169+
final RexBuilder rexBuilder = new RexBuilder(typeFactory);
170+
final RelOptPlanner planner =
171+
createPlanner(prepareContext, config.getContext(), config.getCostFactory());
172+
final RelOptCluster cluster = createCluster(planner, rexBuilder);
173+
return action.apply(cluster, catalogReader, prepareContext.getRootSchema().plus(), statement);
174+
}
175+
}
176+
177+
public static class OpenSearchRelRunners {
178+
/**
179+
* Runs a relational expression by existing connection. This class copied from {@link
180+
* org.apache.calcite.tools.RelRunners#run(RelNode)}
181+
*/
182+
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
183+
final RelShuttle shuttle =
184+
new RelHomogeneousShuttle() {
185+
@Override
186+
public RelNode visit(TableScan scan) {
187+
final RelOptTable table = scan.getTable();
188+
if (scan instanceof LogicalTableScan
189+
&& Bindables.BindableTableScan.canHandle(table)) {
190+
// Always replace the LogicalTableScan with BindableTableScan
191+
// because it's implementation does not require a "schema" as context.
192+
return Bindables.BindableTableScan.create(scan.getCluster(), table);
193+
}
194+
return super.visit(scan);
195+
}
196+
};
197+
rel = rel.accept(shuttle);
198+
// the line we changed here
199+
try (Connection connection = context.connection) {
200+
final RelRunner runner = connection.unwrap(RelRunner.class);
201+
return runner.prepareStatement(rel);
202+
} catch (SQLException e) {
203+
throw Util.throwAsRuntime(e);
204+
}
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)