Skip to content

Commit cc61524

Browse files
committed
Refactor a bit of SortExprIndexScanRule
Signed-off-by: Songkan Tang <[email protected]>
1 parent cadac00 commit cc61524

File tree

5 files changed

+38
-52
lines changed

5 files changed

+38
-52
lines changed

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.opensearch.planner.rules;
77

88
import java.util.Optional;
9+
import java.util.function.Predicate;
910
import org.apache.calcite.adapter.enumerable.EnumerableProject;
1011
import org.apache.calcite.plan.RelOptRuleCall;
1112
import org.apache.calcite.plan.RelRule;
@@ -176,7 +177,7 @@ private static CalciteEnumerableIndexScan extractScanFromInput(RelNode input) {
176177
return extractScanFromInput(bestPlan);
177178
}
178179

179-
// During physical optimization, we should have a best plan, but if not available yet,
180+
// During physical optimization, we should have the best plan. But if not available yet,
180181
// we can check the original node (though it's less likely to be CalciteEnumerableIndexScan)
181182
RelNode original = subset.getOriginal();
182183
if (original != null) {
@@ -203,8 +204,9 @@ public interface Config extends RelRule.Config {
203204
.oneInput(
204205
b1 ->
205206
b1.operand(EnumerableProject.class)
206-
.predicate(PlanUtils::projectContainsExpr)
207-
.predicate(p -> !p.containsOver())
207+
.predicate(
208+
Predicate.not(Project::containsOver)
209+
.and(PlanUtils::projectContainsExpr))
208210
.anyInputs()));
209211

210212
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortExprIndexScanRule.java

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ public void onMatch(RelOptRuleCall call) {
4646
final LogicalProject project = call.rel(1);
4747
final CalciteLogicalIndexScan scan = call.rel(2);
4848

49-
// Only match sort - project - scan when sort keys have expression
49+
// Only match sort - project - scan when any sort key references an expression
5050
if (!PlanUtils.sortReferencesExpr(sort, project)) {
5151
return;
5252
}
5353

5454
boolean scanProvidesRequiredCollation =
5555
OpenSearchRelOptUtil.canScanProvideSortCollation(scan, project, sort.collation);
56-
if (scan.isLimitPushed() && !scanProvidesRequiredCollation) {
56+
if (scan.isTopKPushed() && !scanProvidesRequiredCollation) {
5757
return;
5858
}
5959

@@ -66,43 +66,22 @@ public void onMatch(RelOptRuleCall call) {
6666
}
6767

6868
CalciteLogicalIndexScan newScan;
69-
// If the pushed sort is the same with new one, just pushdown limit if there is
70-
if (scanProvidesRequiredCollation
71-
&& sortExpressionInfos.size() == sort.getCollation().getFieldCollations().size()) {
72-
newScan = scan.copyWithNewSchema(scan.getSchema());
73-
if (sort.fetch != null || sort.offset != null) {
74-
Integer limitValue = LimitIndexScanRule.extractLimitValue(sort.fetch);
75-
Integer offsetValue = LimitIndexScanRule.extractOffsetValue(sort.offset);
76-
if (limitValue != null && offsetValue != null) {
77-
newScan = newScan.pushDownLimitToScan(limitValue, offsetValue);
78-
}
79-
Project newProject =
80-
project.copy(sort.getTraitSet(), newScan, project.getProjects(), project.getRowType());
81-
call.transformTo(newProject);
82-
}
69+
// If the scan's sort info already satisfies new sort, just pushdown limit if there is
70+
if (scanProvidesRequiredCollation && (sort.fetch != null || sort.offset != null)) {
71+
newScan = scan.pushDownLimitToScan(sort.fetch, sort.offset);
8372
} else {
8473
// Attempt to push down sort expressions
8574
newScan = scan.pushdownSortExpr(sortExpressionInfos);
86-
if (newScan != null) {
87-
if (sort.fetch != null || sort.offset != null) {
88-
Integer limitValue = LimitIndexScanRule.extractLimitValue(sort.fetch);
89-
Integer offsetValue = LimitIndexScanRule.extractOffsetValue(sort.offset);
90-
if (limitValue != null && offsetValue != null) {
91-
newScan = newScan.pushDownLimitToScan(limitValue, offsetValue);
92-
}
93-
Project newProject =
94-
project.copy(
95-
sort.getTraitSet(), newScan, project.getProjects(), project.getRowType());
96-
call.transformTo(newProject);
97-
} else {
98-
// Transform the plan to use the new scan with pushed down sort expressions
99-
Project newProject =
100-
project.copy(
101-
sort.getTraitSet(), newScan, project.getProjects(), project.getRowType());
102-
call.transformTo(newProject);
103-
}
75+
if (newScan != null && (sort.fetch != null || sort.offset != null)) {
76+
newScan = newScan.pushDownLimitToScan(sort.fetch, sort.offset);
10477
}
10578
}
79+
80+
if (newScan != null) {
81+
Project newProject =
82+
project.copy(sort.getTraitSet(), newScan, project.getProjects(), project.getRowType());
83+
call.transformTo(newProject);
84+
}
10685
}
10786

10887
/**
@@ -243,6 +222,7 @@ public interface Config extends RelRule.Config {
243222
.oneInput(
244223
b1 ->
245224
b1.operand(LogicalProject.class)
225+
.predicate(Predicate.not(Project::containsOver))
246226
.oneInput(
247227
b2 ->
248228
b2.operand(CalciteLogicalIndexScan.class)

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,6 @@ public void onMatch(RelOptRuleCall call) {
9595
RelCollations.EMPTY,
9696
sort.offset,
9797
sort.fetch);
98-
// Sort limitSort =
99-
// sort.copy(
100-
// sort.getTraitSet().replace(lowerSort.getCollation()),
101-
// project.getInput(),
102-
// lowerSort.getCollation(),
103-
// sort.offset,
104-
// sort.fetch);
10598
result =
10699
project.copy(sort.getTraitSet(), limitSort, project.getProjects(), project.getRowType());
107100
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
4747
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
4848
import org.opensearch.sql.opensearch.planner.rules.EnumerableIndexScanRule;
49+
import org.opensearch.sql.opensearch.planner.rules.LimitIndexScanRule;
4950
import org.opensearch.sql.opensearch.planner.rules.OpenSearchIndexRules;
5051
import org.opensearch.sql.opensearch.request.AggregateAnalyzer;
5152
import org.opensearch.sql.opensearch.request.PredicateAnalyzer;
@@ -427,13 +428,19 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of
427428
return null;
428429
}
429430

430-
public CalciteLogicalIndexScan pushDownLimitToScan(Integer limit, Integer offset) {
431-
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType());
432-
newScan.pushDownContext.add(
433-
PushDownType.LIMIT,
434-
new LimitDigest(limit, offset),
435-
(OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownLimit(limit, offset));
436-
return newScan;
431+
public CalciteLogicalIndexScan pushDownLimitToScan(RexNode limit, RexNode offset) {
432+
Integer limitValue = LimitIndexScanRule.extractLimitValue(limit);
433+
Integer offsetValue = LimitIndexScanRule.extractOffsetValue(offset);
434+
if (limitValue != null && offsetValue != null) {
435+
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType());
436+
newScan.pushDownContext.add(
437+
PushDownType.LIMIT,
438+
new LimitDigest(limitValue, offsetValue),
439+
(OSRequestBuilderAction)
440+
requestBuilder -> requestBuilder.pushDownLimit(limitValue, offsetValue));
441+
return newScan;
442+
}
443+
return null;
437444
}
438445

439446
/**

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class PushDownContext extends AbstractCollection<PushDownOperation> {
3030
private boolean isProjectPushed = false;
3131
private boolean isMeasureOrderPushed = false;
3232
private boolean isSortPushed = false;
33+
private boolean isSortExprPushed = false;
3334
private boolean isTopKPushed = false;
3435
private boolean isRareTopPushed = false;
3536

@@ -100,7 +101,7 @@ public boolean add(PushDownOperation operation) {
100101
}
101102
if (operation.type() == PushDownType.LIMIT) {
102103
isLimitPushed = true;
103-
if (isSortPushed || isMeasureOrderPushed) {
104+
if (isSortPushed || isMeasureOrderPushed || isSortExprPushed) {
104105
isTopKPushed = true;
105106
}
106107
}
@@ -110,6 +111,9 @@ public boolean add(PushDownOperation operation) {
110111
if (operation.type() == PushDownType.SORT) {
111112
isSortPushed = true;
112113
}
114+
if (operation.type() == PushDownType.SORT_EXPR) {
115+
isSortExprPushed = true;
116+
}
113117
if (operation.type() == PushDownType.SORT_AGG_METRICS) {
114118
isMeasureOrderPushed = true;
115119
}

0 commit comments

Comments
 (0)