General UDAF pushdown as scripts#5064
songkant-aws wants to merge 16 commits into opensearch-project:main
Summary by CodeRabbit
Walkthrough
Adds scripted-metric UDAF pushdown for pattern aggregation: introduces PatternAggregationHelpers, a scripted-metric UDAF framework (UDAF interface, registry, per-phase factories, ScriptedMetricDataContext), Calcite/UDF integration, OpenSearch scripted-metric wiring, a runtime setting to toggle pushdown, tests, and docs/plan updates.
Sequence Diagram(s)
sequenceDiagram
participant Client as Client
participant Coord as Coordinator
participant DN1 as DataNode1
participant DN2 as DataNode2
participant DN3 as DataNode3
participant Reducer as Reducer
Client->>Coord: Submit pattern aggregation query (udaf pushdown enabled)
Coord->>DN1: InitScript (state init)
DN1->>DN1: MapScript (per-doc add -> buffer/partial merge)
DN1->>DN1: CombineScript (emit shard state)
Coord->>DN2: InitScript
DN2->>DN2: MapScript
DN2->>DN2: CombineScript
Coord->>DN3: InitScript
DN3->>DN3: MapScript
DN3->>DN3: CombineScript
DN1-->>Reducer: Shard state 1 (map)
DN2-->>Reducer: Shard state 2
DN3-->>Reducer: Shard state 3
Reducer->>Reducer: ReduceScript (merge shard states -> PatternAggregationHelpers.produce)
Reducer-->>Client: Final pattern results (List<Map<String,Object>>)
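The four phases in the diagram can be sketched in plain Java without any OpenSearch classes. This is only a toy model: the class and method names are hypothetical, the shard state is assumed to be a simple pattern-to-count map, and the digit-masking "pattern" is a stand-in for whatever PatternAggregationHelpers actually computes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the init/map/combine/reduce lifecycle; all names are hypothetical. */
final class ScriptedMetricLifecycleSketch {

  /** init: each shard starts with an empty state. */
  static Map<String, Integer> init() {
    return new HashMap<>();
  }

  /** map: called once per document; folds the document into the shard state. */
  static void map(Map<String, Integer> state, String message) {
    // Stand-in for real pattern extraction: mask every run of digits.
    String pattern = message.replaceAll("\\d+", "<*>");
    state.merge(pattern, 1, Integer::sum);
  }

  /** combine: called once per shard; emits the state sent to the reducer. */
  static Map<String, Integer> combine(Map<String, Integer> state) {
    return state;
  }

  /** reduce: called once; merges all shard states into the final result. */
  static Map<String, Integer> reduce(List<Map<String, Integer>> shardStates) {
    Map<String, Integer> merged = new HashMap<>();
    for (Map<String, Integer> shard : shardStates) {
      shard.forEach((pattern, count) -> merged.merge(pattern, count, Integer::sum));
    }
    return merged;
  }

  /** Runs the full lifecycle over documents that are already split by shard. */
  static Map<String, Integer> run(List<List<String>> docsByShard) {
    List<Map<String, Integer>> shardStates = new ArrayList<>();
    for (List<String> docs : docsByShard) {
      Map<String, Integer> state = init();
      docs.forEach(doc -> map(state, doc));
      shardStates.add(combine(state));
    }
    return reduce(shardStates);
  }
}
```

The point of the split is that init, map, and combine touch only shard-local state, so they can run on every data node in parallel; only reduce sees all shard states.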
Estimated code review effort: 4 (Complex), ~65 minutes
Pre-merge checks: 4 passed, 1 failed (1 warning)
Signed-off-by: Songkan Tang <songkant@amazon.com>
d03bc0b to ac5994e
    public static final Setting<?> CALCITE_UDAF_PUSHDOWN_ENABLED_SETTING =
        Setting.boolSetting(
            Key.CALCITE_UDAF_PUSHDOWN_ENABLED.getKeyValue(),
            false,
            Setting.Property.NodeScope,
            Setting.Property.Dynamic);
Is this a feature flag (cluster setting) that gates UDAF pushdown on and off at runtime? Will the flag be deprecated without notice once pushdown is stable?

Yes, it's a feature flag that gates the pushdown feature at runtime.
But I'm still cautious about deprecating it. There is a comment left in OpenSearch's ScriptedMetricAggregator: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/search/aggregations/metrics/ScriptedMetricAggregator.java#L65-L73. It bypasses the circuit breaker, so it would be prudent to keep a kill switch.
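To illustrate the kill-switch idea in isolation, here is a minimal sketch that models a dynamic boolean flag with an AtomicBoolean. It does not use the OpenSearch Setting API; the class, the method names, and the fallback to a coordinator-only path are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of a runtime kill switch for UDAF pushdown. */
final class PushdownKillSwitchSketch {
  // Defaults to false, matching the default in the quoted setting.
  private final AtomicBoolean udafPushdownEnabled = new AtomicBoolean(false);

  /** Models a dynamic settings update arriving at runtime. */
  void onSettingUpdate(boolean enabled) {
    udafPushdownEnabled.set(enabled);
  }

  /** Chooses the execution path for one query based on the current flag value. */
  String plan(String functionName) {
    return udafPushdownEnabled.get()
        ? "pushdown:" + functionName
        : "coordinator-only:" + functionName;
  }
}
```

Because the flag is read per query, flipping it back to false takes effect for the next query without a restart, which is the property a kill switch needs.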
            "UDAF pushdown is disabled. Enable it via cluster setting"
                + " 'plugins.calcite.udaf_pushdown.enabled'");
      }
      yield ScriptedMetricUDAFRegistry.INSTANCE
Since this PR needs to be backported to 2.x, which uses JDK 11, can we avoid the yield keyword?

I heard we will no longer backport. Could we keep it here?
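For context on the JDK concern: yield belongs to switch expressions, which became a standard feature in Java 14, so code using it does not compile on JDK 11. The sketch below contrasts the two forms; the class, method names, and return values are hypothetical and not taken from the PR.

```java
/** Contrasts a switch expression using yield with a JDK 11-compatible form. */
final class YieldBackportSketch {

  /** Requires Java 14+: a switch expression whose block arm returns via yield. */
  static String withYield(String functionName, boolean pushdownEnabled) {
    return switch (functionName) {
      case "patterns" -> {
        if (!pushdownEnabled) {
          throw new IllegalStateException("UDAF pushdown is disabled");
        }
        yield "scripted_metric";
      }
      default -> "unsupported";
    };
  }

  /** Compiles on JDK 11: a classic switch statement with explicit returns. */
  static String withoutYield(String functionName, boolean pushdownEnabled) {
    switch (functionName) {
      case "patterns":
        if (!pushdownEnabled) {
          throw new IllegalStateException("UDAF pushdown is disabled");
        }
        return "scripted_metric";
      default:
        return "unsupported";
    }
  }
}
```

Both methods behave the same, so a backport would only need the mechanical rewrite shown in the second method.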
          .orElseThrow(
              () ->
                  new AggregateAnalyzerException(
                      String.format(
                          "No scripted metric UDAF registered for %s", functionName)));
When will this happen? Is it a bug in our system?

For now, it cannot happen, because this PR hardcodes the registry with only one supported UDAF (patterns). It's a defensive check that provides a more meaningful error message in case it ever does happen.
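The defensive lookup described here can be sketched with a plain map and Optional. The registry contents, the class name, and the use of IllegalStateException in place of AggregateAnalyzerException are assumptions to keep the example self-contained.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical single-entry registry with a defensive lookup. */
final class UdafRegistrySketch {
  // Only one entry, mirroring the single supported UDAF mentioned in the thread.
  private static final Map<String, String> REGISTRY = Map.of("patterns", "PatternUdaf");

  /** Returns the registered UDAF name, or empty if nothing is registered. */
  static Optional<String> lookup(String functionName) {
    return Optional.ofNullable(REGISTRY.get(functionName));
  }

  /** Fails with a descriptive message instead of a bare NoSuchElementException. */
  static String resolve(String functionName) {
    return lookup(functionName)
        .orElseThrow(
            () ->
                new IllegalStateException(
                    String.format("No scripted metric UDAF registered for %s", functionName)));
  }
}
```

The check costs nothing on the happy path and names the offending function if a new UDAF is routed here before it is registered.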
    ScriptParameterHelper initParamHelper =
        new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
    ScriptParameterHelper mapParamHelper =
        new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
    ScriptParameterHelper combineParamHelper =
        new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
    ScriptParameterHelper reduceParamHelper =
        new ScriptParameterHelper(fieldList, fieldTypes, rexBuilder);
These 4 parameter helpers seem to be exactly the same. Do we really need all of them?

Oh, I get it. Each param helper actually holds state. We would have to clear that state before reusing a helper; otherwise it would get mixed up.
It looks fine then.

Actually, different phases of the UDAF script use different RexNodes to serialize the script. So, yes, they will have different state (sources, digests) in ScriptParameterHelper.
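A small sketch of why a stateful helper should not be shared across phases. ParamHelper here is a hypothetical stand-in that only records operands and hands out positional indexes; the real ScriptParameterHelper tracks more than this.

```java
import java.util.ArrayList;
import java.util.List;

/** Shows how sharing a stateful helper leaks one phase's parameters into another. */
final class ParamHelperSketch {

  /** Hypothetical stand-in: records every operand seen while serializing one script. */
  static final class ParamHelper {
    final List<String> digests = new ArrayList<>();

    /** Registers an operand and returns its positional parameter index. */
    int register(String digest) {
      digests.add(digest);
      return digests.size() - 1;
    }
  }

  /** Serializes one phase, returning the parameter references it would emit. */
  static List<String> serializePhase(ParamHelper helper, List<String> operands) {
    List<String> refs = new ArrayList<>();
    for (String operand : operands) {
      refs.add("params[" + helper.register(operand) + "]");
    }
    return refs;
  }
}
```

With one helper per phase, the reduce script's first operand is params[0]. With a shared helper that already serialized two map operands, the same operand becomes params[2], and the reduce script would also carry the map phase's entries.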
        new ScriptContext(rexBuilder, combineParamHelper, cluster, rowType, fieldTypes);
    ScriptContext reduceContext =
        new ScriptContext(rexBuilder, reduceParamHelper, cluster, rowType, fieldTypes);
Ditto. Since the param helpers are the same, the ScriptContext should be the same as well.
    mapArgs.add(getArgOrDefault(args, 1, makeIntLiteral(rexBuilder, DEFAULT_MAX_SAMPLE_COUNT)));
    mapArgs.add(getArgOrDefault(args, 2, makeIntLiteral(rexBuilder, DEFAULT_BUFFER_LIMIT)));
    mapArgs.add(
        getArgOrDefault(args, 5, makeIntLiteral(rexBuilder, DEFAULT_VARIABLE_COUNT_THRESHOLD)));
    mapArgs.add(
        getArgOrDefault(args, 4, makeDoubleLiteral(rexBuilder, DEFAULT_THRESHOLD_PERCENTAGE)));
Should these be changed to dynamic params? Otherwise, the script won't be reusable if these values differ.

In this part, we only define the RexNode. The assignment of dynamic params for literals is performed at serialization by RexStandardizer.standardizeRexNodeExpression.
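The reusability concern can be illustrated with a string-based sketch: literals are lifted out of the script source into a parameter list, so two calls that differ only in literal values yield the same source. This is not how RexStandardizer is implemented (it works on RexNode trees, not strings); the class, the regex, and the ?N placeholder syntax are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical literal standardizer operating on a textual expression. */
final class LiteralStandardizerSketch {

  /** Script source with literals replaced, plus the extracted values in order. */
  static final class Standardized {
    final String source;
    final List<String> params;

    Standardized(String source, List<String> params) {
      this.source = source;
      this.params = params;
    }
  }

  // Matches standalone integer or decimal literals, not digits inside identifiers.
  private static final Pattern NUMBER = Pattern.compile("\\b\\d+(\\.\\d+)?\\b");

  /** Replaces each numeric literal with a positional placeholder such as ?0. */
  static Standardized standardize(String expression) {
    List<String> params = new ArrayList<>();
    StringBuffer source = new StringBuffer();
    Matcher matcher = NUMBER.matcher(expression);
    while (matcher.find()) {
      matcher.appendReplacement(source, "?" + params.size());
      params.add(matcher.group());
    }
    matcher.appendTail(source);
    return new Standardized(source.toString(), params);
  }
}
```

Under this scheme, add(state, message, 10, 0.3) and add(state, message, 5, 0.5) share one script source and differ only in their parameter lists, which is what makes a compiled script cacheable across queries.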
    reduceArgs.add(getArgOrDefault(args, 1, makeIntLiteral(rexBuilder, DEFAULT_MAX_SAMPLE_COUNT)));

    // Determine variableCountThreshold and thresholdPercentage
    RexNode variableCountThreshold = makeIntLiteral(rexBuilder, DEFAULT_VARIABLE_COUNT_THRESHOLD);
    RexNode thresholdPercentage = makeDoubleLiteral(rexBuilder, DEFAULT_THRESHOLD_PERCENTAGE);
Same as the response above.
This PR is stalled because it has been open for 2 weeks with no activity.
Updated the PR with a simple performance comparison between queries with pushdown disabled and pushdown enabled. The test query was executed on a three-shard index containing approximately 2.7 million rows of log messages, with profiling enabled:
Baseline (pushdown disabled) | UDAF pushdown enabled
Results show that the query with pushdown enabled is 2.7 times faster than without pushdown when running across three shards. The performance improvement scales with the number of shards available for parallel processing.
…reduce Signed-off-by: Songkan Tang <songkant@amazon.com>
Hi @songkant-aws, any updates on this PR?
Description
Push down any UDAF as scripts so that sub-aggregation results are evaluated in parallel per shard and then reduced into a final aggregation result. We expect this to speed up complex commands such as patterns, as well as future UDAFs. Pending benchmark test.
Related Issues
Resolves #4354
Check List
--signoff or -s.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.