Skip to content

Conversation

@xiangfu0
Copy link
Contributor

@xiangfu0 xiangfu0 commented Nov 20, 2025

Changes

Add maxRowsInDistinct/numRowsWithoutChangeInDistinct/maxExecutionTimeMsInDistinct plumbing throughout the distinct pipeline: query options, executors, dictionary plan, and broker/server metadata so early termination is reported consistently

Teach every distinct executor (raw & dictionary, single & multi column) to respect a per-query row budget by clamping block ranges, and surface the remaining allowance to DistinctOperator

Update DistinctOperator and DictionaryBasedDistinctOperator to program the row budget, compute accurate stats, and attach the standard early-termination reason on the results block

Add lightweight executor-level tests plus a custom integration suite that covers scalar/MV/multi-column distinct queries and both early-termination knobs for single- and multi-stage engines

Release Notes

Pinot now lets you short‑circuit expensive DISTINCT scans by telling the server when enough rows have been examined. Two broker query options control this behavior:

Option Type Effect
maxRowsInDistinct positive integer Stop reading once this many rows have been processed, even if more rows remain in the segment. Useful when you only need a best‑effort subset of keys.
numRowsWithoutChangeInDistinct positive integer Stop reading after this many additional rows fail to produce a new distinct key. Ideal for low‑cardinality columns—once the set stabilizes, you can quit early.
maxExecutionTimeMsInDistinct positive integer Stop reading after certain time for a block.

Both options apply per server. When either trigger fires, the server marks the response as a partial result (maxRowsInDistinctReached or numRowsWithoutChangeInDistinctReached), so the broker and client can decide whether to trust or retry the query.

Sample Queries

Limit the Total Rows Scanned

SET "maxRowsInDistinct" = 10_000;
SELECT DISTINCT city_id FROM trips WHERE status = 'COMPLETED';

Stops after 10k rows per server. The result set may contain fewer keys than the column’s full cardinality, but it returns quickly on large tables.

Stop Once the Distinct Set Stabilizes

SET "numRowsWithoutChangeInDistinct" = 5_000;
SELECT DISTINCT tenant_id FROM impressions WHERE date = '2024-09-01';

Once the server reads 5k additional rows without discovering a new tenant, it stops and marks the response as numRowsWithoutChangeInDistinctReached=true.

Combine Both Guards

SET "maxRowsInDistinct" = 20_000;
SET "numRowsWithoutChangeInDistinct" = 2_000;
SELECT DISTINCT campaign_id FROM clicks WHERE country = 'US';

This query will exit as soon as either budget is exhausted.

Response Metadata

When an early stop happens you’ll see the following broker response fields set to true:

maxRowsInDistinctReached
numRowsWithoutChangeInDistinctReached
partialResult

Downstream clients should check these flags before trusting the result.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements early-termination controls for DISTINCT queries by adding two new query options (maxRowsInDistinct and numRowsWithoutChangeInDistinct) and plumbing them through the distinct pipeline. The key changes enable operators to stop processing early when row budgets are exhausted or when no new distinct values are found, and properly surface these conditions in broker responses and statistics.

Key Changes

  • Added two new query options for controlling DISTINCT early termination
  • Implemented row budget tracking and enforcement in distinct executors
  • Added early termination reason enum and metadata propagation through the query pipeline

Reviewed Changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java Added query option keys for distinct early termination
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java Added EarlyTerminationReason enum and metadata support
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java Added default methods for row budget tracking in distinct executors
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java Implemented row budget enforcement with proper range clamping
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawMultiColumnDistinctExecutor.java Added row budget tracking for multi-column raw executor
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedMultiColumnDistinctExecutor.java Added row budget tracking for multi-column dictionary executor
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java Implemented early termination logic with row budget and no-change detection
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java Added row budget clamping for dictionary-based distinct queries
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java Added utility methods to parse distinct early termination options
pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java Added fields and methods for distinct early termination flags
pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java Added multi-stage support for distinct early termination flags
pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java Added EARLY_TERMINATION_REASON metadata key
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java Added aggregation logic for distinct early termination reasons
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java Added stat key handling for distinct early termination
pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java Added unit tests for distinct early termination
pinot-core/src/test/java/org/apache/pinot/core/query/distinct/DistinctExecutorEarlyTerminationTest.java Added executor-level tests for row budget enforcement
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java Added tests for early termination stat recording
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java Added integration tests covering both single and multi-stage engines

@codecov-commenter
Copy link

codecov-commenter commented Nov 20, 2025

Codecov Report

❌ Patch coverage is 62.86127% with 257 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.15%. Comparing base (0a951fa) to head (417ab75).
⚠️ Report is 3 commits behind head on master.

Files with missing lines Patch % Lines
...y/distinct/raw/RawMultiColumnDistinctExecutor.java 38.88% 79 Missing and 9 partials ⚠️
...ry/DictionaryBasedMultiColumnDistinctExecutor.java 50.87% 48 Missing and 8 partials ⚠️
...ery/distinct/BaseSingleColumnDistinctExecutor.java 58.33% 23 Missing and 7 partials ⚠️
...uery/distinct/DistinctEarlyTerminationContext.java 73.86% 12 Missing and 11 partials ⚠️
...common/response/broker/BrokerResponseNativeV2.java 26.66% 11 Missing ⚠️
...perator/query/DictionaryBasedDistinctOperator.java 80.00% 9 Missing ⚠️
...he/pinot/core/operator/query/DistinctOperator.java 81.63% 4 Missing and 5 partials ⚠️
...he/pinot/core/query/distinct/DistinctExecutor.java 0.00% 9 Missing ⚠️
...tor/combine/merger/DistinctResultsBlockMerger.java 87.69% 0 Missing and 8 partials ⚠️
...che/pinot/query/runtime/operator/LeafOperator.java 60.00% 6 Missing and 2 partials ⚠️
... and 2 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17247      +/-   ##
============================================
+ Coverage     63.11%   63.15%   +0.03%     
- Complexity     1478     1479       +1     
============================================
  Files          3173     3175       +2     
  Lines        189915   190535     +620     
  Branches      29064    29228     +164     
============================================
+ Hits         119872   120332     +460     
- Misses        60711    60837     +126     
- Partials       9332     9366      +34     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.12% <62.86%> (-36.88%) ⬇️
java-21 63.12% <62.86%> (+<0.01%) ⬆️
temurin 63.15% <62.86%> (+0.03%) ⬆️
unittests 63.15% <62.86%> (+0.03%) ⬆️
unittests1 55.55% <62.86%> (+0.06%) ⬆️
unittests2 33.92% <0.86%> (-0.09%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 2 times, most recently from 7b07625 to 5e2cbc9 Compare November 21, 2025 08:43
@xiangfu0 xiangfu0 requested a review from Copilot November 21, 2025 08:45
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch from 5e2cbc9 to 20eb844 Compare November 21, 2025 09:21
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concept is okay, but the support needs to be integrated into the combine operator. Currently it is a per segment limit, which is not very useful

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch from 20eb844 to dc1ab0d Compare November 30, 2025 07:48
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 26 out of 26 changed files in this pull request and generated 5 comments.

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 5 times, most recently from 55bf800 to a7214d2 Compare December 3, 2025 04:56
@xiangfu0 xiangfu0 requested a review from Copilot December 10, 2025 03:16
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 26 out of 26 changed files in this pull request and generated 2 comments.

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 5 times, most recently from d8910c4 to 7b038c8 Compare December 16, 2025 02:39
@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 3 times, most recently from a5f6d80 to d5cb462 Compare December 19, 2025 22:59
@xiangfu0 xiangfu0 changed the title Add distinct early‑termination controls with executor/unit/integration coverage Support DISTINCT early-termination budgets (row/no-change/time) and surface partial-result flags Dec 20, 2025
@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch from d5cb462 to cd3d015 Compare December 21, 2025 09:10
@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 3 times, most recently from dc73f87 to 82adb3c Compare January 4, 2026 03:21
@xiangfu0 xiangfu0 requested a review from Copilot January 4, 2026 04:58
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 32 out of 32 changed files in this pull request and generated 7 comments.

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch from 82adb3c to a641555 Compare January 4, 2026 15:00
@xiangfu0 xiangfu0 requested a review from Copilot January 4, 2026 15:03
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 32 out of 32 changed files in this pull request and generated no new comments.

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 2 times, most recently from 53cd208 to 50d9c44 Compare January 12, 2026 14:38
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you separate the query option related fix into a separate PR? We don't want to mix them

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No new values is not tracked above segment level. We should also early terminate the combine operator.

We should also track if there is no new value added during the results block merge. Applying it just within segment is not good enough. E.g. if there is no new value added when merging a new segment result, we should count the entire segment numDocsScanned as rows without adding new value

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch from 50d9c44 to 488adaa Compare January 13, 2026 18:58
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot do per-row check because it is too expensive. The check should be applied at block level at least.
We can probably do time and rows at segment level

@Jackie-Jiang
Copy link
Contributor

Please run a benchmark to quantify the performance overhead of enabling tracking

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 2 times, most recently from e87ce79 to 771e983 Compare January 15, 2026 19:55
@xiangfu0
Copy link
Contributor Author

Please run a benchmark to quantify the performance overhead of enabling tracking

did one round of the perf benchmark, no significant change:

Distinct MSQE Tracking Overhead

Run summary

  • Date: 2026-01-16 14:45:53
  • Benchmark: org.apache.pinot.perf.BenchmarkDistinctQueriesMSQE
  • Dataset rows per segment: 1500000 (2 segments)
  • Scenarios: EXP(0.001), EXP(0.5), EXP(0.999)
  • JMH: 1.37
  • JDK: OpenJDK 17.0.15
  • Warmup: 5 x 1s, Measurement: 10 x 1s, Forks: 3

Command

JAVA_HOME=/opt/homebrew/Cellar/openjdk@17/17.0.15/libexec/openjdk.jdk/Contents/Home \
PATH=/opt/homebrew/Cellar/openjdk@17/17.0.15/libexec/openjdk.jdk/Contents/Home/bin:$PATH \
JAVA_TOOL_OPTIONS="--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED" \
java -jar pinot-perf/target/benchmarks.jar BenchmarkDistinctQueriesMSQE \
  -p _scenario='EXP(0.001),EXP(0.5),EXP(0.999)' -p _trackingMode=enabled,disabled -wi 5 -i 10 -f 3 \
  -rf json -rff /tmp/jmh-distinct-msqe.json

Results (avg ms/op)

Scenario Query Enabled (ms ± err) Disabled (ms ± err) Delta (ms) Delta (%) Within combined err?
EXP(0.001) SELECT DISTINCT INT_COL FROM MyTable LIMIT 100000 4.401 ± 0.313 4.505 ± 0.372 -0.104 -2.3% yes
EXP(0.001) SELECT DISTINCT INT_COL FROM MyTable ORDER BY INT_COL DESC LIMIT 100000 5.897 ± 0.419 5.730 ± 0.331 0.167 2.9% yes
EXP(0.001) SELECT DISTINCT INT_COL, LOW_CARDINALITY_STRING_COL FROM MyTable LIMIT 100000 115.513 ± 6.820 109.707 ± 2.066 5.806 5.3% yes
EXP(0.001) SELECT DISTINCT LOW_CARDINALITY_STRING_COL FROM MyTable LIMIT 1000 2.513 ± 0.315 2.396 ± 0.356 0.117 4.9% yes
EXP(0.001) SELECT DISTINCT RAW_STRING_COL FROM MyTable LIMIT 100000 99.181 ± 1.644 97.187 ± 1.633 1.995 2.1% yes
EXP(0.001) SELECT DISTINCT RAW_STRING_COL FROM MyTable WHERE LOW_CARDINALITY_STRING_COL = 'value1' LIMIT 100000 32.749 ± 0.623 33.785 ± 1.160 -1.036 -3.1% yes
EXP(0.5) SELECT DISTINCT INT_COL FROM MyTable LIMIT 100000 2.546 ± 0.316 2.562 ± 0.365 -0.015 -0.6% yes
EXP(0.5) SELECT DISTINCT INT_COL FROM MyTable ORDER BY INT_COL DESC LIMIT 100000 2.527 ± 0.408 2.483 ± 0.363 0.045 1.8% yes
EXP(0.5) SELECT DISTINCT INT_COL, LOW_CARDINALITY_STRING_COL FROM MyTable LIMIT 100000 42.006 ± 0.785 44.234 ± 3.330 -2.229 -5.0% yes
EXP(0.5) SELECT DISTINCT LOW_CARDINALITY_STRING_COL FROM MyTable LIMIT 1000 2.458 ± 0.348 2.584 ± 0.261 -0.126 -4.9% yes
EXP(0.5) SELECT DISTINCT RAW_STRING_COL FROM MyTable LIMIT 100000 69.407 ± 2.116 70.811 ± 6.542 -1.404 -2.0% yes
EXP(0.5) SELECT DISTINCT RAW_STRING_COL FROM MyTable WHERE LOW_CARDINALITY_STRING_COL = 'value1' LIMIT 100000 17.463 ± 0.472 17.431 ± 0.384 0.033 0.2% yes
EXP(0.999) SELECT DISTINCT INT_COL FROM MyTable LIMIT 100000 2.407 ± 0.402 2.434 ± 0.312 -0.027 -1.1% yes
EXP(0.999) SELECT DISTINCT INT_COL FROM MyTable ORDER BY INT_COL DESC LIMIT 100000 2.446 ± 0.426 2.690 ± 0.425 -0.243 -9.0% yes
EXP(0.999) SELECT DISTINCT INT_COL, LOW_CARDINALITY_STRING_COL FROM MyTable LIMIT 100000 41.789 ± 0.519 43.100 ± 1.493 -1.311 -3.0% yes
EXP(0.999) SELECT DISTINCT LOW_CARDINALITY_STRING_COL FROM MyTable LIMIT 1000 2.534 ± 0.360 2.474 ± 0.353 0.060 2.4% yes
EXP(0.999) SELECT DISTINCT RAW_STRING_COL FROM MyTable LIMIT 100000 71.486 ± 2.025 70.870 ± 1.141 0.616 0.9% yes
EXP(0.999) SELECT DISTINCT RAW_STRING_COL FROM MyTable WHERE LOW_CARDINALITY_STRING_COL = 'value1' LIMIT 100000 18.834 ± 0.500 18.970 ± 0.660 -0.137 -0.7% yes

Overhead evaluation

  • Positive delta means tracking enabled is slower (overhead).
  • Negative delta means tracking enabled is faster.
  • For all scenario/query pairs, deltas remain within the combined 99.9% CI error bounds, so no significant difference is established.

Notes

  • JMH reported lingering Netty/async threads after completion; forks were force-terminated after the shutdown timeout.
  • The run logged warnings about direct reserved memory; consider running with more direct memory if noise persists.

@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch 4 times, most recently from 57c9c4c to f89ea74 Compare January 29, 2026 16:00
Support early termination in combine operator
@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch from f407b7e to c39be1c Compare January 30, 2026 07:30
@xiangfu0 xiangfu0 force-pushed the early-termination-distinct-operator branch from c39be1c to 417ab75 Compare January 30, 2026 23:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants