Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
Expand Down Expand Up @@ -567,19 +568,30 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
public RelNode visitReverse(
org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) {
visitChildren(node, context);
// Add ROW_NUMBER() column
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(REVERSE_ROW_NUM);
context.relBuilder.projectPlus(rowNumber);
// Sort by row number descending
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
// Remove row number column
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));

// Check if there's an existing sort to reverse
List<RelCollation> collations =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we assume the head of this collation list must come from the sort command right before reverse command right? Is it always true or the reverse logic below works even though it's not true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Yes, this test uses the EMP table which has a natural ordering by EMPNO. When reverse is called, Calcite's metadata query detects this existing collation and reverses it directly, so we see LogicalSort(sort0=[$0], dir0=[DESC]) instead of ROW_NUMBER. The ROW_NUMBER fallback only occurs when there's no existing collation.
  2. No, the assumption isn't always true, but I think the logic works correctly when its not true. getMetadataQuery().collations() returns all collations Calcite detects (natural orderings, index orderings, sort operations), not just the immediate sort command. We take the first available RelCollation (which may contain multiple sort fields) and reverse all its field collations at once.

Copy link
Collaborator

@dai-chen dai-chen Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Q2: Take query ... | sort | X | Y | Z | reverse for example, we should be good as long as commands in-between (X, Y, Z) doesn't rely on the output order of sort command, right? Just thinking any counterexample, such as ... | sort | head | reverse?

I'm thinking is it too early to do this in visitReverse? Essentially, our CalciteRelNodeVisitor converts AST to its logical representation by RelNode. Just wondering what does it look like if we simply translate reverse to logical sort operator here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question about edge cases! I tested both | sort | head 10 | reverse and | sort | eval | head | reverse patterns and they work correctly. In the first part of visitReverse the SORT ASC and LIMIT for head are combined into LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10]) and then the reverseCollation is added with LogicalSort(sort0=[$0], dir0=[DESC-nulls-last]). The physical plan pushes SORT ASC + LIMIT N to OpenSearch, then applies DESC sort on the limited results. Let me know if there are any other edge cases, Thanks!

context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek());
RelCollation collation = collations != null && !collations.isEmpty() ? collations.get(0) : null;

if (collation != null && !collation.getFieldCollations().isEmpty()) {
// If there's an existing sort, reverse its direction
RelCollation reversedCollation = PlanUtils.reverseCollation(collation);
context.relBuilder.sort(reversedCollation);
} else {
// Fallback: use ROW_NUMBER approach when no existing sort
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(REVERSE_ROW_NUM);
context.relBuilder.projectPlus(rowNumber);
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));
}

return context.relBuilder.peek();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.opensearch.sql.calcite.rule.SortReverseOptimizationRule;

public class OpenSearchRules {
private static final PPLAggregateConvertRule AGGREGATE_CONVERT_RULE =
Expand All @@ -16,6 +17,11 @@ public class OpenSearchRules {
public static final List<RelOptRule> OPEN_SEARCH_OPT_RULES =
ImmutableList.of(AGGREGATE_CONVERT_RULE);

public static final List<RelOptRule> OPTIMIZATION_RULES = ImmutableList.of();

public static final List<RelOptRule> OPEN_SEARCH_POST_AGG_RULES =
ImmutableList.of(SortReverseOptimizationRule.INSTANCE);

// prevent instantiation
private OpenSearchRules() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public void register(RelOptPlanner planner) {
planner.addRule(rule);
}

// Register optimization rules
for (RelOptRule rule : OpenSearchRules.OPTIMIZATION_RULES) {
planner.addRule(rule);
}

// Register post-aggregation rules (run after aggregate optimizations)
for (RelOptRule rule : OpenSearchRules.OPEN_SEARCH_POST_AGG_RULES) {
planner.addRule(rule);
}

// remove this rule otherwise opensearch can't correctly interpret approx_count_distinct()
// it is converted to cardinality aggregation in OpenSearch
planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.rule;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;

/** Combines sort then reverse into 1 sort. */
Copy link
Collaborator

@yuancu yuancu Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little about curious what's the behavior without this rule. Won't multiple sorts on the same fields be merged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Multiple sorts on the same fields are merged so without this rule physical plans are identical due to Calcite's built-in physical optimization. I think this rule ensures consistent optimization across different query patterns, especially in post-aggregation scenarios where physical rules might not apply.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you provide an example where default physical rule might not apply?

There is a SortRemoveRule in Calcite that changes the traits based on the Sort collation requirement. For this two sorts case, I think it will pick one of them anyway. Seems this optimization might be overlapped with existing rule.

In physical plan rules, Calcite will ensure query do not lose sort semantics by adding AbstractConverter node over applicable nodes that require sorting. The AbstractConverter node checks if the child collation satisfies the expected collation to decide whether to add a sort between parent and child. The sort added by AbstractConverter should be winner of the two sorts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you both for the insightful discussion! You were absolutely correct, and I was initially wrong in my assessment. After further testing and analysis, I've confirmed that:
Calcite's built-in optimization is sufficient - SortRemoveRule and AbstractConverter nodes handle sort merging effectively
Physical plans are identical with or without the custom rule, so the custom rule was redundant
Based on your feedback, I've removed the SortDirectionOptRule entirely. The core improvement remains: reverse detection that flips existing sort directions. Calcite's built-in optimization then handles the sort merging at the physical level.

public class SortReverseOptimizationRule extends RelOptRule {
private static final Logger LOG = LogManager.getLogger(SortReverseOptimizationRule.class);

public static final SortReverseOptimizationRule INSTANCE = new SortReverseOptimizationRule();

private SortReverseOptimizationRule() {
super(
operand(LogicalSort.class, operand(LogicalSort.class, any())),
"SortReverseOptimizationRule");
}

@Override
public boolean matches(RelOptRuleCall call) {
Copy link
Collaborator

@yuancu yuancu Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there specific reasons on overriding the method instead of using a config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this case override provides more control for complex matching logic like checking fetch limits and intermediate nodes.

LogicalSort outerSort = call.rel(0);
LogicalSort innerSort = call.rel(1);

LOG.debug("SortReverseOptimizationRule.matches() called");
LOG.debug("Outer sort: {}", outerSort);
LOG.debug("Inner sort: {}", innerSort);
LOG.debug("Inner sort input: {}", innerSort.getInput());

// Don't optimize if outer sort is a LogicalSystemLimit
if (call.rel(0) instanceof LogicalSystemLimit) {
LOG.debug("Skipping: outer sort is LogicalSystemLimit");
return false;
}

// Don't optimize if inner sort has a fetch limit (head/limit before sort)
// This preserves limit-then-sort semantics
if (innerSort.fetch != null) {
LOG.debug("Skipping: inner sort has fetch limit: {}", innerSort.fetch);
return false;
}

// Must be same field with opposite directions (sort | reverse pattern)
boolean matches = hasSameFieldWithOppositeDirection(outerSort, innerSort);
LOG.debug("Same field with opposite direction: {}", matches);
return matches;
}

@Override
public void onMatch(RelOptRuleCall call) {
LogicalSort outerSort = call.rel(0);
LogicalSort innerSort = call.rel(1);

LOG.debug("SortReverseOptimizationRule.onMatch() applying transformation");
LOG.debug("Transforming from: {} -> {}", outerSort, innerSort);

LogicalSort optimizedSort =
LogicalSort.create(
innerSort.getInput(), outerSort.getCollation(), outerSort.offset, outerSort.fetch);

LOG.debug("Transformed to: {}", optimizedSort);
call.transformTo(optimizedSort);
}

private boolean hasSameFieldWithOppositeDirection(LogicalSort outerSort, LogicalSort innerSort) {
var outerFields = outerSort.getCollation().getFieldCollations();
var innerFields = innerSort.getCollation().getFieldCollations();

if (outerFields.isEmpty() || innerFields.isEmpty()) {
LOG.debug("No field collations found");
return false;
}

// Must have same number of fields
if (outerFields.size() != innerFields.size()) {
LOG.debug(
"Different number of sort fields: outer={}, inner={}",
outerFields.size(),
innerFields.size());
return false;
}

// Check all fields have same index but opposite directions
for (int i = 0; i < outerFields.size(); i++) {
var outerField = outerFields.get(i);
var innerField = innerFields.get(i);

LOG.debug(
"Field {}: outer(index={}, direction={}), inner(index={}, direction={})",
i,
outerField.getFieldIndex(),
outerField.getDirection(),
innerField.getFieldIndex(),
innerField.getDirection());

if (outerField.getFieldIndex() != innerField.getFieldIndex()
|| outerField.getDirection() == innerField.getDirection()) {
LOG.debug(
"Field {} mismatch: same index={}, opposite direction={}",
i,
outerField.getFieldIndex() == innerField.getFieldIndex(),
outerField.getDirection() != innerField.getDirection());
return false;
}
}

LOG.debug("All fields match with opposite directions");
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
Expand Down Expand Up @@ -391,4 +394,35 @@ public Void visitInputRef(RexInputRef inputRef) {
visitor.visitEach(rexNodes);
return selectedColumns;
}

/**
* Reverses the direction of a RelCollation.
*
* @param original The original collation to reverse
* @return A new RelCollation with reversed directions
*/
public static RelCollation reverseCollation(RelCollation original) {
if (original == null || original.getFieldCollations().isEmpty()) {
return original;
}

List<RelFieldCollation> reversedFields = new ArrayList<>();
for (RelFieldCollation field : original.getFieldCollations()) {
RelFieldCollation.Direction reversedDirection = field.direction.reverse();

// Handle null direction properly - reverse it as well
RelFieldCollation.NullDirection reversedNullDirection =
field.nullDirection == RelFieldCollation.NullDirection.FIRST
? RelFieldCollation.NullDirection.LAST
: field.nullDirection == RelFieldCollation.NullDirection.LAST
? RelFieldCollation.NullDirection.FIRST
: field.nullDirection;

RelFieldCollation reversedField =
new RelFieldCollation(field.getFieldIndex(), reversedDirection, reversedNullDirection);
reversedFields.add(reversedField);
}

return RelCollations.of(reversedFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ public void testFilterFunctionScriptPushDownExplain() throws Exception {
@Test
public void testExplainWithReverse() throws IOException {
String result =
executeWithReplace(
"explain source=opensearch-sql_test_index_account | sort age | reverse | head 5");
executeWithReplace("explain source=opensearch-sql_test_index_account | reverse | head 5");

// Verify that the plan contains a LogicalSort with fetch (from head 5)
assertTrue(result.contains("LogicalSort") && result.contains("fetch=[5]"));
Expand All @@ -217,6 +216,30 @@ public void testExplainWithReverse() throws IOException {
assertTrue(result.contains("dir0=[DESC]"));
}

@Test
public void testExplainWithReversePushdown() throws IOException {
String query = "source=opensearch-sql_test_index_account | sort - age | reverse";
var result = explainQueryToString(query);
String expected =
isPushdownEnabled()
? loadFromFile("expectedOutput/calcite/explain_reverse_pushdown_single.json")
: loadFromFile(
"expectedOutput/calcite_no_pushdown/explain_reverse_pushdown_single.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testExplainWithReversePushdownMultipleFields() throws IOException {
String query = "source=opensearch-sql_test_index_account | sort - age, + firstname | reverse";
var result = explainQueryToString(query);
String expected =
isPushdownEnabled()
? loadFromFile("expectedOutput/calcite/explain_reverse_pushdown_multiple.json")
: loadFromFile(
"expectedOutput/calcite_no_pushdown/explain_reverse_pushdown_multiple.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testExplainWithTimechartAvg() throws IOException {
var result = explainQueryToString("source=events | timechart span=1m avg(cpu_usage) by host");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,70 @@ public void testReverseWithComplexPipeline() throws IOException {
}

@Test
public void testReverseWithMultipleSorts() throws IOException {
// Use the existing BANK data but with a simpler, more predictable query
public void testReverseWithDescendingSort() throws IOException {
// Test reverse with descending sort (- age)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort account_number | fields account_number | reverse | head 3",
"source=%s | sort - account_number | fields account_number | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"));
verifyDataRowsInOrder(result, rows(32), rows(25), rows(20));
verifyDataRowsInOrder(
result, rows(1), rows(6), rows(13), rows(18), rows(20), rows(25), rows(32));
}

@Test
public void testReverseWithMixedSortDirections() throws IOException {
// Test reverse with mixed sort directions (- age, + firstname)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort - account_number, + firstname | fields account_number, firstname"
+ " | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"), schema("firstname", "string"));
verifyDataRowsInOrder(
result,
rows(1, "Amber JOHnny"),
rows(6, "Hattie"),
rows(13, "Nanette"),
rows(18, "Dale"),
rows(20, "Elinor"),
rows(25, "Virginia"),
rows(32, "Dillard"));
}

@Test
public void testDoubleReverseWithDescendingSort() throws IOException {
// Test double reverse with descending sort (- age)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort - account_number | fields account_number | reverse | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"));
verifyDataRowsInOrder(
result, rows(32), rows(25), rows(20), rows(18), rows(13), rows(6), rows(1));
}

@Test
public void testDoubleReverseWithMixedSortDirections() throws IOException {
// Test double reverse with mixed sort directions (- age, + firstname)
JSONObject result =
executeQuery(
String.format(
"source=%s | sort - account_number, + firstname | fields account_number, firstname"
+ " | reverse | reverse",
TEST_INDEX_BANK));
verifySchema(result, schema("account_number", "bigint"), schema("firstname", "string"));
verifyDataRowsInOrder(
result,
rows(32, "Dillard"),
rows(25, "Virginia"),
rows(20, "Elinor"),
rows(18, "Dale"),
rows(13, "Nanette"),
rows(6, "Hattie"),
rows(1, "Amber JOHnny"));
}
}
Loading
Loading