Skip to content

Commit 7944f45

Browse files
qianheng-aws authored and penghuo committed
Fix execution errors caused by plan gap (opensearch-project#3350)
* Transform to calcite plan before executing Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix bug for single column row Signed-off-by: Heng Qian <qianheng@amazon.com> * Add settings for calcite pushdown Signed-off-by: Heng Qian <qianheng@amazon.com> * Lazily construct OpenSearchRequestBuilder and do push down Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments and disable push down Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent c1546b1 commit 7944f45

File tree

9 files changed

+113
-33
lines changed

9 files changed

+113
-33
lines changed

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public enum Key {
3030
/** Enable Calcite as execution engine */
3131
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
3232
CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"),
33+
CALCITE_PUSHDOWN_ENABLED("plugins.calcite.pushdown.enabled"),
3334

3435
/** Query Settings. */
3536
FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"),

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
import lombok.RequiredArgsConstructor;
1616
import org.apache.calcite.jdbc.CalciteSchema;
1717
import org.apache.calcite.plan.RelTraitDef;
18+
import org.apache.calcite.rel.RelCollation;
19+
import org.apache.calcite.rel.RelCollations;
1820
import org.apache.calcite.rel.RelNode;
21+
import org.apache.calcite.rel.core.Sort;
22+
import org.apache.calcite.rel.logical.LogicalSort;
1923
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
2024
import org.apache.calcite.schema.SchemaPlus;
2125
import org.apache.calcite.sql.parser.SqlParser;
@@ -129,7 +133,31 @@ public void executePlanByCalcite(
129133
RelNode plan,
130134
CalcitePlanContext context,
131135
ResponseListener<ExecutionEngine.QueryResponse> listener) {
132-
executionEngine.execute(optimize(plan), context, listener);
136+
executionEngine.execute(convertToCalcitePlan(optimize(plan)), context, listener);
137+
}
138+
139+
/**
140+
* Convert OpenSearch Plan to Calcite Plan. Although both plans consist of Calcite RelNodes, there
141+
* are some differences in the topological structures or semantics between them.
142+
*
143+
* @param osPlan Logical Plan derived from OpenSearch PPL
144+
*/
145+
private static RelNode convertToCalcitePlan(RelNode osPlan) {
146+
RelNode calcitePlan = osPlan;
147+
148+
/* Calcite only ensures collation of the final result produced from the root sort operator.
149+
* While we expect that the collation can be preserved through the pipes over PPL, we need to
150+
* explicitly add a sort operator on top of the original plan
151+
* to ensure the correct collation of the final result.
152+
* See logic in {@link CalcitePrepareImpl}
153+
* If the added sort turns out to be redundant, we rely on the Calcite optimizer to eliminate it.
154+
*/
155+
RelCollation collation = osPlan.getTraitSet().getCollation();
156+
if (!(osPlan instanceof Sort) && collation != RelCollations.EMPTY) {
157+
calcitePlan = LogicalSort.create(osPlan, collation, null, null);
158+
}
159+
160+
return calcitePlan;
133161
}
134162

135163
/**

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ private Settings defaultSettings() {
104104
.put(Key.FIELD_TYPE_TOLERANCE, true)
105105
.put(Key.CALCITE_ENGINE_ENABLED, true)
106106
.put(Key.CALCITE_FALLBACK_ALLOWED, false)
107+
.put(Key.CALCITE_PUSHDOWN_ENABLED, false)
107108
.build();
108109

109110
@Override

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLSortIT.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,8 @@
88
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
99

1010
import java.io.IOException;
11-
import org.junit.Ignore;
1211
import org.junit.jupiter.api.Test;
1312

14-
/** testSortXXAndXX could fail. TODO Remove this @Ignore when the issue fixed. */
15-
@Ignore
1613
public class CalcitePPLSortIT extends CalcitePPLIntegTestCase {
1714

1815
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ public void onMatch(RelOptRuleCall call) {
4242
}
4343

4444
protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
45-
if (scan.pushDownFilter(filter)) {
46-
call.transformTo(scan);
45+
CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter);
46+
if (newScan != null) {
47+
call.transformTo(newScan);
4748
}
4849
}
4950

opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ public class OpenSearchSettings extends Settings {
9090
Setting.Property.NodeScope,
9191
Setting.Property.Dynamic);
9292

93+
public static final Setting<?> CALCITE_PUSHDOWN_ENABLED_SETTING =
94+
Setting.boolSetting(
95+
Key.CALCITE_PUSHDOWN_ENABLED.getKeyValue(),
96+
false,
97+
Setting.Property.NodeScope,
98+
Setting.Property.Dynamic);
99+
93100
public static final Setting<?> QUERY_MEMORY_LIMIT_SETTING =
94101
Setting.memorySizeSetting(
95102
Key.QUERY_MEMORY_LIMIT.getKeyValue(),
@@ -284,6 +291,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
284291
Key.CALCITE_FALLBACK_ALLOWED,
285292
CALCITE_FALLBACK_ALLOWED_SETTING,
286293
new Updater(Key.CALCITE_FALLBACK_ALLOWED));
294+
register(
295+
settingBuilder,
296+
clusterSettings,
297+
Key.CALCITE_PUSHDOWN_ENABLED,
298+
CALCITE_PUSHDOWN_ENABLED_SETTING,
299+
new Updater(Key.CALCITE_PUSHDOWN_ENABLED));
287300
register(
288301
settingBuilder,
289302
clusterSettings,
@@ -459,6 +472,7 @@ public static List<Setting<?>> pluginSettings() {
459472
.add(CALCITE_ENGINE_ENABLED_SETTING)
460473
.add(DEFAULT_PATTERN_METHOD_SETTING)
461474
.add(CALCITE_FALLBACK_ALLOWED_SETTING)
475+
.add(CALCITE_PUSHDOWN_ENABLED_SETTING)
462476
.add(QUERY_MEMORY_LIMIT_SETTING)
463477
.add(QUERY_SIZE_LIMIT_SETTING)
464478
.add(METRICS_ROLLING_WINDOW_SETTING)

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class OpenSearchIndex extends OpenSearchTable {
7070
/** OpenSearch client connection. */
7171
@Getter private final OpenSearchClient client;
7272

73-
private final Settings settings;
73+
@Getter private final Settings settings;
7474

7575
/** {@link OpenSearchRequest.IndexName}. */
7676
private final OpenSearchRequest.IndexName indexName;

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static java.util.Objects.requireNonNull;
99

10+
import java.util.ArrayDeque;
1011
import java.util.List;
1112
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
1213
import org.apache.calcite.adapter.enumerable.PhysType;
@@ -32,20 +33,26 @@
3233
import org.checkerframework.checker.nullness.qual.Nullable;
3334
import org.opensearch.index.query.QueryBuilder;
3435
import org.opensearch.sql.calcite.plan.OpenSearchTableScan;
36+
import org.opensearch.sql.common.setting.Settings;
3537
import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules;
3638
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
3739
import org.opensearch.sql.opensearch.request.PredicateAnalyzer;
38-
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException;
39-
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.PredicateAnalyzerException;
4040
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
4141

4242
/** Relational expression representing a scan of an OpenSearchIndex type. */
4343
public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
4444
private static final Logger LOG = LogManager.getLogger(CalciteOpenSearchIndexScan.class);
4545

4646
private final OpenSearchIndex osIndex;
47-
private final OpenSearchRequestBuilder requestBuilder;
47+
// The schema of this scan operator. It's initialized with the row type of the table, but may be
48+
// changed by push down operations.
4849
private final RelDataType schema;
50+
// This context maintains all the push down actions, which will be applied to the requestBuilder
51+
// when it begins to scan data from OpenSearch.
52+
// OpenSearchRequestBuilder doesn't support deep copy, and we want to keep the
53+
// requestBuilder independent among different plans produced in the optimization process,
54+
// so we cannot apply these actions right away.
55+
private final PushDownContext pushDownContext;
4956

5057
/**
5158
* Creates a CalciteOpenSearchIndexScan.
@@ -56,24 +63,31 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
5663
*/
5764
public CalciteOpenSearchIndexScan(
5865
RelOptCluster cluster, RelOptTable table, OpenSearchIndex index) {
59-
this(cluster, table, index, index.createRequestBuilder(), table.getRowType());
66+
this(cluster, table, index, table.getRowType(), new PushDownContext());
6067
}
6168

62-
public CalciteOpenSearchIndexScan(
69+
private CalciteOpenSearchIndexScan(
6370
RelOptCluster cluster,
6471
RelOptTable table,
6572
OpenSearchIndex index,
66-
OpenSearchRequestBuilder requestBuilder,
67-
RelDataType schema) {
73+
RelDataType schema,
74+
PushDownContext pushDownContext) {
6875
super(cluster, table);
6976
this.osIndex = requireNonNull(index, "OpenSearch index");
70-
this.requestBuilder = requestBuilder;
7177
this.schema = schema;
78+
this.pushDownContext = pushDownContext;
79+
}
80+
81+
public CalciteOpenSearchIndexScan copy() {
82+
return new CalciteOpenSearchIndexScan(
83+
getCluster(), table, osIndex, this.schema, pushDownContext.clone());
7284
}
7385

7486
public CalciteOpenSearchIndexScan copyWithNewSchema(RelDataType schema) {
75-
// TODO: need to do deep-copy on requestBuilder in case non-idempotent push down.
76-
return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex, requestBuilder, schema);
87+
// Shallow-copy the pushDownContext, so that the push down actions recorded by different plans
88+
// produced in the optimization process won't affect each other.
89+
return new CalciteOpenSearchIndexScan(
90+
getCluster(), table, osIndex, schema, pushDownContext.clone());
7791
}
7892

7993
@Override
@@ -85,8 +99,10 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
8599
@Override
86100
public void register(RelOptPlanner planner) {
87101
super.register(planner);
88-
for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) {
89-
planner.addRule(rule);
102+
if (osIndex.getSettings().getSettingValue(Settings.Key.CALCITE_PUSHDOWN_ENABLED)) {
103+
for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) {
104+
planner.addRule(rule);
105+
}
90106
}
91107
}
92108

@@ -97,15 +113,23 @@ public RelDataType deriveRowType() {
97113

98114
@Override
99115
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
100-
// Avoid optimizing the java row type since the scan will always return an array.
116+
/* In Calcite enumerable operators, row of single column will be optimized to a scalar value.
117+
* See {@link PhysTypeImpl}.
118+
* Since this operator needs to be combined with Calcite's own enumerable operators,
119+
* let's follow this convention to apply the optimization here and ensure `scan` method
120+
* returns the correct data format for single column rows.
121+
* See {@link OpenSearchIndexEnumerator}
122+
*/
101123
PhysType physType =
102-
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray(), false);
124+
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
103125

104126
Expression scanOperator = implementor.stash(this, CalciteOpenSearchIndexScan.class);
105127
return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan")));
106128
}
107129

108130
public Enumerable<@Nullable Object> scan() {
131+
OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder();
132+
pushDownContext.forEach(action -> action.apply(requestBuilder));
109133
return new AbstractEnumerable<>() {
110134
@Override
111135
public Enumerator<Object> enumerator() {
@@ -118,17 +142,18 @@ public Enumerator<Object> enumerator() {
118142
};
119143
}
120144

121-
public boolean pushDownFilter(Filter filter) {
145+
public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) {
122146
try {
147+
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType());
123148
List<String> schema = this.getRowType().getFieldNames();
124149
QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema);
125-
requestBuilder.pushDownFilter(filterBuilder);
150+
newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder));
126151
// TODO: handle the case where condition contains a score function
127-
return true;
128-
} catch (ExpressionNotAnalyzableException | PredicateAnalyzerException e) {
152+
return newScan;
153+
} catch (Exception e) {
129154
LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e);
130155
}
131-
return false;
156+
return null;
132157
}
133158

134159
/**
@@ -143,7 +168,19 @@ public CalciteOpenSearchIndexScan pushDownProject(List<Integer> selectedColumns)
143168
}
144169
RelDataType newSchema = builder.build();
145170
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema);
146-
newScan.requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream());
171+
newScan.pushDownContext.add(
172+
requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()));
147173
return newScan;
148174
}
175+
176+
static class PushDownContext extends ArrayDeque<PushDownAction> {
177+
@Override
178+
public PushDownContext clone() {
179+
return (PushDownContext) super.clone();
180+
}
181+
}
182+
183+
private interface PushDownAction {
184+
void apply(OpenSearchRequestBuilder requestBuilder);
185+
}
149186
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import lombok.EqualsAndHashCode;
1212
import lombok.ToString;
1313
import org.apache.calcite.linq4j.Enumerator;
14-
import org.opensearch.sql.data.model.ExprNullValue;
1514
import org.opensearch.sql.data.model.ExprValue;
1615
import org.opensearch.sql.opensearch.client.OpenSearchClient;
1716
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
@@ -62,11 +61,13 @@ private void fetchNextBatch() {
6261

6362
@Override
6463
public Object current() {
65-
Object[] p =
66-
fields.stream()
67-
.map(k -> current.tupleValue().getOrDefault(k, ExprNullValue.of()).valueForCalcite())
68-
.toArray();
69-
return p;
64+
/* In Calcite enumerable operators, row of single column will be optimized to a scalar value.
65+
* See {@link PhysTypeImpl}
66+
*/
67+
if (fields.size() == 1) {
68+
return current.tupleValue().get(fields.getFirst()).valueForCalcite();
69+
}
70+
return fields.stream().map(k -> current.tupleValue().get(k).valueForCalcite()).toArray();
7071
}
7172

7273
@Override

0 commit comments

Comments
 (0)