Skip to content

Commit cd3d015

Browse files
committed
Add distinct early termination budgets and time limit support
1 parent 426799b commit cd3d015

File tree

18 files changed

+427
-112
lines changed

18 files changed

+427
-112
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
556556
SqlNodeAndOptions sqlNodeAndOptions;
557557
try {
558558
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
559+
if (sqlRequestJson.has(Request.QUERY_OPTIONS)) {
560+
sqlNodeAndOptions.setExtraOptions(
561+
RequestUtils.getOptionsFromString(sqlRequestJson.get(Request.QUERY_OPTIONS).asText()));
562+
}
559563
} catch (Exception e) {
560564
return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage());
561565
}

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
2727
import java.util.Collection;
28+
import java.util.HashMap;
2829
import java.util.HashSet;
2930
import java.util.List;
3031
import java.util.Locale;
@@ -584,7 +585,7 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
584585
QueryDispatcher.QueryResult queryResults;
585586
try {
586587
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, timer.getRemainingTimeMs(),
587-
query.getOptions());
588+
mergeQueryOptions(query));
588589
} catch (QueryException e) {
589590
throw e;
590591
} catch (Throwable t) {
@@ -680,6 +681,16 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
680681
}
681682
}
682683

684+
/**
685+
* Uses both the parsed SQL options and the (potentially mutated) compiled query options to ensure we
686+
* propagate everything (including queryOptions from the client) to the servers.
687+
*/
688+
private static Map<String, String> mergeQueryOptions(QueryEnvironment.CompiledQuery query) {
689+
Map<String, String> merged = new HashMap<>(query.getSqlNodeAndOptions().getOptions());
690+
merged.putAll(query.getOptions());
691+
return merged;
692+
}
693+
683694
private static void throwTableAccessError(TableAuthorizationResult tableAuthorizationResult) {
684695
String failureMessage = tableAuthorizationResult.getFailureMessage();
685696
if (StringUtils.isNotBlank(failureMessage)) {

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
*/
4545
@JsonPropertyOrder({
4646
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
47-
"numGroupsWarningLimitReached", "maxRowsInDistinctReached", "numRowsWithoutChangeInDistinctReached", "timeUsedMs",
47+
"numGroupsWarningLimitReached", "maxRowsInDistinctReached", "numRowsWithoutChangeInDistinctReached",
48+
"timeLimitInDistinctReached", "timeUsedMs",
4849
"requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
4950
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
5051
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
@@ -77,6 +78,7 @@ public class BrokerResponseNative implements BrokerResponse {
7778
private boolean _numGroupsWarningLimitReached = false;
7879
private boolean _maxRowsInDistinctReached = false;
7980
private boolean _numRowsWithoutChangeInDistinctReached = false;
81+
private boolean _timeLimitInDistinctReached = false;
8082
private long _timeUsedMs = 0L;
8183
private String _requestId;
8284
private String _clientRequestId;
@@ -193,7 +195,7 @@ public void setNumRowsResultSet(int numRowsResultSet) {
193195
@Override
194196
public boolean isPartialResult() {
195197
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInDistinctReached()
196-
|| isNumRowsWithoutChangeInDistinctReached();
198+
|| isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
197199
}
198200

199201
@Override
@@ -252,6 +254,14 @@ public void setNumRowsWithoutChangeInDistinctReached(boolean numRowsWithoutChang
252254
_numRowsWithoutChangeInDistinctReached = numRowsWithoutChangeInDistinctReached;
253255
}
254256

257+
public boolean isTimeLimitInDistinctReached() {
258+
return _timeLimitInDistinctReached;
259+
}
260+
261+
public void setTimeLimitInDistinctReached(boolean timeLimitInDistinctReached) {
262+
_timeLimitInDistinctReached = timeLimitInDistinctReached;
263+
}
264+
255265
@JsonIgnore
256266
@Override
257267
public boolean isMaxRowsInJoinReached() {

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
@JsonPropertyOrder({
4040
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
4141
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "maxRowsInDistinctReached",
42-
"numRowsWithoutChangeInDistinctReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId",
42+
"numRowsWithoutChangeInDistinctReached", "timeLimitInDistinctReached", "timeUsedMs", "stageStats",
43+
"maxRowsInOperator", "requestId",
4344
"clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
4445
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
4546
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
@@ -111,7 +112,7 @@ public int getNumRowsResultSet() {
111112
@Override
112113
public boolean isPartialResult() {
113114
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached()
114-
|| isMaxRowsInDistinctReached() || isNumRowsWithoutChangeInDistinctReached();
115+
|| isMaxRowsInDistinctReached() || isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
115116
}
116117

117118
@Override
@@ -190,6 +191,15 @@ public void mergeNumRowsWithoutChangeInDistinctReached(boolean reached) {
190191
_brokerStats.merge(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED, reached);
191192
}
192193

194+
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
195+
public boolean isTimeLimitInDistinctReached() {
196+
return _brokerStats.getBoolean(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED);
197+
}
198+
199+
public void mergeTimeLimitInDistinctReached(boolean timeLimitReached) {
200+
_brokerStats.merge(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED, timeLimitReached);
201+
}
202+
193203
/**
194204
* Returns the stage statistics.
195205
*/
@@ -475,7 +485,8 @@ public long merge(long value1, long value2) {
475485
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
476486
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
477487
MAX_ROWS_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
478-
NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN);
488+
NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
489+
TIME_LIMIT_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN);
479490

480491
private final StatMap.Type _type;
481492

pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ public static Long getExtraPassiveTimeoutMs(Map<String, String> queryOptions) {
117117
return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, extraPassiveTimeoutMsString, 0);
118118
}
119119

120+
@Nullable
121+
public static Long getMaxExecutionTimeMsInDistinct(Map<String, String> queryOptions) {
122+
String maxExecutionTimeMs = queryOptions.get(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT);
123+
return checkedParseLong(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT, maxExecutionTimeMs, 0);
124+
}
125+
120126
@Nullable
121127
public static Long getMaxServerResponseSizeBytes(Map<String, String> queryOptions) {
122128
String responseSize = queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);

pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public abstract class BaseResultsBlock implements Block {
4040
public enum EarlyTerminationReason {
4141
NONE,
4242
DISTINCT_MAX_ROWS,
43-
DISTINCT_NO_NEW_VALUES
43+
DISTINCT_NO_NEW_VALUES,
44+
TIME_LIMIT
4445
}
4546

4647
private List<QueryErrorMessage> _processingExceptions;

pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.Collections;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.function.LongSupplier;
2527
import java.util.stream.Collectors;
2628
import org.apache.pinot.common.request.context.ExpressionContext;
2729
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -55,34 +57,59 @@ public class DistinctOperator extends BaseOperator<DistinctResultsBlock> {
5557
private int _numRowsWithoutNewDistinct = 0;
5658
private boolean _hitMaxRowsLimit = false;
5759
private boolean _hitNoChangeLimit = false;
60+
private final long _maxExecutionTimeNs;
61+
private boolean _hitTimeLimit = false;
62+
private final LongSupplier _timeSupplier;
5863

5964
public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
6065
BaseProjectOperator<?> projectOperator) {
66+
this(indexSegment, queryContext, projectOperator, System::nanoTime);
67+
}
68+
69+
DistinctOperator(IndexSegment indexSegment, QueryContext queryContext, BaseProjectOperator<?> projectOperator,
70+
LongSupplier timeSupplier) {
6171
_indexSegment = indexSegment;
6272
_queryContext = queryContext;
6373
_projectOperator = projectOperator;
74+
_timeSupplier = timeSupplier;
6475
Map<String, String> queryOptions = queryContext.getQueryOptions();
6576
if (queryOptions != null) {
6677
Integer maxRowsInDistinct = QueryOptionsUtils.getMaxRowsInDistinct(queryOptions);
6778
_maxRowsInDistinct = maxRowsInDistinct != null ? maxRowsInDistinct : UNLIMITED_ROWS;
6879
Integer numRowsWithoutChange = QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryOptions);
6980
_numRowsWithoutChangeInDistinct =
7081
numRowsWithoutChange != null ? numRowsWithoutChange : UNLIMITED_ROWS;
82+
Long maxExecutionTimeMs = QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryOptions);
83+
_maxExecutionTimeNs =
84+
maxExecutionTimeMs != null ? TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs) : Long.MAX_VALUE;
7185
} else {
7286
_maxRowsInDistinct = UNLIMITED_ROWS;
7387
_numRowsWithoutChangeInDistinct = UNLIMITED_ROWS;
88+
_maxExecutionTimeNs = Long.MAX_VALUE;
7489
}
7590
}
7691

7792
@Override
7893
protected DistinctResultsBlock getNextBlock() {
7994
DistinctExecutor executor = DistinctExecutorFactory.getDistinctExecutor(_projectOperator, _queryContext);
95+
executor.setTimeSupplier(_timeSupplier);
8096
executor.setMaxRowsToProcess(_maxRowsInDistinct);
8197
executor.setNumRowsWithoutChangeInDistinct(_numRowsWithoutChangeInDistinct);
8298
ValueBlock valueBlock;
8399
boolean enforceRowLimit = _maxRowsInDistinct != UNLIMITED_ROWS;
84100
boolean enforceNoChangeLimit = _numRowsWithoutChangeInDistinct != UNLIMITED_ROWS;
101+
boolean enforceTimeLimit = _maxExecutionTimeNs != Long.MAX_VALUE;
102+
final long startTimeNs = _timeSupplier.getAsLong();
85103
while ((valueBlock = _projectOperator.nextBlock()) != null) {
104+
if (enforceTimeLimit) {
105+
long elapsed = _timeSupplier.getAsLong() - startTimeNs;
106+
long remaining = _maxExecutionTimeNs - elapsed;
107+
executor.setRemainingTimeNanos(remaining);
108+
}
109+
if (enforceTimeLimit && hasExceededTimeLimit(startTimeNs)) {
110+
_hitTimeLimit = true;
111+
break;
112+
}
86113
if (enforceRowLimit && executor.getRemainingRowsToProcess() <= 0) {
87114
_hitMaxRowsLimit = true;
88115
break;
@@ -114,13 +141,18 @@ protected DistinctResultsBlock getNextBlock() {
114141
}
115142
}
116143
}
117-
if (_hitMaxRowsLimit || _hitNoChangeLimit || satisfied) {
144+
if (enforceTimeLimit && hasExceededTimeLimit(startTimeNs)) {
145+
_hitTimeLimit = true;
146+
}
147+
if (_hitTimeLimit || _hitMaxRowsLimit || _hitNoChangeLimit || satisfied) {
118148
break;
119149
}
120150
}
121151
DistinctResultsBlock resultsBlock = new DistinctResultsBlock(executor.getResult(), _queryContext);
122152
resultsBlock.setNumDocsScanned(_numDocsScanned);
123-
if (_hitMaxRowsLimit) {
153+
if (_hitTimeLimit) {
154+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
155+
} else if (_hitMaxRowsLimit) {
124156
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
125157
} else if (_hitNoChangeLimit) {
126158
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_NO_NEW_VALUES);
@@ -176,4 +208,8 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
176208
.collect(Collectors.toList());
177209
attributeBuilder.putStringList("keyColumns", expressions);
178210
}
211+
212+
private boolean hasExceededTimeLimit(long startTimeNs) {
213+
return _timeSupplier.getAsLong() - startTimeNs >= _maxExecutionTimeNs;
214+
}
179215
}

pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.core.query.distinct;
2020

21+
import java.util.function.LongSupplier;
2122
import org.apache.pinot.common.request.context.ExpressionContext;
2223
import org.apache.pinot.core.common.BlockValSet;
2324
import org.apache.pinot.core.operator.blocks.ValueBlock;
@@ -29,15 +30,9 @@
2930
* Base implementation of {@link DistinctExecutor} for single column.
3031
*/
3132
public abstract class BaseSingleColumnDistinctExecutor<T extends DistinctTable, S, M> implements DistinctExecutor {
32-
private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
33-
3433
protected final ExpressionContext _expression;
3534
protected final T _distinctTable;
36-
private int _rowsRemaining = UNLIMITED_ROWS;
37-
private int _numRowsProcessed = 0;
38-
private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
39-
private int _numRowsWithoutChange = 0;
40-
private boolean _numRowsWithoutChangeLimitReached = false;
35+
private final DistinctEarlyTerminationContext _earlyTerminationContext = new DistinctEarlyTerminationContext();
4136

4237
public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T distinctTable) {
4338
_expression = expression;
@@ -46,22 +41,32 @@ public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T distinct
4641

4742
@Override
4843
public void setMaxRowsToProcess(int maxRows) {
49-
_rowsRemaining = maxRows;
44+
_earlyTerminationContext.setMaxRowsToProcess(maxRows);
5045
}
5146

5247
@Override
5348
public void setNumRowsWithoutChangeInDistinct(int numRowsWithoutChangeInDistinct) {
54-
_numRowsWithoutChangeLimit = numRowsWithoutChangeInDistinct;
49+
_earlyTerminationContext.setNumRowsWithoutChangeInDistinct(numRowsWithoutChangeInDistinct);
50+
}
51+
52+
@Override
53+
public void setTimeSupplier(LongSupplier timeSupplier) {
54+
_earlyTerminationContext.setTimeSupplier(timeSupplier);
55+
}
56+
57+
@Override
58+
public void setRemainingTimeNanos(long remainingTimeNanos) {
59+
_earlyTerminationContext.setRemainingTimeNanos(remainingTimeNanos);
5560
}
5661

5762
@Override
5863
public boolean isNumRowsWithoutChangeLimitReached() {
59-
return _numRowsWithoutChangeLimitReached;
64+
return _earlyTerminationContext.isNumRowsWithoutChangeLimitReached();
6065
}
6166

6267
@Override
6368
public int getNumRowsProcessed() {
64-
return _numRowsProcessed;
69+
return _earlyTerminationContext.getNumRowsProcessed();
6570
}
6671

6772
@Override
@@ -156,37 +161,18 @@ public int getNumDistinctRowsCollected() {
156161

157162
@Override
158163
public int getRemainingRowsToProcess() {
159-
return _rowsRemaining;
164+
return _earlyTerminationContext.getRemainingRowsToProcess();
160165
}
161166

162167
private int clampToRemaining(int numDocs) {
163-
if (_rowsRemaining == UNLIMITED_ROWS) {
164-
return numDocs;
165-
}
166-
if (_rowsRemaining <= 0) {
167-
return 0;
168-
}
169-
return Math.min(numDocs, _rowsRemaining);
168+
return _earlyTerminationContext.clampToRemaining(numDocs);
170169
}
171170

172171
private void recordRowProcessed(boolean distinctChanged) {
173-
_numRowsProcessed++;
174-
if (_rowsRemaining != UNLIMITED_ROWS) {
175-
_rowsRemaining--;
176-
}
177-
if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
178-
if (distinctChanged) {
179-
_numRowsWithoutChange = 0;
180-
} else {
181-
_numRowsWithoutChange++;
182-
if (_numRowsWithoutChange >= _numRowsWithoutChangeLimit) {
183-
_numRowsWithoutChangeLimitReached = true;
184-
}
185-
}
186-
}
172+
_earlyTerminationContext.recordRowProcessed(distinctChanged);
187173
}
188174

189175
private boolean shouldStopProcessing() {
190-
return _rowsRemaining <= 0 || _numRowsWithoutChangeLimitReached;
176+
return _earlyTerminationContext.shouldStopProcessing();
191177
}
192178
}

0 commit comments

Comments
 (0)