Skip to content

Support Streamstats command with calcite#4297

Merged
qianheng-aws merged 34 commits intoopensearch-project:mainfrom
ishaoxy:streamstats-command
Nov 4, 2025
Merged

Support Streamstats command with calcite#4297
qianheng-aws merged 34 commits intoopensearch-project:mainfrom
ishaoxy:streamstats-command

Conversation

@ishaoxy
Copy link
Copy Markdown
Contributor

@ishaoxy ishaoxy commented Sep 15, 2025

Description

Support Streamstats command with arguments below:

streamstats
  [current=<bool>]
  [window=<int>]
  [global=<bool>]
  [reset_before="("<eval-expression>")"]
  [reset_after="("<eval-expression>")"]
  stats-agg-term
  [by-clause]

Also rule out aggregator in PPLAggregateMergeRule.

Implementation Details

The implementation handles three distinct execution paths, depending on the combination of window, global, group, and reset arguments:

Why This Design

  1. Default path can rely on native SQL OVER because there is no global/window-with-reset complexity.

  2. Specific SQL limitations:

  • Native SQL OVER clauses cannot implement per-group sliding windows over the entire stream . However, we want to combine a global sequence with group-level partitioning. In SQL, a window is either global without a BY clause or partitioned by a group with a BY clause; you cannot have a “global sequence plus per-group sliding frame” in one OVER.

  • ROWS BETWEEN ... PRECEDING cannot take a variable (it only supports constants like 1 PRECEDING, 1+1 PRECEDING).

  1. Global + window + group want "per-group sliding windows over entire stream," but SQL window functions do not allow fully flexible frame boundaries combined with lateral joins. Hence, we simulate it via ROW_NUMBER() + correlated join + aggregate.

  2. Reset path introduces segment semantics (seg_id) that cannot be represented natively in SQL OVER clauses. Each reset creates a new frame partition. By default, reset behaves like a global window, but when grouping exists, it applies per-group aggregation within each reset segment. So I use helper columns (before_flag, after_flag, seg_id) and a correlated join ensures correctness.

1. Default Path (No global in use / no reset)

  • Window functions are translated directly using visitWindowFunction().
  • Calcite OVER clauses are generated for each aggregate.
  • SQL-like plan:
SELECT *, <window_function_over(...)>
FROM source

2. global=true + window > 0 + group exists

To support sliding windows over the entire stream with optional grouping:

  • A global sequence column (ROW_NUMBER() OVER (ORDER BY ...) AS seq) is added.
  • Correlated LEFT JOINs simulate the sliding window using seq and by-clause equality filters.
  • Each window function is converted into a standard aggregate (AggregateFunction) and executed within the correlated subquery.
  • SQL-like plan:
WITH t AS (
  SELECT x.*,
         ROW_NUMBER() OVER (ORDER BY /* default ordering */) AS seq
  FROM source x
)
SELECT t.*, agg.*
FROM t
LEFT JOIN LATERAL (
  SELECT SUM(age) AS sum_age
  FROM t r
  WHERE r.seq BETWEEN t.seq - (:window - 1) AND t.seq
    AND r.gender IS NOT DISTINCT FROM t.gender
) AS agg ON TRUE;

3. Reset Path (reset_before / reset_after defined)

When reset_before or reset_after exist:

  • Helper columns are added:
  1. stream_seq: global row number.
  2. reset_before_flag / reset_after_flag: flags for reset conditions.
  3. seg_id: segment ID, computed via SUM over flags to identify partitions.
  • Correlated LEFT JOIN + aggregate simulates the frame while respecting segment boundaries (seg_id) and optional group filtering.
  • SQL-like plan:
WITH base AS (
  SELECT s.*,
         ROW_NUMBER() OVER (ORDER BY /* default */) AS seq,
         CASE WHEN (/reset_before predicate/) THEN 1 ELSE 0 END AS before_flag,
         CASE WHEN (/reset_after  predicate/) THEN 1 ELSE 0 END AS after_flag
  FROM source s
),
seg AS (
  SELECT b.*,
         COALESCE(SUM(before_flag) OVER (ORDER BY seq ROWS UNBOUNDED PRECEDING), 0)
       + COALESCE(SUM(after_flag)  OVER (ORDER BY seq ROWS UNBOUNDED PRECEDING
                                        RANGE BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0)
       AS seg_id
  FROM base b
)
SELECT t.*, agg.*
FROM seg t
LEFT JOIN LATERAL (
  SELECT /* window aggregates: SUM(age), AVG(salary), ... */
  FROM seg r
  WHERE (
    CASE
      WHEN :window = 0 AND :current  THEN r.seq <= t.seq
      WHEN :window = 0 AND NOT :current THEN r.seq <  t.seq
      WHEN :current  THEN r.seq BETWEEN t.seq - (:window - 1) AND t.seq
      ELSE               r.seq BETWEEN t.seq - :window AND t.seq - 1
    END
  )
  AND r.seg_id = t.seg_id
  AND (r.gender IS NOT DISTINCT FROM t.gender) -- optional by-clause
) AS agg ON TRUE;

Related Issues

Resolves #4207

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Copy link
Copy Markdown
Collaborator

@yuancu yuancu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Thanks for your contribution. I left some comments for minor suggestions.

Also, please resolve the conflicts

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
@LantaoJin LantaoJin added feature PPL Piped processing language labels Sep 18, 2025

// aggregate all window functions on right side
List<AggCall> aggCalls = buildAggCallsForWindowFunctions(node.getWindowFunctionList(), context);
context.relBuilder.aggregate(context.relBuilder.groupKey(), aggCalls);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note (for reviewers): the group key is empty because the partition is done with buildGroupFilter(context, groupList, v.get())

yuancu
yuancu previously approved these changes Oct 28, 2025
Copy link
Copy Markdown
Collaborator

@yuancu yuancu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ishaoxy
Copy link
Copy Markdown
Contributor Author

ishaoxy commented Oct 28, 2025

SELECT
t.,
SUM(age) OVER (
PARTITION BY gender
ORDER BY seq
ROWS BETWEEN (:window - 1) PRECEDING AND CURRENT ROW
) as sum_age
FROM (
SELECT x.
,
ROW_NUMBER() OVER (ORDER BY /* default ordering */) AS seq
FROM source x
) t;

I think the second case is the same as the default solution window=n, global=false+by, but it cannot achieve the case of window=n, global=true+by, which is why I used the former expression.

And for Reset path, I have done a lot of experiments on SPL, and I think our current implementation is consistent with the behavior of it. @qianheng-aws

Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Copy link
Copy Markdown
Member

@LantaoJin LantaoJin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically looks good, please fix the IT.

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
@ishaoxy
Copy link
Copy Markdown
Contributor Author

ishaoxy commented Oct 31, 2025

The "global" path behavior is influenced by pr #4703 and resulted in an incorrect performance. Thanks to @yuancu 's help, now aggregator is excluded in PPLAggregateMergeRule in this pr.

Comment on lines +1632 to +1633
context,
left,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context.relBuilder.projectExcept was out of the hasGroup branch, now you move it into the branch. Is it on purpose?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this change is just a small optimization. It only creates the row_num column and sorts and deletes it when hasGroup is true in the default path. Previously, row_num would be created in all cases but only sorted when hasGroup is true. I think this meets the expectations and does not introduce any logical changes.

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
@qianheng-aws qianheng-aws merged commit 5077062 into opensearch-project:main Nov 4, 2025
35 checks passed
@opensearch-trigger-bot
Copy link
Copy Markdown
Contributor

The backport to 2.19-dev failed:

The process '/usr/bin/git' failed with exit code 128

To backport manually, run these commands in your terminal:

# Navigate to the root of your repository
cd $(git rev-parse --show-toplevel)
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add ../.worktrees/sql/backport-2.19-dev 2.19-dev
# Navigate to the new working tree
pushd ../.worktrees/sql/backport-2.19-dev
# Create a new branch
git switch --create backport/backport-4297-to-2.19-dev
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 5077062932cfefe785f72716d2f2f7aa65177817
# Push it to GitHub
git push --set-upstream origin backport/backport-4297-to-2.19-dev
# Go back to the original working tree
popd
# Delete the working tree
git worktree remove ../.worktrees/sql/backport-2.19-dev

Then, create a pull request where the base branch is 2.19-dev and the compare/head branch is backport/backport-4297-to-2.19-dev.

expani pushed a commit to vinaykpud/sql that referenced this pull request Nov 4, 2025
* support streamstats simply

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* add some tests

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* add UT

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix some error

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* add global

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* implement global

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* implement reset

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* implement all the arguments

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix test

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* add all IT, UT and rst doc

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix anonymizer test

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix doctest

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* modify doc and IT

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* add explainIT

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix import

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix typo

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix doctest

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix explainIT yaml format

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix dc nopushdown explainIT

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* add explainIT for path2 and path3

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* typo error

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* handle resort case

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix IT

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* change row_num

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* Rule out aggregator from PPLAggregateMergeRule

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* Rule out aggregator from PPLAggregateMergeRule

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

* fix explainIT

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>

---------

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
Co-authored-by: Yuanchun Shen <yuanchu@amazon.com>
@LantaoJin LantaoJin added the backport-manually Filed a PR to backport manually. label Nov 5, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport 2.19-dev backport-failed backport-manually Filed a PR to backport manually. feature PPL Piped processing language

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[RFC] Implement streamstats command in PPL

5 participants