2222import org .apache .pinot .core .common .BlockValSet ;
2323import org .apache .pinot .core .operator .blocks .ValueBlock ;
2424import org .apache .pinot .core .query .distinct .table .DistinctTable ;
25- import org .roaringbitmap .PeekableIntIterator ;
2625import org .roaringbitmap .RoaringBitmap ;
2726
2827
@@ -35,6 +34,10 @@ public abstract class BaseSingleColumnDistinctExecutor<T extends DistinctTable,
3534 protected final ExpressionContext _expression ;
3635 protected final T _distinctTable ;
3736 private int _rowsRemaining = UNLIMITED_ROWS ;
37+ private int _numRowsProcessed = 0 ;
38+ private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS ;
39+ private int _numRowsWithoutChange = 0 ;
40+ private boolean _numRowsWithoutChangeLimitReached = false ;
3841
3942 public BaseSingleColumnDistinctExecutor (ExpressionContext expression , T distinctTable ) {
4043 _expression = expression ;
@@ -47,86 +50,78 @@ public void setMaxRowsToProcess(int maxRows) {
4750 }
4851
4952 @ Override
50- public boolean process (ValueBlock valueBlock ) {
51- if (_rowsRemaining <= 0 ) {
52- return true ;
53- }
54- BlockValSet blockValueSet = valueBlock .getBlockValueSet (_expression );
55- int numDocs = valueBlock .getNumDocs ();
56- if (_distinctTable .isNullHandlingEnabled () && blockValueSet .isSingleValue ()) {
57- RoaringBitmap nullBitmap = blockValueSet .getNullBitmap ();
58- if (nullBitmap != null && !nullBitmap .isEmpty ()) {
59- return processWithNull (blockValueSet , numDocs , nullBitmap );
60- } else {
61- return processWithoutNull (blockValueSet , numDocs );
62- }
63- } else {
64- return processWithoutNull (blockValueSet , numDocs );
65- }
53+ public void setNumRowsWithoutChangeInDistinct (int numRowsWithoutChangeInDistinct ) {
54+ _numRowsWithoutChangeLimit = numRowsWithoutChangeInDistinct ;
6655 }
6756
68- private boolean processWithNull (BlockValSet blockValueSet , int numDocs , RoaringBitmap nullBitmap ) {
69- int limitedNumDocs = clampToRemaining (0 , numDocs );
70- if (limitedNumDocs <= 0 ) {
71- return true ;
72- }
73- _distinctTable .addNull ();
74- S values = getValuesSV (blockValueSet );
75- PeekableIntIterator nullIterator = nullBitmap .getIntIterator ();
76- int prev = 0 ;
77- while (nullIterator .hasNext () && prev < limitedNumDocs ) {
78- int nextNull = nullIterator .next ();
79- if (nextNull > prev ) {
80- int rangeEnd = Math .min (nextNull , limitedNumDocs );
81- if (processSVRange (values , prev , rangeEnd )) {
82- return true ;
83- }
84- }
85- prev = nextNull + 1 ;
86- }
87- if (prev < limitedNumDocs ) {
88- return processSVRange (values , prev , limitedNumDocs );
89- }
90- return false ;
57+ @ Override
58+ public boolean isNumRowsWithoutChangeLimitReached () {
59+ return _numRowsWithoutChangeLimitReached ;
9160 }
9261
93- /**
94- * Processes a range of single-value values, respecting the row budget.
95- * @param values the single-value values
96- * @param from the start index (inclusive)
97- * @param to the end index (exclusive)
98- * @return true if processing should stop early, false otherwise
99- */
100- private boolean processSVRange (S values , int from , int to ) {
101- int limitedTo = clampToRemaining (from , to );
102- if (limitedTo <= from ) {
62+ @ Override
63+ public int getNumRowsProcessed () {
64+ return _numRowsProcessed ;
65+ }
66+
67+ @ Override
68+ public boolean process (ValueBlock valueBlock ) {
69+ if (shouldStopProcessing ()) {
10370 return true ;
10471 }
105- if (processSV (values , from , limitedTo )) {
72+ BlockValSet blockValueSet = valueBlock .getBlockValueSet (_expression );
73+ int numDocs = clampToRemaining (valueBlock .getNumDocs ());
74+ if (numDocs <= 0 ) {
10675 return true ;
10776 }
108- consumeRows (limitedTo - from );
109- return _rowsRemaining <= 0 ;
110- }
111-
112- private boolean processWithoutNull (BlockValSet blockValueSet , int numDocs ) {
113- if (blockValueSet .isSingleValue ()) {
114- int limitedTo = clampToRemaining (0 , numDocs );
115- if (limitedTo <= 0 ) {
116- return true ;
77+ boolean limitReached = false ;
78+ if (_distinctTable .isNullHandlingEnabled () && blockValueSet .isSingleValue ()) {
79+ RoaringBitmap nullBitmap = blockValueSet .getNullBitmap ();
80+ S values = getValuesSV (blockValueSet );
81+ for (int docId = 0 ; docId < numDocs ; docId ++) {
82+ if (shouldStopProcessing ()) {
83+ break ;
84+ }
85+ boolean isNull = nullBitmap != null && nullBitmap .contains (docId );
86+ int sizeBefore = _distinctTable .size ();
87+ if (isNull ) {
88+ _distinctTable .addNull ();
89+ } else {
90+ limitReached = processSV (values , docId , docId + 1 );
91+ }
92+ recordRowProcessed (_distinctTable .size () > sizeBefore );
93+ if (limitReached ) {
94+ break ;
95+ }
96+ }
97+ } else if (blockValueSet .isSingleValue ()) {
98+ S values = getValuesSV (blockValueSet );
99+ for (int docId = 0 ; docId < numDocs ; docId ++) {
100+ if (shouldStopProcessing ()) {
101+ break ;
102+ }
103+ int sizeBefore = _distinctTable .size ();
104+ limitReached = processSV (values , docId , docId + 1 );
105+ recordRowProcessed (_distinctTable .size () > sizeBefore );
106+ if (limitReached ) {
107+ break ;
108+ }
117109 }
118- boolean satisfied = processSV (getValuesSV (blockValueSet ), 0 , limitedTo );
119- consumeRows (limitedTo );
120- return satisfied || _rowsRemaining <= 0 ;
121110 } else {
122- int limitedTo = clampToRemaining (0 , numDocs );
123- if (limitedTo <= 0 ) {
124- return true ;
111+ M values = getValuesMV (blockValueSet );
112+ for (int docId = 0 ; docId < numDocs ; docId ++) {
113+ if (shouldStopProcessing ()) {
114+ break ;
115+ }
116+ int sizeBefore = _distinctTable .size ();
117+ limitReached = processMV (values , docId , docId + 1 );
118+ recordRowProcessed (_distinctTable .size () > sizeBefore );
119+ if (limitReached ) {
120+ break ;
121+ }
125122 }
126- boolean satisfied = processMV (getValuesMV (blockValueSet ), 0 , limitedTo );
127- consumeRows (limitedTo );
128- return satisfied || _rowsRemaining <= 0 ;
129123 }
124+ return limitReached || shouldStopProcessing ();
130125 }
131126
132127 /**
@@ -164,19 +159,34 @@ public int getRemainingRowsToProcess() {
164159 return _rowsRemaining ;
165160 }
166161
167- private int clampToRemaining (int from , int to ) {
162+ private int clampToRemaining (int numDocs ) {
168163 if (_rowsRemaining == UNLIMITED_ROWS ) {
169- return to ;
164+ return numDocs ;
170165 }
171166 if (_rowsRemaining <= 0 ) {
172- return from ;
167+ return 0 ;
173168 }
174- return Math .min (to , from + _rowsRemaining );
169+ return Math .min (numDocs , _rowsRemaining );
175170 }
176171
177- private void consumeRows (int count ) {
172+ private void recordRowProcessed (boolean distinctChanged ) {
173+ _numRowsProcessed ++;
178174 if (_rowsRemaining != UNLIMITED_ROWS ) {
179- _rowsRemaining -= count ;
175+ _rowsRemaining -- ;
180176 }
177+ if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS ) {
178+ if (distinctChanged ) {
179+ _numRowsWithoutChange = 0 ;
180+ } else {
181+ _numRowsWithoutChange ++;
182+ if (_numRowsWithoutChange >= _numRowsWithoutChangeLimit ) {
183+ _numRowsWithoutChangeLimitReached = true ;
184+ }
185+ }
186+ }
187+ }
188+
189+ private boolean shouldStopProcessing () {
190+ return _rowsRemaining <= 0 || _numRowsWithoutChangeLimitReached ;
181191 }
182192}
0 commit comments