Commit 6275277

MSQ: Use full parallelism in localSort. (#18765)
Since localSort was introduced for sort-merge join in #13506, it has used 1 processor at a time with a max of 2 channels per processor. This is inefficient, because at the time the sorter runs, it is the only sorter running. The patch contains a comment describing the situation in more detail.

This patch has two benefits. First, the sorter should run faster if multiple processing threads are available. Second, the sorter, due to having a larger max channels per merger, will make more efficient use of intermediate channels.
1 parent 4a76915 commit 6275277
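
The second benefit named in the commit message (a larger max channels per merger) can be illustrated with a generic k-way merge calculation. This is a rough sketch, not Druid's SuperSorter logic: the class `MergePassSketch`, the method `mergePasses`, and the run count of 64 are hypothetical, and it assumes each merge level combines up to `fanIn` sorted runs into one. Under that assumption, a larger fan-in means fewer merge levels, and so fewer rounds of intermediate output.

```java
// Illustrative only: a generic k-way merge model, NOT the SuperSorter implementation.
final class MergePassSketch {

    /**
     * Number of merge levels needed to reduce `runs` sorted runs to a single run
     * when each merger reads at most `fanIn` runs (assumed model).
     */
    static int mergePasses(int runs, int fanIn) {
        if (runs < 1 || fanIn < 2) {
            throw new IllegalArgumentException("need runs >= 1 and fanIn >= 2");
        }
        int passes = 0;
        while (runs > 1) {
            // Ceiling division without overflow: each group of up to fanIn runs becomes one run.
            runs = (runs - 1) / fanIn + 1;
            passes++;
        }
        return passes;
    }

    public static void main(String[] args) {
        int runs = 64; // arbitrary example value
        for (int fanIn : new int[] {2, 8, 64}) {
            System.out.println("fanIn=" + fanIn + " -> merge levels=" + mergePasses(runs, fanIn));
        }
    }
}
```

In this model, 64 runs need 6 merge levels with a fan-in of 2, but only 2 levels with a fan-in of 8. The real values come from `WorkerMemoryParameters` and depend on available memory.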

1 file changed: +13 -2 lines changed

multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java

Lines changed: 13 additions & 2 deletions
@@ -364,6 +364,7 @@ public boolean isBuffered()
       nextFuture = Futures.transformAsync(
           nextFuture,
           ignored -> {
+            final WorkerMemoryParameters memoryParameters = executionContext.frameContext().memoryParameters();
             final SuperSorter sorter = new SuperSorter(
                 Collections.singletonList(channel.getReadableChannel()),
                 stageDefinition.getFrameReader(),
@@ -382,8 +383,18 @@ public <T2> FrameProcessor<T2> decorate(FrameProcessor<T2> processor)
                 executionContext.makeIntermediateOutputChannelFactory(
                     StringUtils.format("hash-parts-super-sort-%06d", channel.getPartitionNumber())),
                 executionContext.frameContext().frameWriterSpec().getRowBasedFrameType(),
-                1,
-                2,
+
+                // Use full parallelism, since at the time this sorter runs, it is the only sorter running.
+                //
+                // Typically, nothing else is running at all. Whenever there is more than one output partition,
+                // the step prior to this localSort (typically hashPartition) should use buffered output channels
+                // and therefore would have exited.
+                //
+                // In the case where there is one output partition, the step prior to this localSort may run
+                // concurrently with the sorter, but in that case it will only have one output channel so won't
+                // have many frames buffered.
+                memoryParameters.getSuperSorterConcurrentProcessors(),
+                memoryParameters.getSuperSorterMaxChannelsPerMerger(),
                 ShuffleSpec.UNLIMITED,
                 executionContext.cancellationId(),
