Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
Expand Down Expand Up @@ -567,19 +568,30 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
public RelNode visitReverse(
org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) {
visitChildren(node, context);
// Add ROW_NUMBER() column
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(REVERSE_ROW_NUM);
context.relBuilder.projectPlus(rowNumber);
// Sort by row number descending
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
// Remove row number column
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));

// Check if there's an existing sort to reverse
List<RelCollation> collations =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we assume the head of this collation list must come from the sort command right before reverse command right? Is it always true or the reverse logic below works even though it's not true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Yes, this test uses the EMP table which has a natural ordering by EMPNO. When reverse is called, Calcite's metadata query detects this existing collation and reverses it directly, so we see LogicalSort(sort0=[$0], dir0=[DESC]) instead of ROW_NUMBER. The ROW_NUMBER fallback only occurs when there's no existing collation.
  2. No, the assumption isn't always true, but I think the logic works correctly when its not true. getMetadataQuery().collations() returns all collations Calcite detects (natural orderings, index orderings, sort operations), not just the immediate sort command. We take the first available RelCollation (which may contain multiple sort fields) and reverse all its field collations at once.

Copy link
Collaborator

@dai-chen dai-chen Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Q2: Take query ... | sort | X | Y | Z | reverse for example, we should be good as long as commands in-between (X, Y, Z) doesn't rely on the output order of sort command, right? Just thinking any counterexample, such as ... | sort | head | reverse?

I'm thinking is it too early to do this in visitReverse? Essentially, our CalciteRelNodeVisitor converts AST to its logical representation by RelNode. Just wondering what does it look like if we simply translate reverse to logical sort operator here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question about edge cases! I tested both | sort | head 10 | reverse and | sort | eval | head | reverse patterns and they work correctly. In the first part of visitReverse the SORT ASC and LIMIT for head are combined into LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10]) and then the reverseCollation is added with LogicalSort(sort0=[$0], dir0=[DESC-nulls-last]). The physical plan pushes SORT ASC + LIMIT N to OpenSearch, then applies DESC sort on the limited results. Let me know if there are any other edge cases, Thanks!

context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek());
RelCollation collation = collations != null && !collations.isEmpty() ? collations.get(0) : null;

if (collation != null && !collation.getFieldCollations().isEmpty()) {
// If there's an existing sort, reverse its direction
RelCollation reversedCollation = PlanUtils.reverseCollation(collation);
context.relBuilder.sort(reversedCollation);
} else {
// Fallback: use ROW_NUMBER approach when no existing sort
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(REVERSE_ROW_NUM);
context.relBuilder.projectPlus(rowNumber);
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));
}

return context.relBuilder.peek();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.opensearch.sql.calcite.rule.SortDirectionOptRule;

public class OpenSearchRules {
private static final PPLAggregateConvertRule AGGREGATE_CONVERT_RULE =
Expand All @@ -16,6 +17,9 @@ public class OpenSearchRules {
public static final List<RelOptRule> OPEN_SEARCH_OPT_RULES =
ImmutableList.of(AGGREGATE_CONVERT_RULE);

public static final List<RelOptRule> OPEN_SEARCH_POST_AGG_RULES =
ImmutableList.of(SortDirectionOptRule.INSTANCE);

// prevent instantiation
private OpenSearchRules() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public void register(RelOptPlanner planner) {
planner.addRule(rule);
}

// Register post-aggregation rules (run after aggregate optimizations)
for (RelOptRule rule : OpenSearchRules.OPEN_SEARCH_POST_AGG_RULES) {
planner.addRule(rule);
}

// remove this rule otherwise opensearch can't correctly interpret approx_count_distinct()
// it is converted to cardinality aggregation in OpenSearch
planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.rule;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;

/** Combines consecutive sorts with opposite directions into 1 sort. */
public class SortDirectionOptRule extends RelOptRule {
private static final Logger LOG = LogManager.getLogger(SortDirectionOptRule.class);

public static final SortDirectionOptRule INSTANCE = new SortDirectionOptRule();

private SortDirectionOptRule() {
super(
operand(LogicalSort.class,
operand(org.apache.calcite.rel.RelNode.class,
operand(LogicalSort.class, any()))),
"SortDirectionOptRule");
}

@Override
public boolean matches(RelOptRuleCall call) {
LogicalSort outerSort = call.rel(0);
org.apache.calcite.rel.RelNode intermediate = call.rel(1);
LogicalSort innerSort = call.rel(2);

LOG.debug("SortDirectionOptRule.matches() - outer: {}, intermediate: {}, inner: {}",
outerSort, intermediate, innerSort);

// Only allow single-input intermediate nodes (like LogicalProject)
if (intermediate.getInputs().size() != 1) {
LOG.debug("Intermediate node has {} inputs, expected 1", intermediate.getInputs().size());
return false;
}

// Don't optimize if inner sort has a fetch limit (head/limit before sort)
// This preserves limit-then-sort semantics
// Example: source=t | head 5 | sort field | reverse
// Plan: Sort(reverse) -> Sort(field, fetch=5) -> Scan
// Should NOT be optimized to preserve the "take first 5, then sort" behavior
if (innerSort.fetch != null) {
LOG.debug("Skipping: inner sort has fetch limit: {}", innerSort.fetch);
return false;
}

// Must be same field with opposite directions (sort | reverse pattern)
boolean matches = hasSameFieldWithOppositeDirection(outerSort, innerSort);
LOG.debug("Same field with opposite direction: {}", matches);
return matches;
}

@Override
public void onMatch(RelOptRuleCall call) {
LogicalSort outerSort = call.rel(0);
org.apache.calcite.rel.RelNode intermediate = call.rel(1);
LogicalSort innerSort = call.rel(2);
Copy link
Contributor

@songkant-aws songkant-aws Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this pattern needs careful consideration. There are lots of cases to be considered.

If intermediate is a Project node without WINDOW function, the pattern is supposed to be optimized by SortProjectTransposeRule. And it will be changed to Project - outerSort - innerSort. Other bunch of rules may optimize the changed pattern to keep outerSort.

If the Project contains WINDOW function, we should not change outerSort under the intermediate node. Because it may change its semantics.

If the intermediate node is aggregation, suppose the sort is on the group by column, I think the outerSort over aggregated result will be more efficient because the row count will be reduced a lot after aggregation. Check if Calcite can make it like outerSort(groupByColumn) - Aggregate(count) by groupByColumn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, you're right about these cases. Given that Calcite's built-in optimization already handles these cases correctly and efficiently, I've removed the custom optimization rule entirely. This eliminates the risk of interfering with Calcite's sophisticated rule interactions while preserving the core performance improvement.


LOG.debug("SortDirectionOptRule.onMatch() transforming: {} -> {}", outerSort, innerSort);

// Create optimized sort with the final direction
LogicalSort optimizedSort =
LogicalSort.create(
innerSort.getInput(), outerSort.getCollation(), outerSort.offset, outerSort.fetch);

// Recreate the intermediate node with the optimized sort as input
org.apache.calcite.rel.RelNode newIntermediate =
intermediate.copy(intermediate.getTraitSet(),
java.util.Collections.singletonList(optimizedSort));

LOG.debug("Transformed to: {}", newIntermediate);
call.transformTo(newIntermediate);
}

private boolean hasSameFieldWithOppositeDirection(LogicalSort outerSort, LogicalSort innerSort) {
var outerFields = outerSort.getCollation().getFieldCollations();
var innerFields = innerSort.getCollation().getFieldCollations();

if (outerFields.isEmpty() || innerFields.isEmpty()) {
LOG.debug("No field collations found");
return false;
}

// Must have same number of fields
if (outerFields.size() != innerFields.size()) {
LOG.debug(
"Different number of sort fields: outer={}, inner={}",
outerFields.size(),
innerFields.size());
return false;
}

// Check all fields have same index but opposite directions
for (int i = 0; i < outerFields.size(); i++) {
var outerField = outerFields.get(i);
var innerField = innerFields.get(i);

LOG.debug(
"Field {}: outer(index={}, direction={}), inner(index={}, direction={})",
i,
outerField.getFieldIndex(),
outerField.getDirection(),
innerField.getFieldIndex(),
innerField.getDirection());

if (outerField.getFieldIndex() != innerField.getFieldIndex()
|| outerField.getDirection() == innerField.getDirection()) {
LOG.debug(
"Field {} mismatch: same index={}, opposite direction={}",
i,
outerField.getFieldIndex() == innerField.getFieldIndex(),
outerField.getDirection() != innerField.getDirection());
return false;
}
}

LOG.debug("All fields match with opposite directions");
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
Expand Down Expand Up @@ -391,4 +394,35 @@ public Void visitInputRef(RexInputRef inputRef) {
visitor.visitEach(rexNodes);
return selectedColumns;
}

/**
* Reverses the direction of a RelCollation.
*
* @param original The original collation to reverse
* @return A new RelCollation with reversed directions
*/
public static RelCollation reverseCollation(RelCollation original) {
if (original == null || original.getFieldCollations().isEmpty()) {
return original;
}

List<RelFieldCollation> reversedFields = new ArrayList<>();
for (RelFieldCollation field : original.getFieldCollations()) {
RelFieldCollation.Direction reversedDirection = field.direction.reverse();

// Handle null direction properly - reverse it as well
RelFieldCollation.NullDirection reversedNullDirection =
field.nullDirection == RelFieldCollation.NullDirection.FIRST
? RelFieldCollation.NullDirection.LAST
: field.nullDirection == RelFieldCollation.NullDirection.LAST
? RelFieldCollation.NullDirection.FIRST
: field.nullDirection;

RelFieldCollation reversedField =
new RelFieldCollation(field.getFieldIndex(), reversedDirection, reversedNullDirection);
reversedFields.add(reversedField);
}

return RelCollations.of(reversedFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ public void testFilterFunctionScriptPushDownExplain() throws Exception {
@Test
public void testExplainWithReverse() throws IOException {
String result =
executeWithReplace(
"explain source=opensearch-sql_test_index_account | sort age | reverse | head 5");
executeWithReplace("explain source=opensearch-sql_test_index_account | reverse | head 5");

// Verify that the plan contains a LogicalSort with fetch (from head 5)
assertTrue(result.contains("LogicalSort") && result.contains("fetch=[5]"));
Expand All @@ -217,23 +216,37 @@ public void testExplainWithReverse() throws IOException {
assertTrue(result.contains("dir0=[DESC]"));
}

@Test
public void testExplainWithReversePushdown() throws IOException {
String query = "source=opensearch-sql_test_index_account | sort - age | reverse";
var result = explainQueryToString(query);
String expected = loadExpectedPlan("explain_reverse_pushdown_single.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testExplainWithReversePushdownMultipleFields() throws IOException {
String query = "source=opensearch-sql_test_index_account | sort - age, + firstname | reverse";
var result = explainQueryToString(query);
String expected = loadExpectedPlan("explain_reverse_pushdown_multiple.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testExplainWithTimechartAvg() throws IOException {
var result = explainQueryToString("source=events | timechart span=1m avg(cpu_usage) by host");
String expected =
isPushdownEnabled()
? loadFromFile("expectedOutput/calcite/explain_timechart.json")
: loadFromFile("expectedOutput/calcite/explain_timechart_no_pushdown.json");
String expected = isPushdownEnabled()
? loadFromFile("expectedOutput/calcite/explain_timechart.json")
: loadFromFile("expectedOutput/calcite/explain_timechart_no_pushdown.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testExplainWithTimechartCount() throws IOException {
var result = explainQueryToString("source=events | timechart span=1m count() by host");
String expected =
isPushdownEnabled()
? loadFromFile("expectedOutput/calcite/explain_timechart_count.json")
: loadFromFile("expectedOutput/calcite/explain_timechart_count_no_pushdown.json");
String expected = isPushdownEnabled()
? loadFromFile("expectedOutput/calcite/explain_timechart_count.json")
: loadFromFile("expectedOutput/calcite/explain_timechart_count_no_pushdown.json");
assertJsonEqualsIgnoreId(expected, result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,70 @@ public void testReverseWithComplexPipeline() throws IOException {
}

@Test
public void testReverseWithMultipleSorts() throws IOException {
// Use the existing BANK data but with a simpler, more predictable query
public void testReverseWithDescendingSort() throws IOException {
// Test reverse with descending sort (- age)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort account_number | fields account_number | reverse | head 3",
"source=%s | sort - account_number | fields account_number | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"));
verifyDataRowsInOrder(result, rows(32), rows(25), rows(20));
verifyDataRowsInOrder(
result, rows(1), rows(6), rows(13), rows(18), rows(20), rows(25), rows(32));
}

@Test
public void testReverseWithMixedSortDirections() throws IOException {
// Test reverse with mixed sort directions (- age, + firstname)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort - account_number, + firstname | fields account_number, firstname"
+ " | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"), schema("firstname", "string"));
verifyDataRowsInOrder(
result,
rows(1, "Amber JOHnny"),
rows(6, "Hattie"),
rows(13, "Nanette"),
rows(18, "Dale"),
rows(20, "Elinor"),
rows(25, "Virginia"),
rows(32, "Dillard"));
}

@Test
public void testDoubleReverseWithDescendingSort() throws IOException {
// Test double reverse with descending sort (- age)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort - account_number | fields account_number | reverse | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"));
verifyDataRowsInOrder(
result, rows(32), rows(25), rows(20), rows(18), rows(13), rows(6), rows(1));
}

@Test
public void testDoubleReverseWithMixedSortDirections() throws IOException {
// Test double reverse with mixed sort directions (- age, + firstname)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort - account_number, + firstname | fields account_number, firstname"
+ " | reverse | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"), schema("firstname", "string"));
verifyDataRowsInOrder(
result,
rows(32, "Dillard"),
rows(25, "Virginia"),
rows(20, "Elinor"),
rows(18, "Dale"),
rows(13, "Nanette"),
rows(6, "Hattie"),
rows(1, "Amber JOHnny"));
}
}
Loading
Loading