Skip to content

Commit 7b038c8

Browse files
committed
Add distinct early termination budgets and time limit support
1 parent e6138c8 commit 7b038c8

File tree

23 files changed

+510
-120
lines changed

23 files changed

+510
-120
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,10 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
519519
SqlNodeAndOptions sqlNodeAndOptions;
520520
try {
521521
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
522+
if (sqlRequestJson.has(Request.QUERY_OPTIONS)) {
523+
sqlNodeAndOptions.setExtraOptions(
524+
RequestUtils.getOptionsFromString(sqlRequestJson.get(Request.QUERY_OPTIONS).asText()));
525+
}
522526
} catch (Exception e) {
523527
return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage());
524528
}

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Collection;
2828
import java.util.HashSet;
2929
import java.util.List;
30+
import java.util.HashMap;
3031
import java.util.Locale;
3132
import java.util.Map;
3233
import java.util.Set;
@@ -555,7 +556,7 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
555556
QueryDispatcher.QueryResult queryResults;
556557
try {
557558
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, timer.getRemainingTimeMs(),
558-
query.getOptions());
559+
mergeQueryOptions(query));
559560
} catch (QueryException e) {
560561
throw e;
561562
} catch (Throwable t) {
@@ -651,6 +652,16 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
651652
}
652653
}
653654

655+
/**
656+
* Uses both the parsed SQL options and the (potentially mutated) compiled query options to ensure we
657+
* propagate everything (including queryOptions from the client) to the servers.
658+
*/
659+
private static Map<String, String> mergeQueryOptions(QueryEnvironment.CompiledQuery query) {
660+
Map<String, String> merged = new HashMap<>(query.getSqlNodeAndOptions().getOptions());
661+
merged.putAll(query.getOptions());
662+
return merged;
663+
}
664+
654665
private static void throwTableAccessError(TableAuthorizationResult tableAuthorizationResult) {
655666
String failureMessage = tableAuthorizationResult.getFailureMessage();
656667
if (StringUtils.isNotBlank(failureMessage)) {

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
*/
4545
@JsonPropertyOrder({
4646
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
47-
"numGroupsWarningLimitReached", "maxRowsInDistinctReached", "numRowsWithoutChangeInDistinctReached", "timeUsedMs",
47+
"numGroupsWarningLimitReached", "maxRowsInDistinctReached", "numRowsWithoutChangeInDistinctReached",
48+
"timeLimitInDistinctReached", "timeUsedMs",
4849
"requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
4950
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
5051
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
@@ -77,6 +78,7 @@ public class BrokerResponseNative implements BrokerResponse {
7778
private boolean _numGroupsWarningLimitReached = false;
7879
private boolean _maxRowsInDistinctReached = false;
7980
private boolean _numRowsWithoutChangeInDistinctReached = false;
81+
private boolean _timeLimitInDistinctReached = false;
8082
private long _timeUsedMs = 0L;
8183
private String _requestId;
8284
private String _clientRequestId;
@@ -193,7 +195,7 @@ public void setNumRowsResultSet(int numRowsResultSet) {
193195
@Override
194196
public boolean isPartialResult() {
195197
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInDistinctReached()
196-
|| isNumRowsWithoutChangeInDistinctReached();
198+
|| isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
197199
}
198200

199201
@Override
@@ -252,6 +254,14 @@ public void setNumRowsWithoutChangeInDistinctReached(boolean numRowsWithoutChang
252254
_numRowsWithoutChangeInDistinctReached = numRowsWithoutChangeInDistinctReached;
253255
}
254256

257+
public boolean isTimeLimitInDistinctReached() {
258+
return _timeLimitInDistinctReached;
259+
}
260+
261+
public void setTimeLimitInDistinctReached(boolean timeLimitInDistinctReached) {
262+
_timeLimitInDistinctReached = timeLimitInDistinctReached;
263+
}
264+
255265
@JsonIgnore
256266
@Override
257267
public boolean isMaxRowsInJoinReached() {

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
@JsonPropertyOrder({
4040
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
4141
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "maxRowsInDistinctReached",
42-
"numRowsWithoutChangeInDistinctReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId",
42+
"numRowsWithoutChangeInDistinctReached", "timeLimitInDistinctReached", "timeUsedMs", "stageStats",
43+
"maxRowsInOperator", "requestId",
4344
"clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
4445
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
4546
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
@@ -111,7 +112,7 @@ public int getNumRowsResultSet() {
111112
@Override
112113
public boolean isPartialResult() {
113114
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached()
114-
|| isMaxRowsInDistinctReached() || isNumRowsWithoutChangeInDistinctReached();
115+
|| isMaxRowsInDistinctReached() || isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
115116
}
116117

117118
@Override
@@ -190,6 +191,15 @@ public void mergeNumRowsWithoutChangeInDistinctReached(boolean reached) {
190191
_brokerStats.merge(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED, reached);
191192
}
192193

194+
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
195+
public boolean isTimeLimitInDistinctReached() {
196+
return _brokerStats.getBoolean(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED);
197+
}
198+
199+
public void mergeTimeLimitInDistinctReached(boolean timeLimitReached) {
200+
_brokerStats.merge(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED, timeLimitReached);
201+
}
202+
193203
/**
194204
* Returns the stage statistics.
195205
*/
@@ -475,7 +485,8 @@ public long merge(long value1, long value2) {
475485
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
476486
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
477487
MAX_ROWS_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
478-
NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN);
488+
NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
489+
TIME_LIMIT_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN);
479490

480491
private final StatMap.Type _type;
481492

pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ public static Long getExtraPassiveTimeoutMs(Map<String, String> queryOptions) {
117117
return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, extraPassiveTimeoutMsString, 0);
118118
}
119119

120+
@Nullable
121+
public static Long getMaxExecutionTimeMsInDistinct(Map<String, String> queryOptions) {
122+
String maxExecutionTimeMs = queryOptions.get(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT);
123+
return checkedParseLong(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT, maxExecutionTimeMs, 0);
124+
}
125+
120126
@Nullable
121127
public static Long getMaxServerResponseSizeBytes(Map<String, String> queryOptions) {
122128
String responseSize = queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);

pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public abstract class BaseResultsBlock implements Block {
4040
public enum EarlyTerminationReason {
4141
NONE,
4242
DISTINCT_MAX_ROWS,
43-
DISTINCT_NO_NEW_VALUES
43+
DISTINCT_NO_NEW_VALUES,
44+
TIME_LIMIT
4445
}
4546

4647
private List<QueryErrorMessage> _processingExceptions;

pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.Collections;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.function.LongSupplier;
2527
import java.util.stream.Collectors;
2628
import org.apache.pinot.common.request.context.ExpressionContext;
2729
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -55,22 +57,35 @@ public class DistinctOperator extends BaseOperator<DistinctResultsBlock> {
5557
private int _numRowsWithoutNewDistinct = 0;
5658
private boolean _hitMaxRowsLimit = false;
5759
private boolean _hitNoChangeLimit = false;
60+
private final long _maxExecutionTimeNs;
61+
private boolean _hitTimeLimit = false;
62+
private final LongSupplier _timeSupplier;
5863

5964
public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
6065
BaseProjectOperator<?> projectOperator) {
66+
this(indexSegment, queryContext, projectOperator, System::nanoTime);
67+
}
68+
69+
DistinctOperator(IndexSegment indexSegment, QueryContext queryContext, BaseProjectOperator<?> projectOperator,
70+
LongSupplier timeSupplier) {
6171
_indexSegment = indexSegment;
6272
_queryContext = queryContext;
6373
_projectOperator = projectOperator;
74+
_timeSupplier = timeSupplier;
6475
Map<String, String> queryOptions = queryContext.getQueryOptions();
6576
if (queryOptions != null) {
6677
Integer maxRowsInDistinct = QueryOptionsUtils.getMaxRowsInDistinct(queryOptions);
6778
_maxRowsInDistinct = maxRowsInDistinct != null ? maxRowsInDistinct : UNLIMITED_ROWS;
6879
Integer numRowsWithoutChange = QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryOptions);
6980
_numRowsWithoutChangeInDistinct =
7081
numRowsWithoutChange != null ? numRowsWithoutChange : UNLIMITED_ROWS;
82+
Long maxExecutionTimeMs = QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryOptions);
83+
_maxExecutionTimeNs =
84+
maxExecutionTimeMs != null ? TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs) : Long.MAX_VALUE;
7185
} else {
7286
_maxRowsInDistinct = UNLIMITED_ROWS;
7387
_numRowsWithoutChangeInDistinct = UNLIMITED_ROWS;
88+
_maxExecutionTimeNs = Long.MAX_VALUE;
7489
}
7590
}
7691

@@ -82,7 +97,18 @@ protected DistinctResultsBlock getNextBlock() {
8297
ValueBlock valueBlock;
8398
boolean enforceRowLimit = _maxRowsInDistinct != UNLIMITED_ROWS;
8499
boolean enforceNoChangeLimit = _numRowsWithoutChangeInDistinct != UNLIMITED_ROWS;
100+
boolean enforceTimeLimit = _maxExecutionTimeNs != Long.MAX_VALUE;
101+
final long startTimeNs = _timeSupplier.getAsLong();
85102
while ((valueBlock = _projectOperator.nextBlock()) != null) {
103+
if (enforceTimeLimit) {
104+
long elapsed = _timeSupplier.getAsLong() - startTimeNs;
105+
long remaining = _maxExecutionTimeNs - elapsed;
106+
executor.setRemainingTimeNanos(remaining);
107+
}
108+
if (enforceTimeLimit && hasExceededTimeLimit(startTimeNs)) {
109+
_hitTimeLimit = true;
110+
break;
111+
}
86112
if (enforceRowLimit && executor.getRemainingRowsToProcess() <= 0) {
87113
_hitMaxRowsLimit = true;
88114
break;
@@ -117,10 +143,16 @@ protected DistinctResultsBlock getNextBlock() {
117143
if (_hitMaxRowsLimit || _hitNoChangeLimit || satisfied) {
118144
break;
119145
}
146+
if (enforceTimeLimit && hasExceededTimeLimit(startTimeNs)) {
147+
_hitTimeLimit = true;
148+
break;
149+
}
120150
}
121151
DistinctResultsBlock resultsBlock = new DistinctResultsBlock(executor.getResult(), _queryContext);
122152
resultsBlock.setNumDocsScanned(_numDocsScanned);
123-
if (_hitMaxRowsLimit) {
153+
if (_hitTimeLimit) {
154+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
155+
} else if (_hitMaxRowsLimit) {
124156
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
125157
} else if (_hitNoChangeLimit) {
126158
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_NO_NEW_VALUES);
@@ -176,4 +208,8 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
176208
.collect(Collectors.toList());
177209
attributeBuilder.putStringList("keyColumns", expressions);
178210
}
211+
212+
private boolean hasExceededTimeLimit(long startTimeNs) {
213+
return _timeSupplier.getAsLong() - startTimeNs >= _maxExecutionTimeNs;
214+
}
179215
}

pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,9 @@
2929
* Base implementation of {@link DistinctExecutor} for single column.
3030
*/
3131
public abstract class BaseSingleColumnDistinctExecutor<T extends DistinctTable, S, M> implements DistinctExecutor {
32-
private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
33-
3432
protected final ExpressionContext _expression;
3533
protected final T _distinctTable;
36-
private int _rowsRemaining = UNLIMITED_ROWS;
37-
private int _numRowsProcessed = 0;
38-
private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
39-
private int _numRowsWithoutChange = 0;
40-
private boolean _numRowsWithoutChangeLimitReached = false;
34+
private final DistinctEarlyTerminationContext _earlyTerminationContext = new DistinctEarlyTerminationContext();
4135

4236
public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T distinctTable) {
4337
_expression = expression;
@@ -46,22 +40,27 @@ public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T distinct
4640

4741
@Override
4842
public void setMaxRowsToProcess(int maxRows) {
49-
_rowsRemaining = maxRows;
43+
_earlyTerminationContext.setMaxRowsToProcess(maxRows);
5044
}
5145

5246
@Override
5347
public void setNumRowsWithoutChangeInDistinct(int numRowsWithoutChangeInDistinct) {
54-
_numRowsWithoutChangeLimit = numRowsWithoutChangeInDistinct;
48+
_earlyTerminationContext.setNumRowsWithoutChangeInDistinct(numRowsWithoutChangeInDistinct);
49+
}
50+
51+
@Override
52+
public void setRemainingTimeNanos(long remainingTimeNanos) {
53+
_earlyTerminationContext.setRemainingTimeNanos(remainingTimeNanos);
5554
}
5655

5756
@Override
5857
public boolean isNumRowsWithoutChangeLimitReached() {
59-
return _numRowsWithoutChangeLimitReached;
58+
return _earlyTerminationContext.isNumRowsWithoutChangeLimitReached();
6059
}
6160

6261
@Override
6362
public int getNumRowsProcessed() {
64-
return _numRowsProcessed;
63+
return _earlyTerminationContext.getNumRowsProcessed();
6564
}
6665

6766
@Override
@@ -156,37 +155,18 @@ public int getNumDistinctRowsCollected() {
156155

157156
@Override
158157
public int getRemainingRowsToProcess() {
159-
return _rowsRemaining;
158+
return _earlyTerminationContext.getRemainingRowsToProcess();
160159
}
161160

162161
private int clampToRemaining(int numDocs) {
163-
if (_rowsRemaining == UNLIMITED_ROWS) {
164-
return numDocs;
165-
}
166-
if (_rowsRemaining <= 0) {
167-
return 0;
168-
}
169-
return Math.min(numDocs, _rowsRemaining);
162+
return _earlyTerminationContext.clampToRemaining(numDocs);
170163
}
171164

172165
private void recordRowProcessed(boolean distinctChanged) {
173-
_numRowsProcessed++;
174-
if (_rowsRemaining != UNLIMITED_ROWS) {
175-
_rowsRemaining--;
176-
}
177-
if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
178-
if (distinctChanged) {
179-
_numRowsWithoutChange = 0;
180-
} else {
181-
_numRowsWithoutChange++;
182-
if (_numRowsWithoutChange >= _numRowsWithoutChangeLimit) {
183-
_numRowsWithoutChangeLimitReached = true;
184-
}
185-
}
186-
}
166+
_earlyTerminationContext.recordRowProcessed(distinctChanged);
187167
}
188168

189169
private boolean shouldStopProcessing() {
190-
return _rowsRemaining <= 0 || _numRowsWithoutChangeLimitReached;
170+
return _earlyTerminationContext.shouldStopProcessing();
191171
}
192172
}

0 commit comments

Comments
 (0)