2020
2121import java .util .Collections ;
2222import java .util .List ;
23- import java .util .concurrent .TimeUnit ;
2423import javax .annotation .Nullable ;
2524import org .apache .pinot .common .request .context .OrderByExpressionContext ;
2625import org .apache .pinot .common .utils .DataSchema ;
2726import org .apache .pinot .common .utils .DataSchema .ColumnDataType ;
28- import org .apache .pinot .common .utils .config .QueryOptionsUtils ;
2927import org .apache .pinot .core .common .Operator ;
3028import org .apache .pinot .core .operator .BaseOperator ;
3129import org .apache .pinot .core .operator .ExecutionStatistics ;
32- import org .apache .pinot .core .operator .blocks .results .BaseResultsBlock ;
3330import org .apache .pinot .core .operator .blocks .results .DistinctResultsBlock ;
3431import org .apache .pinot .core .query .distinct .table .BigDecimalDistinctTable ;
3532import org .apache .pinot .core .query .distinct .table .BytesDistinctTable ;
5148 */
5249public class DictionaryBasedDistinctOperator extends BaseOperator <DistinctResultsBlock > {
5350 private static final String EXPLAIN_NAME = "DISTINCT_DICTIONARY" ;
54- private static final int TIME_CHECK_INTERVAL = 1024 ;
55- private static final int TIME_CHECK_MASK = TIME_CHECK_INTERVAL - 1 ;
5651
5752 private final DataSource _dataSource ;
5853 private final QueryContext _queryContext ;
59- private final long _maxExecutionTimeNs ;
6054
6155 private int _numDocsScanned ;
62- private long _startTimeNs ;
63- private boolean _hitTimeLimit ;
6456
6557 public DictionaryBasedDistinctOperator (DataSource dataSource , QueryContext queryContext ) {
6658 _dataSource = dataSource ;
6759 _queryContext = queryContext ;
68- Long maxExecutionTimeMs = QueryOptionsUtils .getMaxExecutionTimeMsInDistinct (queryContext .getQueryOptions ());
69- _maxExecutionTimeNs =
70- maxExecutionTimeMs != null ? TimeUnit .MILLISECONDS .toNanos (maxExecutionTimeMs ) : Long .MAX_VALUE ;
7160 }
7261
7362 @ Override
7463 protected DistinctResultsBlock getNextBlock () {
75- _startTimeNs = System .nanoTime ();
76- _hitTimeLimit = false ;
7764 String column = _queryContext .getSelectExpressions ().get (0 ).getIdentifier ();
7865 Dictionary dictionary = _dataSource .getDictionary ();
7966 assert dictionary != null ;
@@ -113,9 +100,6 @@ protected DistinctResultsBlock getNextBlock() {
113100 }
114101 DistinctResultsBlock resultsBlock = new DistinctResultsBlock (distinctTable , _queryContext );
115102 resultsBlock .setNumDocsScanned (_numDocsScanned );
116- if (_hitTimeLimit ) {
117- resultsBlock .setEarlyTerminationReason (BaseResultsBlock .EarlyTerminationReason .DISTINCT_TIME_LIMIT );
118- }
119103 return resultsBlock ;
120104 }
121105
@@ -129,9 +113,6 @@ private IntDistinctTable createIntDistinctTable(DataSchema dataSchema, Dictionar
129113 int rowsProcessed = 0 ;
130114 if (orderByExpression == null ) {
131115 for (int i = 0 ; i < numValuesToKeep ; i ++) {
132- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
133- break ;
134- }
135116 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
136117 distinctTable .addUnbounded (dictionary .getIntValue (i ));
137118 rowsProcessed ++;
@@ -140,28 +121,19 @@ private IntDistinctTable createIntDistinctTable(DataSchema dataSchema, Dictionar
140121 if (dictionary .isSorted ()) {
141122 if (orderByExpression .isAsc ()) {
142123 for (int i = 0 ; i < numValuesToKeep ; i ++) {
143- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
144- break ;
145- }
146124 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
147125 distinctTable .addUnbounded (dictionary .getIntValue (i ));
148126 rowsProcessed ++;
149127 }
150128 } else {
151129 for (int i = 0 ; i < numValuesToKeep ; i ++) {
152- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
153- break ;
154- }
155130 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
156131 distinctTable .addUnbounded (dictionary .getIntValue (dictLength - 1 - i ));
157132 rowsProcessed ++;
158133 }
159134 }
160135 } else {
161136 for (int i = 0 ; i < dictLength ; i ++) {
162- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
163- break ;
164- }
165137 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
166138 distinctTable .addWithOrderBy (dictionary .getIntValue (i ));
167139 rowsProcessed ++;
@@ -182,9 +154,6 @@ private LongDistinctTable createLongDistinctTable(DataSchema dataSchema, Diction
182154 int rowsProcessed = 0 ;
183155 if (orderByExpression == null ) {
184156 for (int i = 0 ; i < numValuesToKeep ; i ++) {
185- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
186- break ;
187- }
188157 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
189158 distinctTable .addUnbounded (dictionary .getLongValue (i ));
190159 rowsProcessed ++;
@@ -193,28 +162,19 @@ private LongDistinctTable createLongDistinctTable(DataSchema dataSchema, Diction
193162 if (dictionary .isSorted ()) {
194163 if (orderByExpression .isAsc ()) {
195164 for (int i = 0 ; i < numValuesToKeep ; i ++) {
196- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
197- break ;
198- }
199165 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
200166 distinctTable .addUnbounded (dictionary .getLongValue (i ));
201167 rowsProcessed ++;
202168 }
203169 } else {
204170 for (int i = 0 ; i < numValuesToKeep ; i ++) {
205- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
206- break ;
207- }
208171 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
209172 distinctTable .addUnbounded (dictionary .getLongValue (dictLength - 1 - i ));
210173 rowsProcessed ++;
211174 }
212175 }
213176 } else {
214177 for (int i = 0 ; i < dictLength ; i ++) {
215- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
216- break ;
217- }
218178 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
219179 distinctTable .addWithOrderBy (dictionary .getLongValue (i ));
220180 rowsProcessed ++;
@@ -235,9 +195,6 @@ private FloatDistinctTable createFloatDistinctTable(DataSchema dataSchema, Dicti
235195 int rowsProcessed = 0 ;
236196 if (orderByExpression == null ) {
237197 for (int i = 0 ; i < numValuesToKeep ; i ++) {
238- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
239- break ;
240- }
241198 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
242199 distinctTable .addUnbounded (dictionary .getFloatValue (i ));
243200 rowsProcessed ++;
@@ -246,28 +203,19 @@ private FloatDistinctTable createFloatDistinctTable(DataSchema dataSchema, Dicti
246203 if (dictionary .isSorted ()) {
247204 if (orderByExpression .isAsc ()) {
248205 for (int i = 0 ; i < numValuesToKeep ; i ++) {
249- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
250- break ;
251- }
252206 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
253207 distinctTable .addUnbounded (dictionary .getFloatValue (i ));
254208 rowsProcessed ++;
255209 }
256210 } else {
257211 for (int i = 0 ; i < numValuesToKeep ; i ++) {
258- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
259- break ;
260- }
261212 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
262213 distinctTable .addUnbounded (dictionary .getFloatValue (dictLength - 1 - i ));
263214 rowsProcessed ++;
264215 }
265216 }
266217 } else {
267218 for (int i = 0 ; i < dictLength ; i ++) {
268- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
269- break ;
270- }
271219 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
272220 distinctTable .addWithOrderBy (dictionary .getFloatValue (i ));
273221 rowsProcessed ++;
@@ -288,9 +236,6 @@ private DoubleDistinctTable createDoubleDistinctTable(DataSchema dataSchema, Dic
288236 int rowsProcessed = 0 ;
289237 if (orderByExpression == null ) {
290238 for (int i = 0 ; i < numValuesToKeep ; i ++) {
291- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
292- break ;
293- }
294239 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
295240 distinctTable .addUnbounded (dictionary .getDoubleValue (i ));
296241 rowsProcessed ++;
@@ -299,28 +244,19 @@ private DoubleDistinctTable createDoubleDistinctTable(DataSchema dataSchema, Dic
299244 if (dictionary .isSorted ()) {
300245 if (orderByExpression .isAsc ()) {
301246 for (int i = 0 ; i < numValuesToKeep ; i ++) {
302- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
303- break ;
304- }
305247 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
306248 distinctTable .addUnbounded (dictionary .getDoubleValue (i ));
307249 rowsProcessed ++;
308250 }
309251 } else {
310252 for (int i = 0 ; i < numValuesToKeep ; i ++) {
311- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
312- break ;
313- }
314253 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
315254 distinctTable .addUnbounded (dictionary .getDoubleValue (dictLength - 1 - i ));
316255 rowsProcessed ++;
317256 }
318257 }
319258 } else {
320259 for (int i = 0 ; i < dictLength ; i ++) {
321- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
322- break ;
323- }
324260 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
325261 distinctTable .addWithOrderBy (dictionary .getDoubleValue (i ));
326262 rowsProcessed ++;
@@ -341,9 +277,6 @@ private BigDecimalDistinctTable createBigDecimalDistinctTable(DataSchema dataSch
341277 int rowsProcessed = 0 ;
342278 if (orderByExpression == null ) {
343279 for (int i = 0 ; i < numValuesToKeep ; i ++) {
344- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
345- break ;
346- }
347280 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
348281 distinctTable .addUnbounded (dictionary .getBigDecimalValue (i ));
349282 rowsProcessed ++;
@@ -352,28 +285,19 @@ private BigDecimalDistinctTable createBigDecimalDistinctTable(DataSchema dataSch
352285 if (dictionary .isSorted ()) {
353286 if (orderByExpression .isAsc ()) {
354287 for (int i = 0 ; i < numValuesToKeep ; i ++) {
355- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
356- break ;
357- }
358288 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
359289 distinctTable .addUnbounded (dictionary .getBigDecimalValue (i ));
360290 rowsProcessed ++;
361291 }
362292 } else {
363293 for (int i = 0 ; i < numValuesToKeep ; i ++) {
364- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
365- break ;
366- }
367294 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
368295 distinctTable .addUnbounded (dictionary .getBigDecimalValue (dictLength - 1 - i ));
369296 rowsProcessed ++;
370297 }
371298 }
372299 } else {
373300 for (int i = 0 ; i < dictLength ; i ++) {
374- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
375- break ;
376- }
377301 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
378302 distinctTable .addWithOrderBy (dictionary .getBigDecimalValue (i ));
379303 rowsProcessed ++;
@@ -394,9 +318,6 @@ private StringDistinctTable createStringDistinctTable(DataSchema dataSchema, Dic
394318 int rowsProcessed = 0 ;
395319 if (orderByExpression == null ) {
396320 for (int i = 0 ; i < numValuesToKeep ; i ++) {
397- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
398- break ;
399- }
400321 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
401322 distinctTable .addUnbounded (dictionary .getStringValue (i ));
402323 rowsProcessed ++;
@@ -405,28 +326,19 @@ private StringDistinctTable createStringDistinctTable(DataSchema dataSchema, Dic
405326 if (dictionary .isSorted ()) {
406327 if (orderByExpression .isAsc ()) {
407328 for (int i = 0 ; i < numValuesToKeep ; i ++) {
408- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
409- break ;
410- }
411329 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
412330 distinctTable .addUnbounded (dictionary .getStringValue (i ));
413331 rowsProcessed ++;
414332 }
415333 } else {
416334 for (int i = 0 ; i < numValuesToKeep ; i ++) {
417- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
418- break ;
419- }
420335 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
421336 distinctTable .addUnbounded (dictionary .getStringValue (dictLength - 1 - i ));
422337 rowsProcessed ++;
423338 }
424339 }
425340 } else {
426341 for (int i = 0 ; i < dictLength ; i ++) {
427- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
428- break ;
429- }
430342 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
431343 distinctTable .addWithOrderBy (dictionary .getStringValue (i ));
432344 rowsProcessed ++;
@@ -447,9 +359,6 @@ private BytesDistinctTable createBytesDistinctTable(DataSchema dataSchema, Dicti
447359 int rowsProcessed = 0 ;
448360 if (orderByExpression == null ) {
449361 for (int i = 0 ; i < numValuesToKeep ; i ++) {
450- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
451- break ;
452- }
453362 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
454363 distinctTable .addUnbounded (dictionary .getByteArrayValue (i ));
455364 rowsProcessed ++;
@@ -458,28 +367,19 @@ private BytesDistinctTable createBytesDistinctTable(DataSchema dataSchema, Dicti
458367 if (dictionary .isSorted ()) {
459368 if (orderByExpression .isAsc ()) {
460369 for (int i = 0 ; i < numValuesToKeep ; i ++) {
461- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
462- break ;
463- }
464370 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
465371 distinctTable .addUnbounded (dictionary .getByteArrayValue (i ));
466372 rowsProcessed ++;
467373 }
468374 } else {
469375 for (int i = 0 ; i < numValuesToKeep ; i ++) {
470- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
471- break ;
472- }
473376 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
474377 distinctTable .addUnbounded (dictionary .getByteArrayValue (dictLength - 1 - i ));
475378 rowsProcessed ++;
476379 }
477380 }
478381 } else {
479382 for (int i = 0 ; i < dictLength ; i ++) {
480- if ((i & TIME_CHECK_MASK ) == 0 && hasExceededTimeLimit ()) {
481- break ;
482- }
483383 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (i , EXPLAIN_NAME );
484384 distinctTable .addWithOrderBy (dictionary .getByteArrayValue (i ));
485385 rowsProcessed ++;
@@ -490,17 +390,6 @@ private BytesDistinctTable createBytesDistinctTable(DataSchema dataSchema, Dicti
490390 return distinctTable ;
491391 }
492392
493- private boolean hasExceededTimeLimit () {
494- if (_maxExecutionTimeNs == Long .MAX_VALUE ) {
495- return false ;
496- }
497- if (System .nanoTime () - _startTimeNs >= _maxExecutionTimeNs ) {
498- _hitTimeLimit = true ;
499- return true ;
500- }
501- return false ;
502- }
503-
504393 @ Override
505394 public String toExplainString () {
506395 return EXPLAIN_NAME ;
0 commit comments