
Commit ab969cb

MSQ: Configurable maxPartitions and maxInputFilesPerWorker. (#18826)

The defaults remain the same, but the parameters become configurable.

This patch also removes redundant logic: RunWorkOrder's initGlobalSortPartitionBoundariesIfNeeded is removed, since WorkerStageKernel has the same logic. We can copy the partitions over from there.

1 parent 425b906 commit ab969cb

19 files changed: +345 −88 lines

docs/multi-stage-query/reference.md

Lines changed: 5 additions & 3 deletions
@@ -414,6 +414,8 @@ The following table lists the context parameters for the MSQ task engine:
 | `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to true, The MSQ engine will remove the null bytes in string fields when reading the data. | `false` |
 | `includeAllCounters` | SELECT, INSERT or REPLACE<br /><br />Whether to include counters that were added in Druid 31 or later. This is a backwards compatibility option that must be set to `false` during a rolling update from versions prior to Druid 31. | `true` |
 | `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Size of frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows. | `1000000` (1 MB) |
+| `maxInputFilesPerWorker` | SELECT, INSERT, REPLACE<br /><br />Maximum number of input files or segments per worker. If a single worker would need to read more than this number of files, the query fails with a `TooManyInputFiles` error. In this case, you should either increase this limit if your tasks have enough memory to handle more files, add more workers by increasing `maxNumTasks`, or split your query into smaller queries that process fewer files. | 10,000 |
+| `maxPartitions` | SELECT, INSERT, REPLACE<br /><br />Maximum number of output partitions for any single stage. For INSERT or REPLACE queries, this controls the maximum number of segments that can be generated. If the query would exceed this limit, it fails with a `TooManyPartitions` error. You can increase this limit if needed, break your query into smaller queries, or use a larger target segment size (via `rowsPerSegment`). | 25,000 |
 | `maxThreads` | SELECT, INSERT or REPLACE<br /><br />Maximum number of threads to use for processing. This only has an effect if it is greater than zero and less than the default thread count based on system configuration. Otherwise, it is ignored, and workers use the default thread count. | Not set (use default thread count) |

 ## Joins
@@ -571,10 +573,10 @@ The following table lists query limits:

 | Limit | Value | Error if exceeded |
 |---|---|---|
-| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. | 1 MB | `RowTooLarge` |
+| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. Configurable with [`maxFrameSize`](#context). | 1 MB | `RowTooLarge` |
 | Number of segment-granular time chunks encountered during ingestion. | 5,000 | `TooManyBuckets`|
-| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles`|
-| Number of output partitions for any one stage. Number of segments generated during ingestion. |25,000 | `TooManyPartitions`|
+| Number of input files/segments per worker. Configurable with [`maxInputFilesPerWorker`](#context). | 10,000 | `TooManyInputFiles`|
+| Number of output partitions for any one stage. Number of segments generated during ingestion. Configurable with [`maxPartitions`](#context). | 25,000 | `TooManyPartitions`|
 | Number of output columns for any one stage. | 2,000 | `TooManyColumns`|
 | Number of cluster by columns that can appear in a stage | 1,500 | `TooManyClusteredByColumns` |
 | Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers`|
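
Both limits are plain query-context parameters, so they can be supplied alongside any MSQ query. A minimal sketch of building such a context in Java; the keys are the documented parameter names, while the values and the `maxNumTasks` alternative shown are illustrative:

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class MsqContextExample
{
  public static void main(String[] args)
  {
    // Override the defaults of 10,000 files per worker and 25,000 partitions.
    final Map<String, Object> context = ImmutableMap.of(
        "maxInputFilesPerWorker", 20_000, // raise the per-worker file cap
        "maxPartitions", 50_000,          // raise the per-stage partition cap
        "maxNumTasks", 10                 // alternative lever: add more workers
    );
    System.out.println(context);
  }
}
```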

multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java

Lines changed: 17 additions & 3 deletions
@@ -1444,6 +1444,7 @@ private void startWorkForStage(
       final QueryDefinition queryDef,
       final ControllerQueryKernel queryKernel,
       final int stageNumber,
+      final int maxInputFilesPerWorker,
       @Nullable final List<SegmentIdWithShardSpec> segmentsToGenerate
   )
   {
@@ -1454,7 +1455,8 @@ private void startWorkForStage(
         segmentsToGenerate
     );

-    final Int2ObjectMap<WorkOrder> workOrders = queryKernel.createWorkOrders(stageNumber, extraInfos);
+    final Int2ObjectMap<WorkOrder> workOrders =
+        queryKernel.createWorkOrders(stageNumber, maxInputFilesPerWorker, extraInfos);
     final StageId stageId = new StageId(queryDef.getQueryId(), stageNumber);

     queryKernel.startStage(stageId);
@@ -2533,8 +2535,12 @@ private void submitSequentialMergeFetchRequests(StageId stageId, Set<String> tas
    */
   private void startStages() throws IOException, InterruptedException
   {
+    final int maxInputFilesPerWorker =
+        MultiStageQueryContext.getMaxInputFilesPerWorker(querySpec.getContext());
     final long maxInputBytesPerWorker =
         MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getContext());
+    final int maxPartitions =
+        MultiStageQueryContext.getMaxPartitions(querySpec.getContext());

     logKernelStatus(queryDef.getQueryId(), queryKernel);

@@ -2545,7 +2551,9 @@ private void startStages() throws IOException, InterruptedException
         inputSpecSlicerFactory,
         querySpec.getAssignmentStrategy(),
         rowBasedFrameType,
-        maxInputBytesPerWorker
+        maxInputFilesPerWorker,
+        maxInputBytesPerWorker,
+        maxPartitions
     );

     for (final StageId stageId : newStageIds) {
@@ -2573,7 +2581,13 @@ private void startStages() throws IOException, InterruptedException
         retryWorkersOrFailJob(queryKernel, workerFaultSet);
       }
       stageRuntimesForLiveReports.put(stageId.getStageNumber(), new Interval(DateTimes.nowUtc(), DateTimes.MAX));
-      startWorkForStage(queryDef, queryKernel, stageId.getStageNumber(), segmentsToGenerate);
+      startWorkForStage(
+          queryDef,
+          queryKernel,
+          stageId.getStageNumber(),
+          maxInputFilesPerWorker,
+          segmentsToGenerate
+      );
     }
   } while (!newStageIds.isEmpty());
 }

multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java

Lines changed: 17 additions & 3 deletions
@@ -19,6 +19,11 @@

 package org.apache.druid.msq.exec;

+import org.apache.druid.msq.indexing.error.TooManyBucketsFault;
+import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
+import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+
 public class Limits
 {
   /**
@@ -45,9 +50,10 @@ public class Limits
   public static final int MAX_WORKERS = 1000;

   /**
-   * Maximum number of input files per worker
+   * Default maximum number of input files per worker. Exceeding this will yield a {@link TooManyInputFilesFault}.
+   * Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_INPUT_FILES_PER_WORKER}.
    */
-  public static final int MAX_INPUT_FILES_PER_WORKER = 10_000;
+  public static final int DEFAULT_MAX_INPUT_FILES_PER_WORKER = 10_000;

   /**
    * Maximum number of parse exceptions with their stack traces a worker can send to the controller.
@@ -94,10 +100,18 @@ public class Limits
   public static final long MAX_SELECT_RESULT_ROWS = 3_000;

   /**
-   * Max number of partition buckets for ingestion queries.
+   * Max number of partition buckets. Exceeding this will yield a {@link TooManyBucketsFault}. For an ingestion job,
+   * this is the maximum number of output time chunks.
    */
   public static final int MAX_PARTITION_BUCKETS = 5_000;

+  /**
+   * Default max number of output partitions for a stage. Exceeding this will yield a {@link TooManyPartitionsFault}.
+   * For an ingestion job, this is the maximum number of segments that can be created.
+   * Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_PARTITIONS}.
+   */
+  public static final int DEFAULT_MAX_PARTITIONS = 25_000;
+
   /**
    * Max number of rows with the same key in a window. This acts as a guardrail for
    * data distribution with high cardinality
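
The context accessors referenced in these javadocs are not shown in this diff. A plausible sketch of what they look like inside MultiStageQueryContext, assuming Druid's `QueryContext#getInt(String, int)` accessor; exact naming and validation may differ:

```java
// Sketch of MultiStageQueryContext members implied by this patch (assumed).
public static final String CTX_MAX_INPUT_FILES_PER_WORKER = "maxInputFilesPerWorker";
public static final String CTX_MAX_PARTITIONS = "maxPartitions";

public static int getMaxInputFilesPerWorker(final QueryContext queryContext)
{
  // Fall back to the default from Limits when the context does not set the key.
  return queryContext.getInt(CTX_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER);
}

public static int getMaxPartitions(final QueryContext queryContext)
{
  return queryContext.getInt(CTX_MAX_PARTITIONS, Limits.DEFAULT_MAX_PARTITIONS);
}
```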

multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java

Lines changed: 0 additions & 21 deletions
@@ -65,7 +65,6 @@
 import org.apache.druid.msq.input.stage.StageInputSliceReader;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
-import org.apache.druid.msq.kernel.ShuffleKind;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -174,7 +173,6 @@ public void startAsync()

     try {
       exec.registerCancellationId(cancellationId);
-      initGlobalSortPartitionBoundariesIfNeeded();
       startStageProcessor();
       setUpCompletionCallbacks();
     }
@@ -284,25 +282,6 @@ private void startStageProcessor()
     stageResultFuture = processor.execute(executionContext);
   }

-  /**
-   * Initialize {@link #stagePartitionBoundariesFuture} if it will be needed (i.e. if {@link ShuffleKind#GLOBAL_SORT})
-   * but does not need statistics. In this case, it is known upfront, before the job starts.
-   */
-  private void initGlobalSortPartitionBoundariesIfNeeded()
-  {
-    if (workOrder.getStageDefinition().doesShuffle()
-        && workOrder.getStageDefinition().getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT
-        && !workOrder.getStageDefinition().mustGatherResultKeyStatistics()) {
-      // Result key stats aren't needed, so the partition boundaries are knowable ahead of time. Compute them now.
-      final ClusterByPartitions boundaries =
-          workOrder.getStageDefinition()
-                   .generatePartitionBoundariesForShuffle(null)
-                   .valueOrThrow();
-
-      stagePartitionBoundariesFuture.set(boundaries);
-    }
-  }
-
   /**
    * Callbacks that fire when all work for the stage is done (i.e. when {@link #stageResultFuture} resolves).
    */

multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java

Lines changed: 5 additions & 0 deletions
@@ -429,6 +429,11 @@ private void handleNewWorkOrder(
     // Set up processorCloser (called when processing is done).
     kernelHolder.processorCloser.register(() -> runWorkOrder.stop(null));

+    // Set resultPartitionBoundaries in RunWorkOrder if it is known ahead of time.
+    if (kernel.hasResultPartitionBoundaries()) {
+      runWorkOrder.getStagePartitionBoundariesFuture().set(kernel.getResultPartitionBoundaries());
+    }
+
     // Start working on this stage immediately.
     kernel.startReading();
     runWorkOrder.startAsync();
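
Per the commit message, the precomputation that RunWorkOrder used to do now lives in WorkerStageKernel, and the snippet above merely copies the result over. A sketch of what that kernel-side logic plausibly looks like; everything beyond the names visible in this diff (notably `setResultPartitionBoundaries`) is an assumption:

```java
// Assumed kernel-side replacement for the removed RunWorkOrder method: when a
// GLOBAL_SORT shuffle needs no result-key statistics, boundaries are knowable
// before any rows are read, so compute and store them on the kernel up front.
final StageDefinition stageDef = workOrder.getStageDefinition();
if (stageDef.doesShuffle()
    && stageDef.getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT
    && !stageDef.mustGatherResultKeyStatistics()) {
  // Workers pass Integer.MAX_VALUE for maxPartitions, since the shuffle spec
  // already fixes the partition count here (see the StageDefinition javadoc below).
  setResultPartitionBoundaries(
      stageDef.generatePartitionBoundariesForShuffle(null, Integer.MAX_VALUE).valueOrThrow()
  );
}
```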

multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java

Lines changed: 3 additions & 2 deletions
@@ -46,10 +46,11 @@ public TooManyInputFilesFault(
     super(
         CODE,
         "Too many input files/segments [%d] encountered. Maximum input files/segments per worker is set to [%d]. Try"
-        + " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by"
-        + " setting %s in your query context",
+        + " increasing the limit using the %s query context parameter, breaking your query up into smaller queries,"
+        + " or increasing the number of workers to at least [%d] by setting %s in your query context.",
         numInputFiles,
         maxInputFiles,
+        MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER,
         minNumWorkers,
         MultiStageQueryContext.CTX_MAX_NUM_TASKS
     );
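
The `minNumWorkers` suggested by this message is presumably the smallest worker count that keeps each worker at or under the file cap, i.e. a ceiling division. A self-contained sketch of that arithmetic with illustrative numbers:

```java
public class MinWorkersExample
{
  public static void main(String[] args)
  {
    // With 45,000 input files and the default cap of 10,000 files per worker,
    // at least ceil(45,000 / 10,000) = 5 workers are needed.
    final long numInputFiles = 45_000;
    final int maxInputFiles = 10_000;
    final long minNumWorkers = (numInputFiles + maxInputFiles - 1) / maxInputFiles;
    System.out.println(minNumWorkers); // prints 5
  }
}
```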

multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java

Lines changed: 5 additions & 3 deletions
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.msq.util.MultiStageQueryContext;

 import java.util.Objects;

@@ -37,9 +38,10 @@ public TooManyPartitionsFault(@JsonProperty("maxPartitions") final int maxPartitions)
 {
   super(
       CODE,
-      "Too many partitions (max = %d); try breaking your query up into smaller queries or "
-      + "using a larger target size",
-      maxPartitions
+      "Too many partitions (max = %d). Try increasing the limit using the %s query context parameter, "
+      + "breaking your query up into smaller queries, or using a larger target size.",
+      maxPartitions,
+      MultiStageQueryContext.CTX_MAX_PARTITIONS
   );
   this.maxPartitions = maxPartitions;
 }

multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java

Lines changed: 15 additions & 10 deletions
@@ -48,6 +48,8 @@
 import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
 import org.apache.druid.segment.column.RowSignature;

 import javax.annotation.Nullable;
@@ -85,8 +87,6 @@
  */
 public class StageDefinition
 {
-  private static final int MAX_PARTITIONS = 25_000; // Limit for TooManyPartitions
-
   // If adding any fields here, add them to builder(StageDefinition) below too.
   private final StageId id;
   private final List<InputSpec> inputSpecs;
@@ -292,12 +292,6 @@ public boolean getShuffleCheckHasMultipleValues()
     return shuffleCheckHasMultipleValues;
   }

-  public int getMaxPartitionCount()
-  {
-    // Pretends to be an instance method, but really returns a constant. Maybe one day this will be defined per stage.
-    return MAX_PARTITIONS;
-  }
-
   public int getStageNumber()
   {
     return id.getStageNumber();
@@ -334,8 +328,19 @@ public boolean mustGatherResultKeyStatistics()
     return mustGatherResultKeyStatistics(shuffleSpec);
   }

+  /**
+   * Generate partition boundaries for {@link ShuffleKind#GLOBAL_SORT} shuffles.
+   *
+   * @param collector     statistics collector, to be provided if {@link #mustGatherResultKeyStatistics()}
+   * @param maxPartitions maximum number of partitions to generate. On the controller, this is the value of
+   *                      {@link MultiStageQueryContext#getMaxPartitions(QueryContext)}. On workers, this method
+   *                      is only used when the number of partitions is determined ahead of time by the
+   *                      {@link ShuffleSpec}, so {@link Integer#MAX_VALUE} is typically provided for this parameter
+   *                      out of convenience.
+   */
   public Either<Long, ClusterByPartitions> generatePartitionBoundariesForShuffle(
-      @Nullable ClusterByStatisticsCollector collector
+      @Nullable ClusterByStatisticsCollector collector,
+      int maxPartitions
   )
   {
     if (shuffleSpec == null) {
@@ -351,7 +356,7 @@ public Either<Long, ClusterByPartitions> generatePartitionBoundariesForShuffle(
     } else if (!mustGatherResultKeyStatistics() && collector != null) {
       throw new ISE("Statistics gathered, but not required for stage[%d]", getStageNumber());
     } else {
-      return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, MAX_PARTITIONS);
+      return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, maxPartitions);
     }
   }
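
Controller-side callers are expected to thread the context value through this new parameter, matching the javadoc above. An illustrative sketch; the variables around the call are assumptions, not part of this diff:

```java
// Controller: cap global-sort partitions at the context-configurable limit.
final int maxPartitions = MultiStageQueryContext.getMaxPartitions(querySpec.getContext());
final Either<Long, ClusterByPartitions> partitions =
    stageDef.generatePartitionBoundariesForShuffle(collector, maxPartitions);

if (partitions.isError()) {
  // The error side signals that the limit was exceeded; this is what
  // ultimately surfaces as a TooManyPartitionsFault.
  throw new MSQException(new TooManyPartitionsFault(maxPartitions));
}
```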

multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java

Lines changed: 6 additions & 3 deletions
@@ -24,7 +24,6 @@
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
@@ -49,6 +48,7 @@ public List<InputSlice> assign(
       final InputSpec inputSpec,
       final Int2IntMap stageWorkerCountMap,
       final InputSpecSlicer slicer,
+      final int maxInputFilesPerSlice,
       final long maxInputBytesPerSlice
   )
   {
@@ -58,7 +58,7 @@ public List<InputSlice> assign(

   /**
    * Use the lowest possible number of workers, while keeping each worker's workload under
-   * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code maxInputBytesPerWorker} bytes.
+   * {@code maxInputFilesPerSlice} files and {@code maxInputBytesPerWorker} bytes.
    *
    * Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
    */
@@ -69,14 +69,15 @@ public List<InputSlice> assign(
       final InputSpec inputSpec,
       final Int2IntMap stageWorkerCountMap,
       final InputSpecSlicer slicer,
+      final int maxInputFilesPerSlice,
       final long maxInputBytesPerSlice
   )
   {
     if (slicer.canSliceDynamic(inputSpec)) {
       return slicer.sliceDynamic(
           inputSpec,
           stageDef.getMaxWorkerCount(),
-          Limits.MAX_INPUT_FILES_PER_WORKER,
+          maxInputFilesPerSlice,
           maxInputBytesPerSlice
       );
     } else {
@@ -117,6 +118,7 @@ public String toString()
    * @param inputSpec             inputSpec containing information on where the input is read from
    * @param stageWorkerCountMap   map of past stage number vs number of worker inputs
    * @param slicer                creates slices of input spec based on other parameters
+   * @param maxInputFilesPerSlice hard maximum number of files per input slice
    * @param maxInputBytesPerSlice maximum suggested bytes per input slice
    * @return list containing input slices
    */
@@ -125,6 +127,7 @@ public abstract List<InputSlice> assign(
       InputSpec inputSpec,
       Int2IntMap stageWorkerCountMap,
       InputSpecSlicer slicer,
+      int maxInputFilesPerSlice,
       long maxInputBytesPerSlice
   );
 }
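
A sketch of a call site after this change, threading the per-slice file cap from the query context; the surrounding variables are illustrative, not part of this diff:

```java
// Illustrative only: slice one input spec with both caps applied. AUTO uses
// the fewest workers that keep each slice under both limits.
final List<InputSlice> slices = WorkerAssignmentStrategy.AUTO.assign(
    stageDef,
    inputSpec,
    stageWorkerCountMap,
    slicer,
    MultiStageQueryContext.getMaxInputFilesPerWorker(querySpec.getContext()),
    MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getContext())
);
```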
