Skip to content

Commit 8a9b591

Browse files
qianheng-awspenghuo
authored andcommitted
Enable push down optimization by default (opensearch-project#3366)
Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 064506a commit 8a9b591

File tree

4 files changed

+56
-13
lines changed

4 files changed

+56
-13
lines changed

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ private Settings defaultSettings() {
104104
.put(Key.FIELD_TYPE_TOLERANCE, true)
105105
.put(Key.CALCITE_ENGINE_ENABLED, true)
106106
.put(Key.CALCITE_FALLBACK_ALLOWED, false)
107-
.put(Key.CALCITE_PUSHDOWN_ENABLED, false)
107+
.put(Key.CALCITE_PUSHDOWN_ENABLED, true)
108108
.build();
109109

110110
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchProjectIndexScanRule.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,17 @@ public Void visitInputRef(RexInputRef inputRef) {
6767
}
6868
};
6969
visitor.visitEach(project.getProjects());
70+
// Only do push down when an actual projection happens
71+
if (!selectedColumns.isEmpty() && selectedColumns.size() != scan.getRowType().getFieldCount()) {
72+
Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount());
73+
CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns);
74+
final List<RexNode> newProjectRexNodes = RexUtil.apply(mapping, project.getProjects());
7075

71-
Mapping mapping = Mappings.target(selectedColumns, scan.getRowType().getFieldCount());
72-
CalciteOpenSearchIndexScan newScan = scan.pushDownProject(selectedColumns);
73-
final List<RexNode> newProjectRexNodes = RexUtil.apply(mapping, project.getProjects());
74-
75-
if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) {
76-
call.transformTo(newScan);
77-
} else {
78-
call.transformTo(call.builder().push(newScan).project(newProjectRexNodes).build());
76+
if (RexUtil.isIdentity(newProjectRexNodes, newScan.getRowType())) {
77+
call.transformTo(newScan);
78+
} else {
79+
call.transformTo(call.builder().push(newScan).project(newProjectRexNodes).build());
80+
}
7981
}
8082
}
8183

opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public class OpenSearchSettings extends Settings {
9393
public static final Setting<?> CALCITE_PUSHDOWN_ENABLED_SETTING =
9494
Setting.boolSetting(
9595
Key.CALCITE_PUSHDOWN_ENABLED.getKeyValue(),
96-
false,
96+
true,
9797
Setting.Property.NodeScope,
9898
Setting.Property.Dynamic);
9999

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.calcite.plan.RelOptTable;
2525
import org.apache.calcite.plan.RelTraitSet;
2626
import org.apache.calcite.rel.RelNode;
27+
import org.apache.calcite.rel.RelWriter;
2728
import org.apache.calcite.rel.core.Filter;
2829
import org.apache.calcite.rel.type.RelDataType;
2930
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -96,6 +97,12 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
9697
return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex);
9798
}
9899

100+
@Override
101+
public RelWriter explainTerms(RelWriter pw) {
102+
return super.explainTerms(pw)
103+
.itemIf("PushDownContext", pushDownContext, !pushDownContext.isEmpty());
104+
}
105+
99106
@Override
100107
public void register(RelOptPlanner planner) {
101108
super.register(planner);
@@ -147,7 +154,12 @@ public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) {
147154
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType());
148155
List<String> schema = this.getRowType().getFieldNames();
149156
QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema);
150-
newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder));
157+
newScan.pushDownContext.add(
158+
PushDownAction.of(
159+
PushDownType.FILTER,
160+
filter.getCondition(),
161+
requestBuilder -> requestBuilder.pushDownFilter(filterBuilder)));
162+
151163
// TODO: handle the case where condition contains a score function
152164
return newScan;
153165
} catch (Exception e) {
@@ -169,18 +181,47 @@ public CalciteOpenSearchIndexScan pushDownProject(List<Integer> selectedColumns)
169181
RelDataType newSchema = builder.build();
170182
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema);
171183
newScan.pushDownContext.add(
172-
requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()));
184+
PushDownAction.of(
185+
PushDownType.PROJECT,
186+
newSchema.getFieldNames(),
187+
requestBuilder ->
188+
requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream())));
173189
return newScan;
174190
}
175191

192+
// TODO: should we consider equivalent among PushDownContexts with different push down sequence?
176193
static class PushDownContext extends ArrayDeque<PushDownAction> {
177194
@Override
178195
public PushDownContext clone() {
179196
return (PushDownContext) super.clone();
180197
}
181198
}
182199

183-
private interface PushDownAction {
200+
private enum PushDownType {
201+
FILTER,
202+
PROJECT,
203+
// AGGREGATION,
204+
// SORT,
205+
// LIMIT,
206+
// HIGHLIGHT,
207+
// NESTED
208+
}
209+
210+
private record PushDownAction(PushDownType type, Object digest, AbstractAction action) {
211+
static PushDownAction of(PushDownType type, Object digest, AbstractAction action) {
212+
return new PushDownAction(type, digest, action);
213+
}
214+
215+
public String toString() {
216+
return type + ":" + digest;
217+
}
218+
219+
void apply(OpenSearchRequestBuilder requestBuilder) {
220+
action.apply(requestBuilder);
221+
}
222+
}
223+
224+
private interface AbstractAction {
184225
void apply(OpenSearchRequestBuilder requestBuilder);
185226
}
186227
}

0 commit comments

Comments
 (0)