Skip to content

Commit d8910c4

Browse files
committed
Honor distinct early termination limits within blocks
1 parent 7a35b88 commit d8910c4

File tree

6 files changed

+377
-113
lines changed

6 files changed

+377
-113
lines changed

pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
7878
protected DistinctResultsBlock getNextBlock() {
7979
DistinctExecutor executor = DistinctExecutorFactory.getDistinctExecutor(_projectOperator, _queryContext);
8080
executor.setMaxRowsToProcess(_maxRowsInDistinct);
81+
executor.setNumRowsWithoutChangeInDistinct(_numRowsWithoutChangeInDistinct);
8182
ValueBlock valueBlock;
8283
boolean enforceRowLimit = _maxRowsInDistinct != UNLIMITED_ROWS;
8384
boolean enforceNoChangeLimit = _numRowsWithoutChangeInDistinct != UNLIMITED_ROWS;
@@ -86,24 +87,30 @@ protected DistinctResultsBlock getNextBlock() {
8687
_hitMaxRowsLimit = true;
8788
break;
8889
}
89-
int rowsRemainingBefore = executor.getRemainingRowsToProcess();
90+
if (enforceNoChangeLimit && executor.isNumRowsWithoutChangeLimitReached()) {
91+
_hitNoChangeLimit = true;
92+
break;
93+
}
94+
int rowsProcessedBefore = executor.getNumRowsProcessed();
9095
int distinctCountBeforeBlock = enforceNoChangeLimit ? executor.getNumDistinctRowsCollected() : -1;
9196
boolean satisfied = executor.process(valueBlock);
92-
int rowsRemainingAfter = executor.getRemainingRowsToProcess();
93-
int docsProcessedForLimit =
94-
enforceRowLimit ? Math.max(0, rowsRemainingBefore - rowsRemainingAfter) : valueBlock.getNumDocs();
95-
_numDocsScanned += docsProcessedForLimit;
97+
int rowsProcessedForBlock = executor.getNumRowsProcessed() - rowsProcessedBefore;
98+
_numDocsScanned += rowsProcessedForBlock;
9699
if (enforceRowLimit && _numDocsScanned >= _maxRowsInDistinct) {
97100
_hitMaxRowsLimit = true;
98101
}
99102
if (enforceNoChangeLimit) {
100-
int distinctCountAfterBlock = executor.getNumDistinctRowsCollected();
101-
if (distinctCountAfterBlock > distinctCountBeforeBlock) {
102-
_numRowsWithoutNewDistinct = 0;
103+
if (executor.isNumRowsWithoutChangeLimitReached()) {
104+
_hitNoChangeLimit = true;
103105
} else {
104-
_numRowsWithoutNewDistinct += docsProcessedForLimit;
105-
if (_numRowsWithoutNewDistinct >= _numRowsWithoutChangeInDistinct) {
106-
_hitNoChangeLimit = true;
106+
int distinctCountAfterBlock = executor.getNumDistinctRowsCollected();
107+
if (distinctCountAfterBlock > distinctCountBeforeBlock) {
108+
_numRowsWithoutNewDistinct = 0;
109+
} else {
110+
_numRowsWithoutNewDistinct += rowsProcessedForBlock;
111+
if (_numRowsWithoutNewDistinct >= _numRowsWithoutChangeInDistinct) {
112+
_hitNoChangeLimit = true;
113+
}
107114
}
108115
}
109116
}

pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java

Lines changed: 85 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.pinot.core.common.BlockValSet;
2323
import org.apache.pinot.core.operator.blocks.ValueBlock;
2424
import org.apache.pinot.core.query.distinct.table.DistinctTable;
25-
import org.roaringbitmap.PeekableIntIterator;
2625
import org.roaringbitmap.RoaringBitmap;
2726

2827

@@ -35,6 +34,10 @@ public abstract class BaseSingleColumnDistinctExecutor<T extends DistinctTable,
3534
protected final ExpressionContext _expression;
3635
protected final T _distinctTable;
3736
private int _rowsRemaining = UNLIMITED_ROWS;
37+
private int _numRowsProcessed = 0;
38+
private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
39+
private int _numRowsWithoutChange = 0;
40+
private boolean _numRowsWithoutChangeLimitReached = false;
3841

3942
public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T distinctTable) {
4043
_expression = expression;
@@ -47,86 +50,78 @@ public void setMaxRowsToProcess(int maxRows) {
4750
}
4851

4952
@Override
50-
public boolean process(ValueBlock valueBlock) {
51-
if (_rowsRemaining <= 0) {
52-
return true;
53-
}
54-
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
55-
int numDocs = valueBlock.getNumDocs();
56-
if (_distinctTable.isNullHandlingEnabled() && blockValueSet.isSingleValue()) {
57-
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
58-
if (nullBitmap != null && !nullBitmap.isEmpty()) {
59-
return processWithNull(blockValueSet, numDocs, nullBitmap);
60-
} else {
61-
return processWithoutNull(blockValueSet, numDocs);
62-
}
63-
} else {
64-
return processWithoutNull(blockValueSet, numDocs);
65-
}
53+
public void setNumRowsWithoutChangeInDistinct(int numRowsWithoutChangeInDistinct) {
54+
_numRowsWithoutChangeLimit = numRowsWithoutChangeInDistinct;
6655
}
6756

68-
private boolean processWithNull(BlockValSet blockValueSet, int numDocs, RoaringBitmap nullBitmap) {
69-
int limitedNumDocs = clampToRemaining(0, numDocs);
70-
if (limitedNumDocs <= 0) {
71-
return true;
72-
}
73-
_distinctTable.addNull();
74-
S values = getValuesSV(blockValueSet);
75-
PeekableIntIterator nullIterator = nullBitmap.getIntIterator();
76-
int prev = 0;
77-
while (nullIterator.hasNext() && prev < limitedNumDocs) {
78-
int nextNull = nullIterator.next();
79-
if (nextNull > prev) {
80-
int rangeEnd = Math.min(nextNull, limitedNumDocs);
81-
if (processSVRange(values, prev, rangeEnd)) {
82-
return true;
83-
}
84-
}
85-
prev = nextNull + 1;
86-
}
87-
if (prev < limitedNumDocs) {
88-
return processSVRange(values, prev, limitedNumDocs);
89-
}
90-
return false;
57+
@Override
58+
public boolean isNumRowsWithoutChangeLimitReached() {
59+
return _numRowsWithoutChangeLimitReached;
9160
}
9261

93-
/**
94-
* Processes a range of single-value values, respecting the row budget.
95-
* @param values the single-value values
96-
* @param from the start index (inclusive)
97-
* @param to the end index (exclusive)
98-
* @return true if processing should stop early, false otherwise
99-
*/
100-
private boolean processSVRange(S values, int from, int to) {
101-
int limitedTo = clampToRemaining(from, to);
102-
if (limitedTo <= from) {
62+
@Override
63+
public int getNumRowsProcessed() {
64+
return _numRowsProcessed;
65+
}
66+
67+
@Override
68+
public boolean process(ValueBlock valueBlock) {
69+
if (shouldStopProcessing()) {
10370
return true;
10471
}
105-
if (processSV(values, from, limitedTo)) {
72+
BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
73+
int numDocs = clampToRemaining(valueBlock.getNumDocs());
74+
if (numDocs <= 0) {
10675
return true;
10776
}
108-
consumeRows(limitedTo - from);
109-
return _rowsRemaining <= 0;
110-
}
111-
112-
private boolean processWithoutNull(BlockValSet blockValueSet, int numDocs) {
113-
if (blockValueSet.isSingleValue()) {
114-
int limitedTo = clampToRemaining(0, numDocs);
115-
if (limitedTo <= 0) {
116-
return true;
77+
boolean limitReached = false;
78+
if (_distinctTable.isNullHandlingEnabled() && blockValueSet.isSingleValue()) {
79+
RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
80+
S values = getValuesSV(blockValueSet);
81+
for (int docId = 0; docId < numDocs; docId++) {
82+
if (shouldStopProcessing()) {
83+
break;
84+
}
85+
boolean isNull = nullBitmap != null && nullBitmap.contains(docId);
86+
int sizeBefore = _distinctTable.size();
87+
if (isNull) {
88+
_distinctTable.addNull();
89+
} else {
90+
limitReached = processSV(values, docId, docId + 1);
91+
}
92+
recordRowProcessed(_distinctTable.size() > sizeBefore);
93+
if (limitReached) {
94+
break;
95+
}
96+
}
97+
} else if (blockValueSet.isSingleValue()) {
98+
S values = getValuesSV(blockValueSet);
99+
for (int docId = 0; docId < numDocs; docId++) {
100+
if (shouldStopProcessing()) {
101+
break;
102+
}
103+
int sizeBefore = _distinctTable.size();
104+
limitReached = processSV(values, docId, docId + 1);
105+
recordRowProcessed(_distinctTable.size() > sizeBefore);
106+
if (limitReached) {
107+
break;
108+
}
117109
}
118-
boolean satisfied = processSV(getValuesSV(blockValueSet), 0, limitedTo);
119-
consumeRows(limitedTo);
120-
return satisfied || _rowsRemaining <= 0;
121110
} else {
122-
int limitedTo = clampToRemaining(0, numDocs);
123-
if (limitedTo <= 0) {
124-
return true;
111+
M values = getValuesMV(blockValueSet);
112+
for (int docId = 0; docId < numDocs; docId++) {
113+
if (shouldStopProcessing()) {
114+
break;
115+
}
116+
int sizeBefore = _distinctTable.size();
117+
limitReached = processMV(values, docId, docId + 1);
118+
recordRowProcessed(_distinctTable.size() > sizeBefore);
119+
if (limitReached) {
120+
break;
121+
}
125122
}
126-
boolean satisfied = processMV(getValuesMV(blockValueSet), 0, limitedTo);
127-
consumeRows(limitedTo);
128-
return satisfied || _rowsRemaining <= 0;
129123
}
124+
return limitReached || shouldStopProcessing();
130125
}
131126

132127
/**
@@ -164,19 +159,34 @@ public int getRemainingRowsToProcess() {
164159
return _rowsRemaining;
165160
}
166161

167-
private int clampToRemaining(int from, int to) {
162+
private int clampToRemaining(int numDocs) {
168163
if (_rowsRemaining == UNLIMITED_ROWS) {
169-
return to;
164+
return numDocs;
170165
}
171166
if (_rowsRemaining <= 0) {
172-
return from;
167+
return 0;
173168
}
174-
return Math.min(to, from + _rowsRemaining);
169+
return Math.min(numDocs, _rowsRemaining);
175170
}
176171

177-
private void consumeRows(int count) {
172+
private void recordRowProcessed(boolean distinctChanged) {
173+
_numRowsProcessed++;
178174
if (_rowsRemaining != UNLIMITED_ROWS) {
179-
_rowsRemaining -= count;
175+
_rowsRemaining--;
180176
}
177+
if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
178+
if (distinctChanged) {
179+
_numRowsWithoutChange = 0;
180+
} else {
181+
_numRowsWithoutChange++;
182+
if (_numRowsWithoutChange >= _numRowsWithoutChangeLimit) {
183+
_numRowsWithoutChangeLimitReached = true;
184+
}
185+
}
186+
}
187+
}
188+
189+
private boolean shouldStopProcessing() {
190+
return _rowsRemaining <= 0 || _numRowsWithoutChangeLimitReached;
181191
}
182192
}

pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,27 @@ default int getRemainingRowsToProcess() {
4444
return Integer.MAX_VALUE;
4545
}
4646

47+
/**
48+
* Sets the maximum number of rows to scan without adding any new distinct value before early-terminating.
49+
*/
50+
default void setNumRowsWithoutChangeInDistinct(int numRowsWithoutChangeInDistinct) {
51+
}
52+
53+
/**
54+
* Returns {@code true} if the executor has early-terminated because no new distinct values were found after scanning
55+
* the configured number of rows.
56+
*/
57+
default boolean isNumRowsWithoutChangeLimitReached() {
58+
return false;
59+
}
60+
61+
/**
62+
* Returns the total number of rows processed so far.
63+
*/
64+
default int getNumRowsProcessed() {
65+
return 0;
66+
}
67+
4768
/**
4869
* Processes the given value block, returns {@code true} if the query is already satisfied, {@code false}
4970
* otherwise. No more calls should be made after it returns {@code true}.

0 commit comments

Comments
 (0)