@@ -1105,19 +1105,23 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
1105
1105
proc_occupancy = istate. proc_occupancy
1106
1106
time_pressure = istate. time_pressure
1107
1107
1108
+ work_to_do = false
1108
1109
while isopen (return_queue)
1109
1110
# Wait for new tasks
1110
- @dagdebug nothing :processor " Waiting for tasks"
1111
- timespan_start (ctx, :proc_run_wait , to_proc, nothing )
1112
- wait (istate. reschedule)
1113
- @static if VERSION >= v " 1.9"
1114
- reset (istate. reschedule)
1111
+ if ! work_to_do
1112
+ @dagdebug nothing :processor " Waiting for tasks"
1113
+ timespan_start (ctx, :proc_run_wait , to_proc, nothing )
1114
+ wait (istate. reschedule)
1115
+ @static if VERSION >= v " 1.9"
1116
+ reset (istate. reschedule)
1117
+ end
1118
+ timespan_finish (ctx, :proc_run_wait , to_proc, nothing )
1115
1119
end
1116
- timespan_finish (ctx, :proc_run_wait , to_proc, nothing )
1117
1120
1118
1121
# Fetch a new task to execute
1119
1122
@dagdebug nothing :processor " Trying to dequeue"
1120
1123
timespan_start (ctx, :proc_run_fetch , to_proc, nothing )
1124
+ work_to_do = false
1121
1125
task_and_occupancy = lock (istate. queue) do queue
1122
1126
# Only steal if there are multiple queued tasks, to prevent
1123
1127
# ping-pong of tasks between empty queues
@@ -1130,7 +1134,9 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
1130
1134
@dagdebug nothing :processor " Insufficient occupancy" proc_occupancy= proc_occupancy[] task_occupancy= occupancy
1131
1135
return nothing
1132
1136
end
1133
- return dequeue_pair! (queue)
1137
+ queue_result = dequeue_pair! (queue)
1138
+ work_to_do = length (queue) > 0
1139
+ return queue_result
1134
1140
end
1135
1141
if task_and_occupancy === nothing
1136
1142
timespan_finish (ctx, :proc_run_fetch , to_proc, nothing )
0 commit comments