Skip to content

Commit fc29b74

Browse files
authored
fix: Do not block scheduler if there are stages ready for scheduling (#26268)
## Description Do not block query scheduler when there is a stage ready to be scheduled. ## Motivation and Context Consider a query with two stages, Stage 1 and Stage 2. Before this change when one for the stages is blocked the scheduler will block and wait for it to unblock preventing the second stage from scheduling. ## Impact Reduced scheduling delays in certain cases ## Test Plan Tested with a simple INSERT INTO query with scaled writers enabled. ScaledWriters scheduler has a fixed 200ms delay. Delay in ScaledWriter scheduler was causing slow split scheduling for TableScan stage. ## Contributor checklist - [x] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [x] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [x] Documented new properties (with its default value), SQL syntax, functions, or other functionality. - [x] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [x] Adequate tests were added if applicable. - [x] CI passed. ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == NO RELEASE NOTE == ```
1 parent 7adbad9 commit fc29b74

File tree

1 file changed

+16
-6
lines changed

1 file changed

+16
-6
lines changed

presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.facebook.presto.execution.RemoteTask;
2828
import com.facebook.presto.execution.RemoteTaskFactory;
2929
import com.facebook.presto.execution.SqlStageExecution;
30+
import com.facebook.presto.execution.StageExecutionId;
3031
import com.facebook.presto.execution.StageExecutionInfo;
3132
import com.facebook.presto.execution.StageExecutionState;
3233
import com.facebook.presto.execution.StageId;
@@ -423,6 +424,7 @@ private void schedule()
423424
Set<StageId> completedStages = new HashSet<>();
424425

425426
List<ExecutionSchedule> sectionExecutionSchedules = new LinkedList<>();
427+
Map<StageExecutionId, ListenableFuture<?>> blockedStages = new HashMap<>();
426428

427429
while (!Thread.currentThread().isInterrupted()) {
428430
// remove finished section
@@ -445,19 +447,24 @@ private void schedule()
445447
.forEach(sectionExecutionSchedules::add);
446448

447449
while (sectionExecutionSchedules.stream().noneMatch(ExecutionSchedule::isFinished)) {
448-
List<ListenableFuture<?>> blockedStages = new ArrayList<>();
449-
450450
List<StageExecutionAndScheduler> executionsToSchedule = sectionExecutionSchedules.stream()
451451
.flatMap(schedule -> schedule.getStagesToSchedule().stream())
452452
.collect(toImmutableList());
453453

454+
boolean allBlocked = true;
454455
for (StageExecutionAndScheduler stageExecutionAndScheduler : executionsToSchedule) {
455456
long startCpuNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime();
456457
long startWallNanos = System.nanoTime();
457458

458459
SqlStageExecution stageExecution = stageExecutionAndScheduler.getStageExecution();
459460
stageExecution.beginScheduling();
460461

462+
ListenableFuture<?> stillBlocked = blockedStages.get(stageExecution.getStageExecutionId());
463+
if (stillBlocked != null && !stillBlocked.isDone()) {
464+
continue;
465+
}
466+
blockedStages.remove(stageExecution.getStageExecutionId());
467+
461468
// perform some scheduling work
462469
ScheduleResult result = stageExecutionAndScheduler.getStageScheduler()
463470
.schedule();
@@ -475,7 +482,10 @@ private void schedule()
475482
stageExecution.schedulingComplete();
476483
}
477484
else if (!result.getBlocked().isDone()) {
478-
blockedStages.add(result.getBlocked());
485+
blockedStages.put(stageExecution.getStageExecutionId(), result.getBlocked());
486+
}
487+
else {
488+
allBlocked = false;
479489
}
480490
stageExecutionAndScheduler.getStageLinkage()
481491
.processScheduleResults(stageExecution.getState(), result.getNewTasks());
@@ -535,11 +545,11 @@ else if (!result.getBlocked().isDone()) {
535545
}
536546

537547
// wait for a state change and then schedule again
538-
if (!blockedStages.isEmpty()) {
548+
if (allBlocked && !blockedStages.isEmpty()) {
539549
try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
540-
tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
550+
tryGetFutureValue(whenAnyComplete(blockedStages.values()), 1, SECONDS);
541551
}
542-
for (ListenableFuture<?> blockedStage : blockedStages) {
552+
for (ListenableFuture<?> blockedStage : blockedStages.values()) {
543553
blockedStage.cancel(true);
544554
}
545555
}

0 commit comments

Comments
 (0)