Skip to content

Commit 66662ae

Browse files
authored
Fix bug that Streamstats command incorrectly treats null as a valid group (#4777)
* use null to cover the aggregate value when group value is null Signed-off-by: Xinyu Hao <[email protected]> * fix test Signed-off-by: Xinyu Hao <[email protected]> * fix some problems Signed-off-by: Xinyu Hao <[email protected]> * add IT case Signed-off-by: Xinyu Hao <[email protected]> --------- Signed-off-by: Xinyu Hao <[email protected]>
1 parent daf1795 commit 66662ae

File tree

10 files changed

+310
-194
lines changed

10 files changed

+310
-194
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1703,7 +1703,10 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
17031703
new String[] {ROW_NUMBER_COLUMN_FOR_STREAMSTATS});
17041704
}
17051705

1706-
// Default
1706+
// Default: first get rawExpr
1707+
List<RexNode> overExpressions =
1708+
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
1709+
17071710
if (hasGroup) {
17081711
// only build sequence when there is by condition
17091712
RexNode streamSeq =
@@ -1714,21 +1717,54 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
17141717
.rowsTo(RexWindowBounds.CURRENT_ROW)
17151718
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
17161719
context.relBuilder.projectPlus(streamSeq);
1717-
}
17181720

1719-
List<RexNode> overExpressions =
1720-
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
1721-
context.relBuilder.projectPlus(overExpressions);
1721+
// construct groupNotNull predicate
1722+
List<RexNode> groupByList =
1723+
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1724+
List<RexNode> notNullList =
1725+
PlanUtils.getSelectColumns(groupByList).stream()
1726+
.map(context.relBuilder::field)
1727+
.map(context.relBuilder::isNotNull)
1728+
.toList();
1729+
RexNode groupNotNull = context.relBuilder.and(notNullList);
17221730

1723-
// resort when there is by condition
1724-
if (hasGroup) {
1731+
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1732+
List<RexNode> wrappedOverExprs =
1733+
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1734+
context.relBuilder.projectPlus(wrappedOverExprs);
1735+
// resort when there is by condition
17251736
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
17261737
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
1738+
} else {
1739+
context.relBuilder.projectPlus(overExpressions);
17271740
}
17281741

17291742
return context.relBuilder.peek();
17301743
}
17311744

1745+
private List<RexNode> wrapWindowFunctionsWithGroupNotNull(
1746+
List<RexNode> overExpressions, RexNode groupNotNull, CalcitePlanContext context) {
1747+
List<RexNode> wrappedOverExprs = new ArrayList<>(overExpressions.size());
1748+
for (RexNode overExpr : overExpressions) {
1749+
RexNode rawExpr = overExpr;
1750+
String aliasName = null;
1751+
if (overExpr instanceof RexCall rc && rc.getOperator() == SqlStdOperatorTable.AS) {
1752+
rawExpr = rc.getOperands().get(0);
1753+
if (rc.getOperands().size() >= 2 && rc.getOperands().get(1) instanceof RexLiteral lit) {
1754+
aliasName = lit.getValueAs(String.class);
1755+
}
1756+
}
1757+
RexNode nullLiteral = context.rexBuilder.makeNullLiteral(rawExpr.getType());
1758+
RexNode caseExpr =
1759+
context.rexBuilder.makeCall(SqlStdOperatorTable.CASE, groupNotNull, rawExpr, nullLiteral);
1760+
if (aliasName != null) {
1761+
caseExpr = context.relBuilder.alias(caseExpr, aliasName);
1762+
}
1763+
wrappedOverExprs.add(caseExpr);
1764+
}
1765+
return wrappedOverExprs;
1766+
}
1767+
17321768
private RelNode buildStreamWindowJoinPlan(
17331769
CalcitePlanContext context,
17341770
RelNode leftWithHelpers,

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,7 @@ public void testExplainOnFirstLast() throws IOException {
745745
}
746746

747747
// Only for Calcite
748+
@Test
748749
public void testExplainOnEventstatsEarliestLatest() throws IOException {
749750
String expected = loadExpectedPlan("explain_eventstats_earliest_latest.json");
750751
assertJsonEqualsIgnoreId(
@@ -782,6 +783,7 @@ public void testExplainOnEventstatsEarliestLatestNoGroupBy() throws IOException
782783
TEST_INDEX_LOGS)));
783784
}
784785

786+
@Test
785787
public void testExplainOnStreamstatsEarliestLatest() throws IOException {
786788
String expected = loadExpectedPlan("explain_streamstats_earliest_latest.yaml");
787789
assertYamlEqualsIgnoreId(

0 commit comments

Comments
 (0)