Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
Expand Down Expand Up @@ -567,19 +569,30 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
public RelNode visitReverse(
    org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) {
  // Plans the `reverse` command on top of the already-visited child plan.
  //
  // Strategy:
  //   1. If the planner metadata reports a collation for the current top node, emit a sort with
  //      every key flipped (direction and explicit null placement), which reverses the order.
  //   2. Otherwise there is no known order to flip, so number the rows with ROW_NUMBER(), sort
  //      descending on that number and drop the helper column again.
  //
  // Exactly one of the two strategies must run: applying the ROW_NUMBER reversal unconditionally
  // and then the branch below would reverse the rows twice.
  visitChildren(node, context);

  // Check if there's an existing sort to reverse
  List<RelCollation> collations =
      context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek());
  RelCollation collation =
      collations != null && !collations.isEmpty() ? collations.get(0) : null;

  if (collation != null && !collation.getFieldCollations().isEmpty()) {
    // If there's an existing sort, reverse its direction
    RelCollation reversedCollation = PlanUtils.reverseCollation(collation);
    context.relBuilder.sort(reversedCollation);
  } else {
    // Fallback: use ROW_NUMBER approach when no existing sort
    RexNode rowNumber =
        context
            .relBuilder
            .aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
            .over()
            .rowsTo(RexWindowBounds.CURRENT_ROW)
            .as(REVERSE_ROW_NUM);
    // Add the row-number column, order by it descending, then remove it from the output.
    context.relBuilder.projectPlus(rowNumber);
    context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
    context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));
  }

  return context.relBuilder.peek();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.convert.ConverterRule;
import org.opensearch.sql.calcite.rule.SortReverseOptimizationRule;

public class OpenSearchRules {
private static final PPLAggregateConvertRule AGGREGATE_CONVERT_RULE =
PPLAggregateConvertRule.Config.SUM_CONVERTER.toRule();

public static final List<RelOptRule> OPEN_SEARCH_OPT_RULES =
public static final List<ConverterRule> OPEN_SEARCH_OPT_RULES = ImmutableList.of();

public static final List<RelOptRule> OPTIMIZATION_RULES =
ImmutableList.of(SortReverseOptimizationRule.INSTANCE);

public static final List<RelOptRule> OPEN_SEARCH_OPT_RULES_OLD =
ImmutableList.of(AGGREGATE_CONVERT_RULE);

// prevent instantiation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public void register(RelOptPlanner planner) {
for (RelOptRule rule : OpenSearchRules.OPEN_SEARCH_OPT_RULES) {
planner.addRule(rule);
}

// Register optimization rules
for (RelOptRule rule : OpenSearchRules.OPTIMIZATION_RULES) {
planner.addRule(rule);
}

// remove this rule otherwise opensearch can't correctly interpret approx_count_distinct()
// it is converted to cardinality aggregation in OpenSearch
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.rule;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalSort;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;

/**
* Optimization rule that eliminates redundant consecutive sorts on the same field.
* Detects: LogicalSort(field, direction1) -> LogicalSort(field, direction2)
* Converts to: LogicalSort(field, direction1) (keeps outer sort)
*/
public class SortReverseOptimizationRule extends RelOptRule {

public static final SortReverseOptimizationRule INSTANCE = new SortReverseOptimizationRule();

private SortReverseOptimizationRule() {
super(operand(LogicalSort.class,
operand(LogicalSort.class, any())),
"SortReverseOptimizationRule");
}

@Override
public boolean matches(RelOptRuleCall call) {
Copy link
Collaborator

@yuancu yuancu Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there specific reasons on overriding the method instead of using a config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this case override provides more control for complex matching logic like checking fetch limits and intermediate nodes.

LogicalSort outerSort = call.rel(0);
LogicalSort innerSort = call.rel(1);

// Don't optimize if outer sort is a LogicalSystemLimit - we want to preserve system limits
if (call.rel(0) instanceof LogicalSystemLimit) {
return false;
}

return hasSameField(outerSort, innerSort);
}

@Override
public void onMatch(RelOptRuleCall call) {
LogicalSort outerSort = call.rel(0);
LogicalSort innerSort = call.rel(1);

LogicalSort optimizedSort = LogicalSort.create(
innerSort.getInput(),
outerSort.getCollation(),
outerSort.offset,
outerSort.fetch);

call.transformTo(optimizedSort);
}

private boolean hasSameField(LogicalSort outerSort, LogicalSort innerSort) {
if (outerSort.getCollation().getFieldCollations().isEmpty()
|| innerSort.getCollation().getFieldCollations().isEmpty()) {
return false;
}

int outerField = outerSort.getCollation().getFieldCollations().get(0).getFieldIndex();
int innerField = innerSort.getCollation().getFieldCollations().get(0).getFieldIndex();
return outerField == innerField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
Expand Down Expand Up @@ -391,4 +394,36 @@ public Void visitInputRef(RexInputRef inputRef) {
visitor.visitEach(rexNodes);
return selectedColumns;
}

/**
 * Produces the mirror image of a collation: every sort key keeps its field index but gets the
 * opposite direction, and an explicit null placement (FIRST/LAST) is swapped as well.
 *
 * @param original the collation to mirror; may be {@code null}
 * @return {@code original} itself when it is {@code null} or has no sort keys, otherwise a new
 *     collation with all keys reversed
 */
public static RelCollation reverseCollation(RelCollation original) {
  if (original == null) {
    return original;
  }
  if (original.getFieldCollations().isEmpty()) {
    return original;
  }

  List<RelFieldCollation> mirrored = new ArrayList<>();
  for (RelFieldCollation key : original.getFieldCollations()) {
    RelFieldCollation.NullDirection nulls = key.nullDirection;
    // Swap FIRST and LAST; any other value (e.g. UNSPECIFIED) is carried over unchanged.
    RelFieldCollation.NullDirection mirroredNulls =
        nulls == RelFieldCollation.NullDirection.FIRST
            ? RelFieldCollation.NullDirection.LAST
            : nulls == RelFieldCollation.NullDirection.LAST
                ? RelFieldCollation.NullDirection.FIRST
                : nulls;

    mirrored.add(
        new RelFieldCollation(key.getFieldIndex(), key.direction.reverse(), mirroredNulls));
  }

  return RelCollations.of(mirrored);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ public void testFilterFunctionScriptPushDownExplain() throws Exception {
@Test
public void testExplainWithReverse() throws IOException {
String result =
executeWithReplace(
"explain source=opensearch-sql_test_index_account | sort age | reverse | head 5");
executeWithReplace("explain source=opensearch-sql_test_index_account | reverse | head 5");

// Verify that the plan contains a LogicalSort with fetch (from head 5)
assertTrue(result.contains("LogicalSort") && result.contains("fetch=[5]"));
Expand All @@ -217,6 +216,23 @@ public void testExplainWithReverse() throws IOException {
assertTrue(result.contains("dir0=[DESC]"));
}

@Test
public void testExplainWithReversePushdown() throws IOException {
  // Explain a single-field descending sort followed by reverse and compare the plan
  // (ignoring ids) with the stored expected output.
  var actualPlan =
      explainQueryToString("source=opensearch-sql_test_index_account | sort - age | reverse");
  assertJsonEqualsIgnoreId(
      loadFromFile("expectedOutput/calcite/explain_reverse_pushdown_single.json"), actualPlan);
}

@Test
public void testExplainWithReversePushdownMultipleFields() throws IOException {
  // Explain a two-field sort with mixed directions followed by reverse and compare the plan
  // (ignoring ids) with the stored expected output.
  var actualPlan =
      explainQueryToString(
          "source=opensearch-sql_test_index_account | sort - age, + firstname | reverse");
  assertJsonEqualsIgnoreId(
      loadFromFile("expectedOutput/calcite/explain_reverse_pushdown_multiple.json"), actualPlan);
}

@Test
public void testExplainWithTimechartAvg() throws IOException {
var result = explainQueryToString("source=events | timechart span=1m avg(cpu_usage) by host");
Expand All @@ -237,6 +253,44 @@ public void testExplainWithTimechartCount() throws IOException {
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testDoubleReverseWithSortNoOp() throws IOException {
String query =
"source=opensearch-sql_test_index_account | sort - age, + firstname | reverse | reverse";
var result = explainQueryToString(query);
String expected = loadFromFile("expectedOutput/calcite/explain_double_reverse_sort_no_op.json");
assertJsonEqualsIgnoreId(expected, result);
}

// @Test
// public void testDoubleReverseNoOp() throws IOException {
// String query =
// "source=opensearch-sql_test_index_account | fields account_number | reverse | reverse";
// var result = explainQueryToString(query);
// String expected = loadFromFile("expectedOutput/calcite/explain_double_reverse_no_op.json");
// assertJsonEqualsIgnoreId(expected, result);
// }

// @Test
// public void testTripleReverseOneOp() throws IOException {
// String query =
// "source=opensearch-sql_test_index_account | fields account_number | reverse | reverse |"
// + " reverse";
// var result = explainQueryToString(query);
// String expected = loadFromFile("expectedOutput/calcite/explain_triple_reverse_one_op.json");
// assertJsonEqualsIgnoreId(expected, result);
// }
//
// @Test
// public void testDoubleReverseWithSortNoOp() throws IOException {
// String query =
// "source=opensearch-sql_test_index_account | sort - age, + firstname | reverse | reverse";
// var result = explainQueryToString(query);
// String expected = loadFromFile("expectedOutput/calcite/explain_double_reverse_sort_no_op.json");
// assertJsonEqualsIgnoreId(expected, result);
// }

@Test
public void noPushDownForAggOnWindow() throws IOException {
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,70 @@ public void testReverseWithComplexPipeline() throws IOException {
}

@Test
public void testReverseWithMultipleSorts() throws IOException {
// Use the existing BANK data but with a simpler, more predictable query
public void testReverseWithDescendingSort() throws IOException {
// Test reverse with descending sort (- age)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort account_number | fields account_number | reverse | head 3",
"source=%s | sort - account_number | fields account_number | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"));
verifyDataRowsInOrder(result, rows(32), rows(25), rows(20));
verifyDataRowsInOrder(
result, rows(1), rows(6), rows(13), rows(18), rows(20), rows(25), rows(32));
}

@Test
public void testReverseWithMixedSortDirections() throws IOException {
  // Reverse applied to a sort with mixed directions (- account_number, + firstname): rows are
  // expected in ascending account_number order.
  String ppl =
      String.format(
          "source=%s | sort - account_number, + firstname | fields account_number, firstname"
              + " | reverse",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(response, schema("account_number", "bigint"), schema("firstname", "string"));
  verifyDataRowsInOrder(
      response,
      rows(1, "Amber JOHnny"),
      rows(6, "Hattie"),
      rows(13, "Nanette"),
      rows(18, "Dale"),
      rows(20, "Elinor"),
      rows(25, "Virginia"),
      rows(32, "Dillard"));
}

@Test
public void testDoubleReverseWithDescendingSort() throws IOException {
  // Two reverses after a descending sort (- account_number): the original descending order is
  // expected.
  String ppl =
      String.format(
          "source=%s | sort - account_number | fields account_number | reverse | reverse",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(response, schema("account_number", "bigint"));
  verifyDataRowsInOrder(
      response, rows(32), rows(25), rows(20), rows(18), rows(13), rows(6), rows(1));
}

@Test
public void testDoubleReverseWithMixedSortDirections() throws IOException {
  // Two reverses after a sort with mixed directions (- account_number, + firstname): the
  // original descending account_number order is expected.
  String ppl =
      String.format(
          "source=%s | sort - account_number, + firstname | fields account_number, firstname"
              + " | reverse | reverse",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(response, schema("account_number", "bigint"), schema("firstname", "string"));
  verifyDataRowsInOrder(
      response,
      rows(32, "Dillard"),
      rows(25, "Virginia"),
      rows(20, "Elinor"),
      rows(18, "Dale"),
      rows(13, "Nanette"),
      rows(6, "Hattie"),
      rows(1, "Amber JOHnny"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(cpu_usage=[$0], @timestamp=[$1])\n LogicalSort(sort0=[$8], dir0=[DESC])\n LogicalProject(cpu_usage=[$0], @timestamp=[$1], _id=[$2], _index=[$3], _score=[$4], _maxscore=[$5], _sort=[$6], _routing=[$7], __reverse_row_num__=[ROW_NUMBER() OVER ()])\n LogicalSort(sort0=[$8], dir0=[DESC])\n LogicalProject(cpu_usage=[$0], @timestamp=[$1], _id=[$2], _index=[$3], _score=[$4], _maxscore=[$5], _sort=[$6], _routing=[$7], __reverse_row_num__=[ROW_NUMBER() OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, events]])\n",
"physical": "EnumerableCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}])\n EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$3], dir0=[DESC])\n EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableSort(sort0=[$2], dir0=[DESC])\n EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"cpu_usage\",\"@timestamp\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Comment on lines +1 to +6
Copy link
Collaborator

@yuancu yuancu Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are this explain result and explain_double_reverse_sort_no_op used anywhere? I guess they were used for the double-reverse elimination rule, but it looks like you then deleted that rule and the relevant tests.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(sort0=[$8], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], SORT->[{\n \"age\" : {\n \"order\" : \"desc\",\n \"missing\" : \"_last\"\n }\n}, {\n \"firstname.keyword\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_first\"\n }\n}], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"desc\",\"missing\":\"_last\"}},{\"firstname.keyword\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(age=[$8])\n LogicalSort(sort0=[$8], dir0=[ASC-nulls-first])\n LogicalSort(fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
"physical": "EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(sort0=[$8], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC-nulls-last])\n LogicalSort(sort0=[$8], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], SORT->[{\n \"age\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_first\"\n }\n}, {\n \"firstname.keyword\" : {\n \"order\" : \"desc\",\n \"missing\" : \"_last\"\n }\n}], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}},{\"firstname.keyword\":{\"order\":\"desc\",\"missing\":\"_last\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
}
}
Loading
Loading