Skip to content

Commit c901ebf

Browse files
committed
Add distinct early termination options
1 parent 8bfb907 commit c901ebf

File tree

18 files changed

+1250
-88
lines changed

18 files changed

+1250
-88
lines changed

pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,12 @@ enum MetadataKey {
151151
// Needed so that we can track workload name in Netty channel response.
152152
WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
153153
// Needed so that we can track query id in Netty channel response.
154-
QUERY_ID(41, "queryId", MetadataValueType.STRING);
154+
QUERY_ID(41, "queryId", MetadataValueType.STRING),
155+
EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING);
155156

156157
// We keep this constant to track the max id added so far for backward compatibility.
157158
// Increase it when adding new keys, but NEVER DECREASE IT!!!
158-
private static final int MAX_ID = QUERY_ID.getId();
159+
private static final int MAX_ID = EARLY_TERMINATION_REASON.getId();
159160

160161
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
161162
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@
4444
*/
4545
@JsonPropertyOrder({
4646
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
47-
"numGroupsWarningLimitReached", "timeUsedMs", "requestId", "clientRequestId", "brokerId", "numDocsScanned",
48-
"totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
49-
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
50-
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs",
51-
"numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
52-
"numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
53-
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
47+
"numGroupsWarningLimitReached", "maxRowsInDistinctReached", "numRowsWithoutChangeInDistinctReached", "timeUsedMs",
48+
"requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
49+
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
50+
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
51+
"numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
52+
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
53+
"brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
54+
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
5455
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
5556
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
5657
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
@@ -74,6 +75,8 @@ public class BrokerResponseNative implements BrokerResponse {
7475
private boolean _groupsTrimmed = false;
7576
private boolean _numGroupsLimitReached = false;
7677
private boolean _numGroupsWarningLimitReached = false;
78+
private boolean _maxRowsInDistinctReached = false;
79+
private boolean _numRowsWithoutChangeInDistinctReached = false;
7780
private long _timeUsedMs = 0L;
7881
private String _requestId;
7982
private String _clientRequestId;
@@ -189,7 +192,8 @@ public void setNumRowsResultSet(int numRowsResultSet) {
189192
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
190193
@Override
191194
public boolean isPartialResult() {
192-
return getExceptionsSize() > 0 || isNumGroupsLimitReached();
195+
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInDistinctReached()
196+
|| isNumRowsWithoutChangeInDistinctReached();
193197
}
194198

195199
@Override
@@ -232,6 +236,22 @@ public void setNumGroupsWarningLimitReached(boolean numGroupsWarningLimitReached
232236
_numGroupsWarningLimitReached = numGroupsWarningLimitReached;
233237
}
234238

239+
public boolean isMaxRowsInDistinctReached() {
240+
return _maxRowsInDistinctReached;
241+
}
242+
243+
public void setMaxRowsInDistinctReached(boolean maxRowsInDistinctReached) {
244+
_maxRowsInDistinctReached = maxRowsInDistinctReached;
245+
}
246+
247+
public boolean isNumRowsWithoutChangeInDistinctReached() {
248+
return _numRowsWithoutChangeInDistinctReached;
249+
}
250+
251+
public void setNumRowsWithoutChangeInDistinctReached(boolean numRowsWithoutChangeInDistinctReached) {
252+
_numRowsWithoutChangeInDistinctReached = numRowsWithoutChangeInDistinctReached;
253+
}
254+
235255
@JsonIgnore
236256
@Override
237257
public boolean isMaxRowsInJoinReached() {

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@
3838
*/
3939
@JsonPropertyOrder({
4040
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
41-
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "timeUsedMs", "stageStats",
42-
"maxRowsInOperator", "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs",
43-
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
44-
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
45-
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs",
46-
"numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
47-
"numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
48-
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
41+
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "maxRowsInDistinctReached",
42+
"numRowsWithoutChangeInDistinctReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId",
43+
"clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
44+
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
45+
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
46+
"numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
47+
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
48+
"brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
49+
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
4950
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
5051
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
5152
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
@@ -109,7 +110,8 @@ public int getNumRowsResultSet() {
109110
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
110111
@Override
111112
public boolean isPartialResult() {
112-
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached();
113+
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached()
114+
|| isMaxRowsInDistinctReached() || isNumRowsWithoutChangeInDistinctReached();
113115
}
114116

115117
@Override
@@ -170,6 +172,24 @@ public void mergeMaxRowsInWindowReached(boolean maxRowsInWindowReached) {
170172
_maxRowsInWindowReached |= maxRowsInWindowReached;
171173
}
172174

175+
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
176+
public boolean isMaxRowsInDistinctReached() {
177+
return _brokerStats.getBoolean(StatKey.MAX_ROWS_IN_DISTINCT_REACHED);
178+
}
179+
180+
public void mergeMaxRowsInDistinctReached(boolean maxRowsInDistinctReached) {
181+
_brokerStats.merge(StatKey.MAX_ROWS_IN_DISTINCT_REACHED, maxRowsInDistinctReached);
182+
}
183+
184+
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
185+
public boolean isNumRowsWithoutChangeInDistinctReached() {
186+
return _brokerStats.getBoolean(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED);
187+
}
188+
189+
public void mergeNumRowsWithoutChangeInDistinctReached(boolean reached) {
190+
_brokerStats.merge(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED, reached);
191+
}
192+
173193
/**
174194
* Returns the stage statistics.
175195
*/
@@ -453,7 +473,9 @@ public long merge(long value1, long value2) {
453473
NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
454474
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
455475
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
456-
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
476+
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
477+
MAX_ROWS_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
478+
NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN);
457479

458480
private final StatMap.Type _type;
459481

pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,19 @@ public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
414414
return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin);
415415
}
416416

417+
@Nullable
418+
public static Integer getMaxRowsInDistinct(Map<String, String> queryOptions) {
419+
String maxRowsInDistinct = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_DISTINCT);
420+
return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_DISTINCT, maxRowsInDistinct);
421+
}
422+
423+
@Nullable
424+
public static Integer getNumRowsWithoutChangeInDistinct(Map<String, String> queryOptions) {
425+
String numRowsWithoutChange =
426+
queryOptions.get(QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT);
427+
return checkedParseIntPositive(QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT, numRowsWithoutChange);
428+
}
429+
417430
@Nullable
418431
public static JoinOverFlowMode getJoinOverflowMode(Map<String, String> queryOptions) {
419432
String joinOverflowModeStr = queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);

pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@
3737
* The {@code BaseResultsBlock} class is the holder of the server side results.
3838
*/
3939
public abstract class BaseResultsBlock implements Block {
40+
public enum EarlyTerminationReason {
41+
NONE,
42+
DISTINCT_MAX_ROWS,
43+
DISTINCT_NO_NEW_VALUES
44+
}
45+
4046
private List<QueryErrorMessage> _processingExceptions;
4147
private long _numTotalDocs;
4248
private long _numDocsScanned;
@@ -49,6 +55,7 @@ public abstract class BaseResultsBlock implements Block {
4955
private long _executionThreadCpuTimeNs;
5056
private long _executionThreadMemAllocatedBytes;
5157
private int _numServerThreads;
58+
private EarlyTerminationReason _earlyTerminationReason = EarlyTerminationReason.NONE;
5259

5360
@Nullable
5461
public List<QueryErrorMessage> getErrorMessages() {
@@ -163,6 +170,14 @@ public void setNumServerThreads(int numServerThreads) {
163170
_numServerThreads = numServerThreads;
164171
}
165172

173+
public EarlyTerminationReason getEarlyTerminationReason() {
174+
return _earlyTerminationReason;
175+
}
176+
177+
public void setEarlyTerminationReason(EarlyTerminationReason earlyTerminationReason) {
178+
_earlyTerminationReason = earlyTerminationReason;
179+
}
180+
166181
/**
167182
* Returns the total size (number of rows) in this result block, without having to materialize the rows.
168183
*
@@ -208,6 +223,9 @@ public Map<String, String> getResultsMetadata() {
208223
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
209224
Integer.toString(_numConsumingSegmentsProcessed));
210225
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), Integer.toString(_numConsumingSegmentsMatched));
226+
if (_earlyTerminationReason != EarlyTerminationReason.NONE) {
227+
metadata.put(MetadataKey.EARLY_TERMINATION_REASON.getName(), _earlyTerminationReason.name());
228+
}
211229
return metadata;
212230
}
213231
}

0 commit comments

Comments
 (0)