Skip to content

Commit ba803dc

Browse files
committed
Implement reverse performance optimization
This commit optimizes the `reverse` command in the Calcite planner by intelligently reversing existing sort collations instead of always using the ROW_NUMBER() approach. Key changes: - Added PlanUtils.reverseCollation() method to flip sort directions and null directions - Updated CalciteRelNodeVisitor.visitReverse() to: - Check for existing sort collations - Reverse them if present (more efficient) - Fall back to ROW_NUMBER() when no sort exists - Added comprehensive integration test expected outputs for: - Single field reverse pushdown - Multiple field reverse pushdown - Reverse fallback cases - Double reverse no-op optimizations This optimization significantly improves performance when reversing already-sorted data by leveraging database-native sort reversal. Based on PR opensearch-project#4056 by @selsong Signed-off-by: Kai Huang <[email protected]>
1 parent 74d67da commit ba803dc

File tree

10 files changed

+107
-13
lines changed

10 files changed

+107
-13
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
4747
import org.apache.calcite.plan.RelOptTable;
4848
import org.apache.calcite.plan.ViewExpanders;
49+
import org.apache.calcite.rel.RelCollation;
4950
import org.apache.calcite.rel.RelNode;
5051
import org.apache.calcite.rel.core.Aggregate;
5152
import org.apache.calcite.rel.core.JoinRelType;
@@ -652,19 +653,30 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
652653
public RelNode visitReverse(
653654
org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) {
654655
visitChildren(node, context);
655-
// Add ROW_NUMBER() column
656-
RexNode rowNumber =
657-
context
658-
.relBuilder
659-
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
660-
.over()
661-
.rowsTo(RexWindowBounds.CURRENT_ROW)
662-
.as(REVERSE_ROW_NUM);
663-
context.relBuilder.projectPlus(rowNumber);
664-
// Sort by row number descending
665-
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
666-
// Remove row number column
667-
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));
656+
657+
// Check if there's an existing sort to reverse
658+
List<RelCollation> collations =
659+
context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek());
660+
RelCollation collation = collations != null && !collations.isEmpty() ? collations.get(0) : null;
661+
662+
if (collation != null && !collation.getFieldCollations().isEmpty()) {
663+
// If there's an existing sort, reverse its direction
664+
RelCollation reversedCollation = PlanUtils.reverseCollation(collation);
665+
context.relBuilder.sort(reversedCollation);
666+
} else {
667+
// Fallback: use ROW_NUMBER approach when no existing sort
668+
RexNode rowNumber =
669+
context
670+
.relBuilder
671+
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
672+
.over()
673+
.rowsTo(RexWindowBounds.CURRENT_ROW)
674+
.as(REVERSE_ROW_NUM);
675+
context.relBuilder.projectPlus(rowNumber);
676+
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
677+
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));
678+
}
679+
668680
return context.relBuilder.peek();
669681
}
670682

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import java.util.stream.Collectors;
2323
import javax.annotation.Nullable;
2424
import org.apache.calcite.plan.RelOptTable;
25+
import org.apache.calcite.rel.RelCollation;
26+
import org.apache.calcite.rel.RelCollations;
27+
import org.apache.calcite.rel.RelFieldCollation;
2528
import org.apache.calcite.rel.RelHomogeneousShuttle;
2629
import org.apache.calcite.rel.RelNode;
2730
import org.apache.calcite.rel.RelShuttle;
@@ -568,6 +571,37 @@ public Void visitCorrelVariable(RexCorrelVariable correlVar) {
568571
}
569572
}
570573

574+
/**
575+
* Reverses the direction of a RelCollation.
576+
*
577+
* @param original The original collation to reverse
578+
* @return A new RelCollation with reversed directions
579+
*/
580+
public static RelCollation reverseCollation(RelCollation original) {
581+
if (original == null || original.getFieldCollations().isEmpty()) {
582+
return original;
583+
}
584+
585+
List<RelFieldCollation> reversedFields = new ArrayList<>();
586+
for (RelFieldCollation field : original.getFieldCollations()) {
587+
RelFieldCollation.Direction reversedDirection = field.direction.reverse();
588+
589+
// Handle null direction properly - reverse it as well
590+
RelFieldCollation.NullDirection reversedNullDirection =
591+
field.nullDirection == RelFieldCollation.NullDirection.FIRST
592+
? RelFieldCollation.NullDirection.LAST
593+
: field.nullDirection == RelFieldCollation.NullDirection.LAST
594+
? RelFieldCollation.NullDirection.FIRST
595+
: field.nullDirection;
596+
597+
RelFieldCollation reversedField =
598+
new RelFieldCollation(field.getFieldIndex(), reversedDirection, reversedNullDirection);
599+
reversedFields.add(reversedField);
600+
}
601+
602+
return RelCollations.of(reversedFields);
603+
}
604+
571605
/** Adds a rel node to the top of the stack while preserving the field names and aliases. */
572606
static void replaceTop(RelBuilder relBuilder, RelNode relNode) {
573607
try {
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(cpu_usage=[$0], @timestamp=[$1])\n LogicalSort(sort0=[$8], dir0=[DESC])\n LogicalProject(cpu_usage=[$0], @timestamp=[$1], _id=[$2], _index=[$3], _score=[$4], _maxscore=[$5], _sort=[$6], _routing=[$7], __reverse_row_num__=[ROW_NUMBER() OVER ()])\n LogicalSort(sort0=[$8], dir0=[DESC])\n LogicalProject(cpu_usage=[$0], @timestamp=[$1], _id=[$2], _index=[$3], _score=[$4], _maxscore=[$5], _sort=[$6], _routing=[$7], __reverse_row_num__=[ROW_NUMBER() OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, events]])\n",
4+
"physical": "EnumerableCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}])\n EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$3], dir0=[DESC])\n EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableSort(sort0=[$2], dir0=[DESC])\n EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"cpu_usage\",\"@timestamp\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(sort0=[$8], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], SORT->[{\n \"age\" : {\n \"order\" : \"desc\",\n \"missing\" : \"_last\"\n }\n}, {\n \"firstname.keyword\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_first\"\n }\n}], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"desc\",\"missing\":\"_last\"}},{\"firstname.keyword\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$17], dir0=[DESC], fetch=[5])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __reverse_row_num__=[ROW_NUMBER() OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..11=[{inputs}], proj#0..10=[{exprs}])\n EnumerableLimit(fetch=[5])\n EnumerableSort(sort0=[$11], dir0=[DESC])\n EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(sort0=[$8], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC-nulls-last])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], SORT->[{\n \"age\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_first\"\n }\n}, {\n \"firstname.keyword\" : {\n \"order\" : \"desc\",\n \"missing\" : \"_last\"\n }\n}], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}},{\"firstname.keyword\":{\"order\":\"desc\",\"missing\":\"_last\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(sort0=[$8], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$8], dir0=[ASC-nulls-first])\n LogicalSort(sort0=[$8], dir0=[DESC-nulls-last])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], SORT->[{\n \"age\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_first\"\n }\n}], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$17], dir0=[DESC], fetch=[5])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __reverse_row_num__=[ROW_NUMBER() OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..17=[{inputs}], proj#0..10=[{exprs}])\n EnumerableLimit(fetch=[5])\n EnumerableSort(sort0=[$17], dir0=[DESC])\n EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(sort0=[$8], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC-nulls-last])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$8], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC-nulls-last])\n EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(sort0=[$8], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$8], dir0=[ASC-nulls-first])\n LogicalSort(sort0=[$8], dir0=[DESC-nulls-last])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$8], dir0=[ASC-nulls-first])\n EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
5+
}
6+
}

0 commit comments

Comments
 (0)