Skip to content

Commit a641555

Browse files
committed
Add distinct early termination options
Support early termination in combine operator
1 parent 6c87d2e commit a641555

File tree

32 files changed

+2508
-146
lines changed

32 files changed

+2508
-146
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,10 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
557557
SqlNodeAndOptions sqlNodeAndOptions;
558558
try {
559559
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
560+
if (sqlRequestJson.has(Request.QUERY_OPTIONS)) {
561+
sqlNodeAndOptions.setExtraOptions(
562+
RequestUtils.getOptionsFromString(sqlRequestJson.get(Request.QUERY_OPTIONS).asText()));
563+
}
560564
} catch (Exception e) {
561565
return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage());
562566
}

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
590590
QueryDispatcher.QueryResult queryResults;
591591
try {
592592
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, timer.getRemainingTimeMs(),
593-
query.getOptions());
593+
mergeQueryOptions(query));
594594
} catch (QueryException e) {
595595
throw e;
596596
} catch (Throwable t) {
@@ -686,6 +686,14 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
686686
}
687687
}
688688

689+
/**
690+
* Uses both the parsed SQL options and the (potentially mutated) compiled query options to ensure we
691+
* propagate everything (including queryOptions from the client) to the servers.
692+
*/
693+
private static Map<String, String> mergeQueryOptions(QueryEnvironment.CompiledQuery query) {
694+
return QueryOptionsUtils.mergeQueryOptions(query.getSqlNodeAndOptions().getOptions(), query.getOptions());
695+
}
696+
689697
private static void throwTableAccessError(TableAuthorizationResult tableAuthorizationResult) {
690698
String failureMessage = tableAuthorizationResult.getFailureMessage();
691699
if (StringUtils.isNotBlank(failureMessage)) {

pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,12 @@ enum MetadataKey {
151151
// Needed so that we can track workload name in Netty channel response.
152152
WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
153153
// Needed so that we can track query id in Netty channel response.
154-
QUERY_ID(41, "queryId", MetadataValueType.STRING);
154+
QUERY_ID(41, "queryId", MetadataValueType.STRING),
155+
EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING);
155156

156157
// We keep this constant to track the max id added so far for backward compatibility.
157158
// Increase it when adding new keys, but NEVER DECREASE IT!!!
158-
private static final int MAX_ID = QUERY_ID.getId();
159+
private static final int MAX_ID = EARLY_TERMINATION_REASON.getId();
159160

160161
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
161162
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@
4444
*/
4545
@JsonPropertyOrder({
4646
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
47-
"numGroupsWarningLimitReached", "timeUsedMs", "requestId", "clientRequestId", "brokerId", "numDocsScanned",
48-
"totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
49-
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
50-
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs",
51-
"numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
52-
"numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
53-
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
47+
"numGroupsWarningLimitReached", "maxRowsInDistinctReached", "numRowsWithoutChangeInDistinctReached",
48+
"timeLimitInDistinctReached", "timeUsedMs",
49+
"requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
50+
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
51+
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
52+
"numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
53+
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
54+
"brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
55+
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
5456
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
5557
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
5658
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
@@ -74,6 +76,9 @@ public class BrokerResponseNative implements BrokerResponse {
7476
private boolean _groupsTrimmed = false;
7577
private boolean _numGroupsLimitReached = false;
7678
private boolean _numGroupsWarningLimitReached = false;
79+
private boolean _maxRowsInDistinctReached = false;
80+
private boolean _numRowsWithoutChangeInDistinctReached = false;
81+
private boolean _timeLimitInDistinctReached = false;
7782
private long _timeUsedMs = 0L;
7883
private String _requestId;
7984
private String _clientRequestId;
@@ -189,7 +194,8 @@ public void setNumRowsResultSet(int numRowsResultSet) {
189194
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
190195
@Override
191196
public boolean isPartialResult() {
192-
return getExceptionsSize() > 0 || isNumGroupsLimitReached();
197+
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInDistinctReached()
198+
|| isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
193199
}
194200

195201
@Override
@@ -232,6 +238,30 @@ public void setNumGroupsWarningLimitReached(boolean numGroupsWarningLimitReached
232238
_numGroupsWarningLimitReached = numGroupsWarningLimitReached;
233239
}
234240

241+
public boolean isMaxRowsInDistinctReached() {
242+
return _maxRowsInDistinctReached;
243+
}
244+
245+
public void setMaxRowsInDistinctReached(boolean maxRowsInDistinctReached) {
246+
_maxRowsInDistinctReached = maxRowsInDistinctReached;
247+
}
248+
249+
public boolean isNumRowsWithoutChangeInDistinctReached() {
250+
return _numRowsWithoutChangeInDistinctReached;
251+
}
252+
253+
public void setNumRowsWithoutChangeInDistinctReached(boolean numRowsWithoutChangeInDistinctReached) {
254+
_numRowsWithoutChangeInDistinctReached = numRowsWithoutChangeInDistinctReached;
255+
}
256+
257+
public boolean isTimeLimitInDistinctReached() {
258+
return _timeLimitInDistinctReached;
259+
}
260+
261+
public void setTimeLimitInDistinctReached(boolean timeLimitInDistinctReached) {
262+
_timeLimitInDistinctReached = timeLimitInDistinctReached;
263+
}
264+
235265
@JsonIgnore
236266
@Override
237267
public boolean isMaxRowsInJoinReached() {

pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,16 @@
3838
*/
3939
@JsonPropertyOrder({
4040
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
41-
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "timeUsedMs", "stageStats",
42-
"maxRowsInOperator", "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs",
43-
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
44-
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
45-
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs",
46-
"numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
47-
"numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
48-
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
41+
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "maxRowsInDistinctReached",
42+
"numRowsWithoutChangeInDistinctReached", "timeLimitInDistinctReached", "timeUsedMs", "stageStats",
43+
"maxRowsInOperator", "requestId",
44+
"clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
45+
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
46+
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
47+
"numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
48+
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
49+
"brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
50+
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
4951
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
5052
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
5153
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
@@ -109,7 +111,8 @@ public int getNumRowsResultSet() {
109111
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
110112
@Override
111113
public boolean isPartialResult() {
112-
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached();
114+
return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached()
115+
|| isMaxRowsInDistinctReached() || isNumRowsWithoutChangeInDistinctReached() || isTimeLimitInDistinctReached();
113116
}
114117

115118
@Override
@@ -170,6 +173,33 @@ public void mergeMaxRowsInWindowReached(boolean maxRowsInWindowReached) {
170173
_maxRowsInWindowReached |= maxRowsInWindowReached;
171174
}
172175

176+
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
177+
public boolean isMaxRowsInDistinctReached() {
178+
return _brokerStats.getBoolean(StatKey.MAX_ROWS_IN_DISTINCT_REACHED);
179+
}
180+
181+
public void mergeMaxRowsInDistinctReached(boolean maxRowsInDistinctReached) {
182+
_brokerStats.merge(StatKey.MAX_ROWS_IN_DISTINCT_REACHED, maxRowsInDistinctReached);
183+
}
184+
185+
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
186+
public boolean isNumRowsWithoutChangeInDistinctReached() {
187+
return _brokerStats.getBoolean(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED);
188+
}
189+
190+
public void mergeNumRowsWithoutChangeInDistinctReached(boolean reached) {
191+
_brokerStats.merge(StatKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED, reached);
192+
}
193+
194+
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
195+
public boolean isTimeLimitInDistinctReached() {
196+
return _brokerStats.getBoolean(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED);
197+
}
198+
199+
public void mergeTimeLimitInDistinctReached(boolean timeLimitReached) {
200+
_brokerStats.merge(StatKey.TIME_LIMIT_IN_DISTINCT_REACHED, timeLimitReached);
201+
}
202+
173203
/**
174204
* Returns the stage statistics.
175205
*/
@@ -453,7 +483,10 @@ public long merge(long value1, long value2) {
453483
NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
454484
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
455485
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
456-
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
486+
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
487+
MAX_ROWS_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
488+
NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN),
489+
TIME_LIMIT_IN_DISTINCT_REACHED(StatMap.Type.BOOLEAN);
457490

458491
private final StatMap.Type _type;
459492

pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,26 @@ public static Map<String, String> resolveCaseInsensitiveOptions(Map<String, Stri
9797
return resolved;
9898
}
9999

100+
public static Map<String, String> mergeQueryOptions(@Nullable Map<String, String> baseOptions,
101+
@Nullable Map<String, String> overridingOptions) {
102+
Map<String, String> merged = new HashMap<>();
103+
if (baseOptions != null) {
104+
merged.putAll(baseOptions);
105+
}
106+
if (overridingOptions != null && !overridingOptions.isEmpty()) {
107+
merged.putAll(resolveCaseInsensitiveOptions(overridingOptions));
108+
}
109+
return merged;
110+
}
111+
112+
public static void mergeQueryOptionsIfAbsent(@Nullable Map<String, String> targetOptions,
113+
@Nullable Map<String, String> extraOptions) {
114+
if (targetOptions == null || extraOptions == null || extraOptions.isEmpty()) {
115+
return;
116+
}
117+
resolveCaseInsensitiveOptions(extraOptions).forEach(targetOptions::putIfAbsent);
118+
}
119+
100120
@Nullable
101121
public static String resolveCaseInsensitiveKey(Object property) {
102122
if (property instanceof String) {
@@ -117,6 +137,12 @@ public static Long getExtraPassiveTimeoutMs(Map<String, String> queryOptions) {
117137
return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, extraPassiveTimeoutMsString, 0);
118138
}
119139

140+
@Nullable
141+
public static Long getMaxExecutionTimeMsInDistinct(Map<String, String> queryOptions) {
142+
String maxExecutionTimeMs = queryOptions.get(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT);
143+
return checkedParseLong(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT, maxExecutionTimeMs, 0);
144+
}
145+
120146
@Nullable
121147
public static Long getMaxServerResponseSizeBytes(Map<String, String> queryOptions) {
122148
String responseSize = queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
@@ -418,6 +444,19 @@ public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
418444
return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin);
419445
}
420446

447+
@Nullable
448+
public static Integer getMaxRowsInDistinct(Map<String, String> queryOptions) {
449+
String maxRowsInDistinct = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_DISTINCT);
450+
return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_DISTINCT, maxRowsInDistinct);
451+
}
452+
453+
@Nullable
454+
public static Integer getNumRowsWithoutChangeInDistinct(Map<String, String> queryOptions) {
455+
String numRowsWithoutChange =
456+
queryOptions.get(QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT);
457+
return checkedParseIntPositive(QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT, numRowsWithoutChange);
458+
}
459+
421460
@Nullable
422461
public static JoinOverFlowMode getJoinOverflowMode(Map<String, String> queryOptions) {
423462
String joinOverflowModeStr = queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);

pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ public void setParseTimeNs(long parseTimeNs) {
5858
}
5959

6060
public void setExtraOptions(Map<String, String> extractOptionsMap) {
61-
for (Map.Entry<String, String> e : QueryOptionsUtils.resolveCaseInsensitiveOptions(extractOptionsMap).entrySet()) {
62-
_options.putIfAbsent(e.getKey(), e.getValue());
63-
}
61+
QueryOptionsUtils.mergeQueryOptionsIfAbsent(_options, extractOptionsMap);
6462
}
6563
}

pinot-common/src/test/java/org/apache/pinot/common/utils/request/RequestUtilsTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.calcite.sql.parser.SqlParserPos;
2525
import org.apache.pinot.common.request.Expression;
2626
import org.apache.pinot.common.request.ExpressionType;
27+
import org.apache.pinot.spi.utils.JsonUtils;
2728
import org.apache.pinot.sql.parsers.CalciteSqlParser;
2829
import org.apache.pinot.sql.parsers.PinotSqlType;
2930
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
@@ -61,6 +62,16 @@ public void testParseQuery() {
6162
"SELECT `foo`\n" + "FROM `countries`\n" + "WHERE `bar` > 1");
6263
}
6364

65+
@Test
66+
public void testParseQueryOptionsFromJson()
67+
throws Exception {
68+
SqlNodeAndOptions result = RequestUtils.parseQuery("select foo from countries", JsonUtils.stringToJsonNode(
69+
"{\"sql\":\"select foo from countries\","
70+
+ "\"queryOptions\":\"maxRowsInDistinct=5;numRowsWithoutChangeInDistinct=10\"}"));
71+
assertEquals(result.getOptions().get("maxRowsInDistinct"), "5");
72+
assertEquals(result.getOptions().get("numRowsWithoutChangeInDistinct"), "10");
73+
}
74+
6475
@DataProvider(name = "queryProvider")
6576
public Object[][] queryProvider() {
6677
return new Object[][] {

pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@
3737
* The {@code BaseResultsBlock} class is the holder of the server side results.
3838
*/
3939
public abstract class BaseResultsBlock implements Block {
40+
public enum EarlyTerminationReason {
41+
NONE,
42+
DISTINCT_MAX_ROWS,
43+
DISTINCT_NO_NEW_VALUES,
44+
TIME_LIMIT
45+
}
46+
4047
private List<QueryErrorMessage> _processingExceptions;
4148
private long _numTotalDocs;
4249
private long _numDocsScanned;
@@ -49,6 +56,7 @@ public abstract class BaseResultsBlock implements Block {
4956
private long _executionThreadCpuTimeNs;
5057
private long _executionThreadMemAllocatedBytes;
5158
private int _numServerThreads;
59+
private EarlyTerminationReason _earlyTerminationReason = EarlyTerminationReason.NONE;
5260

5361
@Nullable
5462
public List<QueryErrorMessage> getErrorMessages() {
@@ -163,6 +171,14 @@ public void setNumServerThreads(int numServerThreads) {
163171
_numServerThreads = numServerThreads;
164172
}
165173

174+
public EarlyTerminationReason getEarlyTerminationReason() {
175+
return _earlyTerminationReason;
176+
}
177+
178+
public void setEarlyTerminationReason(EarlyTerminationReason earlyTerminationReason) {
179+
_earlyTerminationReason = earlyTerminationReason;
180+
}
181+
166182
/**
167183
* Returns the total size (number of rows) in this result block, without having to materialize the rows.
168184
*
@@ -208,6 +224,9 @@ public Map<String, String> getResultsMetadata() {
208224
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
209225
Integer.toString(_numConsumingSegmentsProcessed));
210226
metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), Integer.toString(_numConsumingSegmentsMatched));
227+
if (_earlyTerminationReason != EarlyTerminationReason.NONE) {
228+
metadata.put(MetadataKey.EARLY_TERMINATION_REASON.getName(), _earlyTerminationReason.name());
229+
}
211230
return metadata;
212231
}
213232
}

pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class DistinctCombineOperator extends BaseSingleBlockCombineOperator<Dist
3434
private static final String EXPLAIN_NAME = "COMBINE_DISTINCT";
3535

3636
public DistinctCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
37-
super(new DistinctResultsBlockMerger(), operators, queryContext, executorService);
37+
super(new DistinctResultsBlockMerger(queryContext), operators, queryContext, executorService);
3838
}
3939

4040
@Override

0 commit comments

Comments
 (0)