Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,12 @@ enum MetadataKey {
// Needed so that we can track workload name in Netty channel response.
WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
// Needed so that we can track query id in Netty channel response.
QUERY_ID(41, "queryId", MetadataValueType.STRING);
QUERY_ID(41, "queryId", MetadataValueType.STRING),
EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING);

// We keep this constant to track the max id added so far for backward compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
private static final int MAX_ID = QUERY_ID.getId();
private static final int MAX_ID = EARLY_TERMINATION_REASON.getId();

private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@
*/
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
"numGroupsWarningLimitReached", "timeUsedMs", "requestId", "clientRequestId", "brokerId", "numDocsScanned",
"totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs",
"numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
"numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"numGroupsWarningLimitReached", "maxRowsInDistinctReached", "numRowsWithoutChangeInDistinctReached",
"timeLimitInDistinctReached", "timeUsedMs",
"requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
"brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
Expand All @@ -74,6 +76,9 @@ public class BrokerResponseNative implements BrokerResponse {
private boolean _groupsTrimmed = false;
private boolean _numGroupsLimitReached = false;
private boolean _numGroupsWarningLimitReached = false;
private boolean _maxRowsInDistinctReached = false;
private boolean _numRowsWithoutChangeInDistinctReached = false;
private boolean _timeLimitInDistinctReached = false;
private long _timeUsedMs = 0L;
private String _requestId;
private String _clientRequestId;
Expand Down Expand Up @@ -189,7 +194,8 @@ public void setNumRowsResultSet(int numRowsResultSet) {
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@Override
public boolean isPartialResult() {
return getExceptionsSize() > 0 || isNumGroupsLimitReached();
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInDistinctReached()
|| isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
}

@Override
Expand Down Expand Up @@ -232,6 +238,30 @@ public void setNumGroupsWarningLimitReached(boolean numGroupsWarningLimitReached
_numGroupsWarningLimitReached = numGroupsWarningLimitReached;
}

public boolean isMaxRowsInDistinctReached() {
return _maxRowsInDistinctReached;
}

public void setMaxRowsInDistinctReached(boolean maxRowsInDistinctReached) {
_maxRowsInDistinctReached = maxRowsInDistinctReached;
}

public boolean isNumRowsWithoutChangeInDistinctReached() {
return _numRowsWithoutChangeInDistinctReached;
}

public void setNumRowsWithoutChangeInDistinctReached(boolean numRowsWithoutChangeInDistinctReached) {
_numRowsWithoutChangeInDistinctReached = numRowsWithoutChangeInDistinctReached;
}

public boolean isTimeLimitInDistinctReached() {
return _timeLimitInDistinctReached;
}

public void setTimeLimitInDistinctReached(boolean timeLimitInDistinctReached) {
_timeLimitInDistinctReached = timeLimitInDistinctReached;
}

@JsonIgnore
@Override
public boolean isMaxRowsInJoinReached() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@
*/
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "timeUsedMs", "stageStats",
"maxRowsInOperator", "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs",
"numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
"numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "maxRowsInDistinctReached",
"numRowsWithoutChangeInDistinctReached", "timeLimitInDistinctReached", "timeUsedMs", "stageStats",
"maxRowsInOperator", "requestId",
"clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
"brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
Expand Down Expand Up @@ -109,7 +111,8 @@ public int getNumRowsResultSet() {
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@Override
public boolean isPartialResult() {
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached();
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached()
|| isMaxRowsInDistinctReached() || isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
}

@Override
Expand Down Expand Up @@ -170,6 +173,33 @@ public void mergeMaxRowsInWindowReached(boolean maxRowsInWindowReached) {
_maxRowsInWindowReached |= maxRowsInWindowReached;
}

@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public boolean isMaxRowsInDistinctReached() {
return _brokerStats.getBoolean(StatKey.MAX_ROWS_IN_DISTINCT_REACHED);
}

public void mergeMaxRowsInDistinctReached(boolean maxRowsInDistinctReached) {
_brokerStats.merge(StatKey.MAX_ROWS_IN_DISTINCT_REACHED, maxRowsInDistinctReached);
}

@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public boolean isNumRowsWithoutChangeInDistinctReached() {
return _brokerStats.getBoolean(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED);
}

public void mergeNumRowsWithoutChangeInDistinctReached(boolean reached) {
_brokerStats.merge(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED, reached);
}

@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public boolean isTimeLimitInDistinctReached() {
return _brokerStats.getBoolean(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED);
}

public void mergeTimeLimitInDistinctReached(boolean timeLimitReached) {
_brokerStats.merge(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED, timeLimitReached);
}

/**
* Returns the stage statistics.
*/
Expand Down Expand Up @@ -453,7 +483,10 @@ public long merge(long value1, long value2) {
NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
MAX_ROWS_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
TIME_LIMIT_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN);

private final StatMap.Type _type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ public static Long getExtraPassiveTimeoutMs(Map<String, String> queryOptions) {
return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, extraPassiveTimeoutMsString, 0);
}

@Nullable
public static Long getMaxExecutionTimeMsInDistinct(Map<String, String> queryOptions) {
String maxExecutionTimeMs = queryOptions.get(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT);
return checkedParseLong(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT, maxExecutionTimeMs, 0);
}

@Nullable
public static Long getMaxServerResponseSizeBytes(Map<String, String> queryOptions) {
String responseSize = queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
Expand Down Expand Up @@ -418,6 +424,19 @@ public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin);
}

@Nullable
public static Integer getMaxRowsInDistinct(Map<String, String> queryOptions) {
String maxRowsInDistinct = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_DISTINCT);
return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_DISTINCT, maxRowsInDistinct);
}

@Nullable
public static Integer getNumRowsWithoutChangeInDistinct(Map<String, String> queryOptions) {
String numRowsWithoutChange =
queryOptions.get(QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT);
return checkedParseIntPositive(QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT, numRowsWithoutChange);
}

@Nullable
public static JoinOverFlowMode getJoinOverflowMode(Map<String, String> queryOptions) {
String joinOverflowModeStr = queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.ExpressionType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
Expand Down Expand Up @@ -61,6 +62,16 @@ public void testParseQuery() {
"SELECT `foo`\n" + "FROM `countries`\n" + "WHERE `bar` > 1");
}

@Test
public void testParseQueryOptionsFromJson()
throws Exception {
SqlNodeAndOptions result = RequestUtils.parseQuery("select foo from countries", JsonUtils.stringToJsonNode(
"{\"sql\":\"select foo from countries\","
+ "\"queryOptions\":\"maxRowsInDistinct=5;numRowsWithoutChangeInDistinct=10\"}"));
assertEquals(result.getOptions().get("maxRowsInDistinct"), "5");
assertEquals(result.getOptions().get("numRowsWithoutChangeInDistinct"), "10");
}

@DataProvider(name = "queryProvider")
public Object[][] queryProvider() {
return new Object[][] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
* The {@code BaseResultsBlock} class is the holder of the server side results.
*/
public abstract class BaseResultsBlock implements Block {
public enum EarlyTerminationReason {
NONE,
DISTINCT_MAX_ROWS,
DISTINCT_NO_NEW_VALUES,
DISTINCT_TIME_LIMIT
}

private List<QueryErrorMessage> _processingExceptions;
private long _numTotalDocs;
private long _numDocsScanned;
Expand All @@ -49,6 +56,7 @@ public abstract class BaseResultsBlock implements Block {
private long _executionThreadCpuTimeNs;
private long _executionThreadMemAllocatedBytes;
private int _numServerThreads;
private EarlyTerminationReason _earlyTerminationReason = EarlyTerminationReason.NONE;

@Nullable
public List<QueryErrorMessage> getErrorMessages() {
Expand Down Expand Up @@ -163,6 +171,14 @@ public void setNumServerThreads(int numServerThreads) {
_numServerThreads = numServerThreads;
}

public EarlyTerminationReason getEarlyTerminationReason() {
return _earlyTerminationReason;
}

public void setEarlyTerminationReason(EarlyTerminationReason earlyTerminationReason) {
_earlyTerminationReason = earlyTerminationReason;
}

/**
* Returns the total size (number of rows) in this result block, without having to materialize the rows.
*
Expand Down Expand Up @@ -208,6 +224,9 @@ public Map<String, String> getResultsMetadata() {
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
Integer.toString(_numConsumingSegmentsProcessed));
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), Integer.toString(_numConsumingSegmentsMatched));
if (_earlyTerminationReason != EarlyTerminationReason.NONE) {
metadata.put(MetadataKey.EARLY_TERMINATION_REASON.getName(), _earlyTerminationReason.name());
}
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class DistinctCombineOperator extends BaseSingleBlockCombineOperator<Dist
private static final String EXPLAIN_NAME = "COMBINE_DISTINCT";

public DistinctCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
super(new DistinctResultsBlockMerger(), operators, queryContext, executorService);
super(new DistinctResultsBlockMerger(queryContext), operators, queryContext, executorService);
}

@Override
Expand Down
Loading
Loading