Skip to content

Commit a7214d2

Browse files
committed
Support early termination in combine operator
1 parent c901ebf commit a7214d2

File tree

13 files changed

+348
-59
lines changed

13 files changed

+348
-59
lines changed

pinot-common/src/test/java/org/apache/pinot/common/utils/request/RequestUtilsTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.calcite.sql.parser.SqlParserPos;
2525
import org.apache.pinot.common.request.Expression;
2626
import org.apache.pinot.common.request.ExpressionType;
27+
import org.apache.pinot.spi.utils.JsonUtils;
2728
import org.apache.pinot.sql.parsers.CalciteSqlParser;
2829
import org.apache.pinot.sql.parsers.PinotSqlType;
2930
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
@@ -61,6 +62,16 @@ public void testParseQuery() {
6162
"SELECT `foo`\n" + "FROM `countries`\n" + "WHERE `bar` > 1");
6263
}
6364

65+
@Test
66+
public void testParseQueryOptionsFromJson()
67+
throws Exception {
68+
SqlNodeAndOptions result = RequestUtils.parseQuery("select foo from countries", JsonUtils.stringToJsonNode(
69+
"{\"sql\":\"select foo from countries\","
70+
+ "\"queryOptions\":\"maxRowsInDistinct=5;numRowsWithoutChangeInDistinct=10\"}"));
71+
assertEquals(result.getOptions().get("maxRowsInDistinct"), "5");
72+
assertEquals(result.getOptions().get("numRowsWithoutChangeInDistinct"), "10");
73+
}
74+
6475
@DataProvider(name = "queryProvider")
6576
public Object[][] queryProvider() {
6677
return new Object[][] {

pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class DistinctCombineOperator extends BaseSingleBlockCombineOperator<Dist
3434
private static final String EXPLAIN_NAME = "COMBINE_DISTINCT";
3535

3636
public DistinctCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
37-
super(new DistinctResultsBlockMerger(), operators, queryContext, executorService);
37+
super(new DistinctResultsBlockMerger(queryContext), operators, queryContext, executorService);
3838
}
3939

4040
@Override

pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,53 @@
1818
*/
1919
package org.apache.pinot.core.operator.combine.merger;
2020

21+
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
22+
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
2123
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
24+
import org.apache.pinot.core.query.request.context.QueryContext;
2225

2326

2427
public class DistinctResultsBlockMerger implements ResultsBlockMerger<DistinctResultsBlock> {
28+
private final int _maxRowsAcrossSegments;
29+
private boolean _rowBudgetReached;
30+
31+
public DistinctResultsBlockMerger(QueryContext queryContext) {
32+
if (queryContext.getQueryOptions() != null) {
33+
Integer maxRows = QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
34+
_maxRowsAcrossSegments = maxRows != null ? maxRows : Integer.MAX_VALUE;
35+
} else {
36+
_maxRowsAcrossSegments = Integer.MAX_VALUE;
37+
}
38+
}
2539

2640
@Override
2741
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
42+
if (_rowBudgetReached) {
43+
return true;
44+
}
45+
if (_maxRowsAcrossSegments != Integer.MAX_VALUE
46+
&& resultsBlock.getDistinctTable().size() >= _maxRowsAcrossSegments) {
47+
if (resultsBlock.getEarlyTerminationReason() == BaseResultsBlock.EarlyTerminationReason.NONE) {
48+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
49+
}
50+
_rowBudgetReached = true;
51+
return true;
52+
}
2853
return resultsBlock.getDistinctTable().isSatisfied();
2954
}
3055

3156
@Override
3257
public void mergeResultsBlocks(DistinctResultsBlock mergedBlock, DistinctResultsBlock blockToMerge) {
58+
if (_rowBudgetReached) {
59+
return;
60+
}
3361
mergedBlock.getDistinctTable().mergeDistinctTable(blockToMerge.getDistinctTable());
62+
if (_maxRowsAcrossSegments != Integer.MAX_VALUE
63+
&& mergedBlock.getDistinctTable().size() >= _maxRowsAcrossSegments) {
64+
if (mergedBlock.getEarlyTerminationReason() == BaseResultsBlock.EarlyTerminationReason.NONE) {
65+
mergedBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
66+
}
67+
_rowBudgetReached = true;
68+
}
3469
}
3570
}

pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ public DictionaryBasedDistinctOperator(DataSource dataSource, QueryContext query
6363
_queryContext = queryContext;
6464
if (queryContext.getQueryOptions() != null) {
6565
Integer maxRows = QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
66-
_maxRowsInDistinct = maxRows != null ? maxRows : Integer.MAX_VALUE;
66+
int limit = queryContext.getLimit();
67+
_maxRowsInDistinct = maxRows != null ? Math.min(limit, maxRows) : limit;
6768
} else {
68-
_maxRowsInDistinct = Integer.MAX_VALUE;
69+
_maxRowsInDistinct = queryContext.getLimit();
6970
}
7071
}
7172

pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,8 @@ protected DistinctResultsBlock getNextBlock() {
9090
int distinctCountBeforeBlock = enforceNoChangeLimit ? executor.getNumDistinctRowsCollected() : -1;
9191
boolean satisfied = executor.process(valueBlock);
9292
int rowsRemainingAfter = executor.getRemainingRowsToProcess();
93-
int docsProcessedForLimit;
94-
if (enforceRowLimit && rowsRemainingBefore != UNLIMITED_ROWS && rowsRemainingAfter != UNLIMITED_ROWS) {
95-
docsProcessedForLimit = Math.max(0, rowsRemainingBefore - rowsRemainingAfter);
96-
} else {
97-
docsProcessedForLimit = valueBlock.getNumDocs();
98-
}
93+
int docsProcessedForLimit =
94+
enforceRowLimit ? Math.max(0, rowsRemainingBefore - rowsRemainingAfter) : valueBlock.getNumDocs();
9995
_numDocsScanned += docsProcessedForLimit;
10096
if (enforceRowLimit && _numDocsScanned >= _maxRowsInDistinct) {
10197
_hitMaxRowsLimit = true;

pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
* Base implementation of {@link DistinctExecutor} for single column.
3131
*/
3232
public abstract class BaseSingleColumnDistinctExecutor<T extends DistinctTable, S, M> implements DistinctExecutor {
33+
private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
34+
3335
protected final ExpressionContext _expression;
3436
protected final T _distinctTable;
35-
private int _rowsRemaining = Integer.MAX_VALUE;
37+
private int _rowsRemaining = UNLIMITED_ROWS;
3638

3739
public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T distinctTable) {
3840
_expression = expression;
@@ -64,21 +66,26 @@ public boolean process(ValueBlock valueBlock) {
6466
}
6567

6668
private boolean processWithNull(BlockValSet blockValueSet, int numDocs, RoaringBitmap nullBitmap) {
69+
int limitedNumDocs = clampToRemaining(0, numDocs);
70+
if (limitedNumDocs <= 0) {
71+
return true;
72+
}
6773
_distinctTable.addNull();
6874
S values = getValuesSV(blockValueSet);
6975
PeekableIntIterator nullIterator = nullBitmap.getIntIterator();
7076
int prev = 0;
71-
while (nullIterator.hasNext()) {
77+
while (nullIterator.hasNext() && prev < limitedNumDocs) {
7278
int nextNull = nullIterator.next();
7379
if (nextNull > prev) {
74-
if (processSVRange(values, prev, nextNull)) {
80+
int rangeEnd = Math.min(nextNull, limitedNumDocs);
81+
if (processSVRange(values, prev, rangeEnd)) {
7582
return true;
7683
}
7784
}
7885
prev = nextNull + 1;
7986
}
80-
if (prev < numDocs) {
81-
return processSVRange(values, prev, numDocs);
87+
if (prev < limitedNumDocs) {
88+
return processSVRange(values, prev, limitedNumDocs);
8289
}
8390
return false;
8491
}
@@ -158,7 +165,7 @@ public int getRemainingRowsToProcess() {
158165
}
159166

160167
private int clampToRemaining(int from, int to) {
161-
if (_rowsRemaining == Integer.MAX_VALUE) {
168+
if (_rowsRemaining == UNLIMITED_ROWS) {
162169
return to;
163170
}
164171
if (_rowsRemaining <= 0) {
@@ -168,7 +175,7 @@ private int clampToRemaining(int from, int to) {
168175
}
169176

170177
private void consumeRows(int count) {
171-
if (_rowsRemaining != Integer.MAX_VALUE) {
178+
if (_rowsRemaining != UNLIMITED_ROWS) {
172179
_rowsRemaining -= count;
173180
}
174181
}

pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutorFactory.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,18 @@ private DistinctExecutorFactory() {
5555
public static DistinctExecutor getDistinctExecutor(BaseProjectOperator<?> projectOperator,
5656
QueryContext queryContext) {
5757
List<ExpressionContext> expressions = queryContext.getSelectExpressions();
58-
int limit = queryContext.getLimit();
58+
int queryLimit = queryContext.getLimit();
59+
// The distinct table capacity is independent from the early-termination row budgets enforced by DistinctOperator.
60+
// Keep a dedicated variable so it is clear we are only constraining how many unique rows we can store.
61+
int tableCapacity = queryLimit;
62+
if (queryContext.getQueryOptions() != null) {
63+
Integer maxRowsInDistinct =
64+
org.apache.pinot.common.utils.config.QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
65+
if (maxRowsInDistinct != null) {
66+
// Only retain enough space to hold what the server will be allowed to scan.
67+
tableCapacity = Math.min(queryLimit, maxRowsInDistinct);
68+
}
69+
}
5970
boolean nullHandlingEnabled = queryContext.isNullHandlingEnabled();
6071
List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
6172
int numExpressions = expressions.size();
@@ -76,25 +87,32 @@ public static DistinctExecutor getDistinctExecutor(BaseProjectOperator<?> projec
7687
// Note: Use raw value based when ordering is needed and dictionary is not sorted (consuming segments).
7788
if (dictionary != null && (orderByExpression == null || dictionary.isSorted())) {
7889
// Dictionary based
79-
return new DictionaryBasedSingleColumnDistinctExecutor(expression, dictionary, dataType, limit,
90+
return new DictionaryBasedSingleColumnDistinctExecutor(expression, dictionary, dataType, tableCapacity,
8091
nullHandlingEnabled, orderByExpression);
8192
} else {
8293
// Raw value based
8394
switch (dataType.getStoredType()) {
8495
case INT:
85-
return new IntDistinctExecutor(expression, dataType, limit, nullHandlingEnabled, orderByExpression);
96+
return new IntDistinctExecutor(expression, dataType, tableCapacity, nullHandlingEnabled,
97+
orderByExpression);
8698
case LONG:
87-
return new LongDistinctExecutor(expression, dataType, limit, nullHandlingEnabled, orderByExpression);
99+
return new LongDistinctExecutor(expression, dataType, tableCapacity, nullHandlingEnabled,
100+
orderByExpression);
88101
case FLOAT:
89-
return new FloatDistinctExecutor(expression, dataType, limit, nullHandlingEnabled, orderByExpression);
102+
return new FloatDistinctExecutor(expression, dataType, tableCapacity, nullHandlingEnabled,
103+
orderByExpression);
90104
case DOUBLE:
91-
return new DoubleDistinctExecutor(expression, dataType, limit, nullHandlingEnabled, orderByExpression);
105+
return new DoubleDistinctExecutor(expression, dataType, tableCapacity, nullHandlingEnabled,
106+
orderByExpression);
92107
case BIG_DECIMAL:
93-
return new BigDecimalDistinctExecutor(expression, dataType, limit, nullHandlingEnabled, orderByExpression);
108+
return new BigDecimalDistinctExecutor(expression, dataType, tableCapacity, nullHandlingEnabled,
109+
orderByExpression);
94110
case STRING:
95-
return new StringDistinctExecutor(expression, dataType, limit, nullHandlingEnabled, orderByExpression);
111+
return new StringDistinctExecutor(expression, dataType, tableCapacity, nullHandlingEnabled,
112+
orderByExpression);
96113
case BYTES:
97-
return new BytesDistinctExecutor(expression, dataType, limit, nullHandlingEnabled, orderByExpression);
114+
return new BytesDistinctExecutor(expression, dataType, tableCapacity, nullHandlingEnabled,
115+
orderByExpression);
98116
default:
99117
throw new IllegalStateException("Unsupported data type: " + dataType);
100118
}
@@ -138,11 +156,11 @@ public static DistinctExecutor getDistinctExecutor(BaseProjectOperator<?> projec
138156
if (dictionaryBased) {
139157
// Dictionary based
140158
return new DictionaryBasedMultiColumnDistinctExecutor(expressions, hasMVExpression, dataSchema, dictionaries,
141-
limit, nullHandlingEnabled, orderByExpressions);
159+
tableCapacity, nullHandlingEnabled, orderByExpressions);
142160
} else {
143161
// Raw value based
144-
return new RawMultiColumnDistinctExecutor(expressions, hasMVExpression, dataSchema, limit, nullHandlingEnabled,
145-
orderByExpressions);
162+
return new RawMultiColumnDistinctExecutor(expressions, hasMVExpression, dataSchema, tableCapacity,
163+
nullHandlingEnabled, orderByExpressions);
146164
}
147165
}
148166
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.core.operator.combine;
20+
21+
import java.io.IOException;
22+
import java.util.ArrayList;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Set;
26+
import org.apache.pinot.common.datatable.DataTable;
27+
import org.apache.pinot.common.response.broker.ResultTable;
28+
import org.apache.pinot.common.utils.DataSchema;
29+
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
30+
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
31+
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
32+
import org.apache.pinot.core.operator.combine.merger.DistinctResultsBlockMerger;
33+
import org.apache.pinot.core.query.distinct.table.DistinctTable;
34+
import org.apache.pinot.core.query.request.context.QueryContext;
35+
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
36+
import org.testng.annotations.Test;
37+
38+
import static org.testng.Assert.assertEquals;
39+
import static org.testng.Assert.assertFalse;
40+
import static org.testng.Assert.assertTrue;
41+
42+
43+
public class DistinctResultsBlockMergerTest {
44+
45+
private static final DataSchema SCHEMA =
46+
new DataSchema(new String[]{"col"}, new ColumnDataType[]{ColumnDataType.INT});
47+
48+
@Test
49+
public void shouldRespectMaxRowsAcrossSegments() {
50+
QueryContext queryContext =
51+
QueryContextConverterUtils.getQueryContext("SET \"maxRowsInDistinct\"=1000; SELECT DISTINCT col FROM myTable");
52+
DistinctResultsBlockMerger merger = new DistinctResultsBlockMerger(queryContext);
53+
54+
DistinctResultsBlock merged = new DistinctResultsBlock(fakeTable(0, 800), queryContext);
55+
// First block under budget
56+
assertFalse(merger.isQuerySatisfied(merged));
57+
58+
// Merge second block that pushes us over the server-level budget (1600 rows total)
59+
DistinctResultsBlock blockToMerge = new DistinctResultsBlock(fakeTable(800, 800), queryContext);
60+
merger.mergeResultsBlocks(merged, blockToMerge);
61+
62+
assertEquals(merged.getEarlyTerminationReason(), BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
63+
assertTrue(merger.isQuerySatisfied(merged), "Combine should stop once server-level budget is reached");
64+
assertEquals(merged.getDistinctTable().size(), 1600);
65+
}
66+
67+
private static DistinctTable fakeTable(int startInclusive, int count) {
68+
Set<Integer> values = new HashSet<>();
69+
for (int i = 0; i < count; i++) {
70+
values.add(startInclusive + i);
71+
}
72+
return new FakeDistinctTable(values);
73+
}
74+
75+
/**
76+
* Minimal {@link DistinctTable} implementation backed by a set of integers.
77+
*/
78+
private static class FakeDistinctTable extends DistinctTable {
79+
private final Set<Integer> _values;
80+
81+
FakeDistinctTable(Set<Integer> values) {
82+
super(SCHEMA, Integer.MAX_VALUE, false);
83+
_values = values;
84+
}
85+
86+
@Override
87+
public boolean hasOrderBy() {
88+
return false;
89+
}
90+
91+
@Override
92+
public void mergeDistinctTable(DistinctTable distinctTable) {
93+
for (Object[] row : distinctTable.getRows()) {
94+
_values.add((Integer) row[0]);
95+
}
96+
}
97+
98+
@Override
99+
public boolean mergeDataTable(DataTable dataTable) {
100+
throw new UnsupportedOperationException();
101+
}
102+
103+
@Override
104+
public int size() {
105+
return _values.size();
106+
}
107+
108+
@Override
109+
public boolean isSatisfied() {
110+
return false;
111+
}
112+
113+
@Override
114+
public List<Object[]> getRows() {
115+
List<Object[]> rows = new ArrayList<>(_values.size());
116+
for (Integer v : _values) {
117+
rows.add(new Object[]{v});
118+
}
119+
return rows;
120+
}
121+
122+
@Override
123+
public DataTable toDataTable()
124+
throws IOException {
125+
throw new UnsupportedOperationException();
126+
}
127+
128+
@Override
129+
public ResultTable toResultTable() {
130+
throw new UnsupportedOperationException();
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)