Commit 4b83c4b

Merge pull request #373 from JuliaParallel/jps/task-balance
Implement work stealing
2 parents 85637c0 + 14dc2b5 commit 4b83c4b

11 files changed: +536 -118 lines changed

Project.toml

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ version = "0.16.3"
 [deps]
 Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
 ContextVariablesX = "6add18c4-b38d-439d-96f6-d6bc489c04c5"
+DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"

docs/src/index.md

Lines changed: 36 additions & 0 deletions
@@ -227,5 +227,41 @@ opts = Dagger.Sch.ThunkOptions(;single=1)
 delayed(+)(1, 2; options=opts)
 ```
 
+### Core vs. Worker Schedulers
+
+Dagger's scheduler is really two kinds of entities: the "core" scheduler, and
+"worker" schedulers:
+
+The core scheduler runs on worker 1, thread 1, and is the entrypoint to tasks
+which have been submitted. The core scheduler manages all task dependencies,
+notifies calls to `wait` and `fetch` of task completion, and generally performs
+initial task placement. The core scheduler has cached information about each
+worker and their processors, and uses that information (together with metrics
+about previous tasks and other aspects of the Dagger runtime) to generate a
+near-optimal just-in-time task schedule.
+
+The worker schedulers each run as a set of tasks across all workers and all
+processors, and handles data movement and task execution. Once the core
+scheduler has scheduled and launched a task, it arrives at the worker scheduler
+for handling. The worker scheduler will pass the task to a queue for the
+assigned processor, where it will wait until the processor has a sufficient
+amount of "occupancy" for the task. Once the processor is ready for the task,
+it will first fetch all arguments to the task from other workers, and then it
+will execute the task, package the result into a `Chunk`, and pass that back to
+the core scheduler.
+
+### Workload Balancing
+
+In general, Dagger's core scheduler tries to balance workloads as much as
+possible across all the available processors, but it can fail to do so
+effectively when either the cached per-processor information is outdated, or
+when the estimates about the task's behavior are inaccurate. To minimize the
+impact of this potential workload imbalance, the worker schedulers' processors
+will attempt to steal tasks from each other when they are under-occupied. Tasks
+will only be stolen if their [scope](`Scopes`) matches the processor attempting
+the steal, so tasks with wider scopes have better balancing potential.
+
+### Scheduler/Thunk Options
+
 [`Dagger.Sch.SchedulerOptions`](@ref)
 [`Dagger.Sch.ThunkOptions`](@ref)
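
Reader's note on the documentation added above: the "Workload Balancing" text says under-occupied processors steal queued tasks from each other, and only when the task's scope matches the stealing processor. The sketch below is one minimal way to picture that rule. It is not Dagger's implementation: `SketchTask`, `ProcQueue`, `used`, and `try_steal!` are hypothetical names invented for illustration, scopes are reduced to a set of processor ids, and occupancy is assumed to be a plain fraction of one processor.

```julia
# Hypothetical types for illustration only; these are not Dagger's internals.
struct SketchTask
    name::String
    occupancy::Float64   # assumed: fraction of a processor the task needs
    scope::Set{Int}      # assumed: ids of processors allowed to run the task
end

struct ProcQueue
    id::Int
    capacity::Float64            # total occupancy the processor can hold
    tasks::Vector{SketchTask}    # tasks queued on this processor
end

# Occupancy already claimed by the tasks queued on `q`.
used(q::ProcQueue) = sum(t.occupancy for t in q.tasks; init=0.0)

# Move one task from another queue to `thief`, if `thief` has spare
# occupancy and the task's scope permits it. Returns the stolen task,
# or `nothing` when no eligible task exists.
function try_steal!(thief::ProcQueue, queues::Vector{ProcQueue})
    free = thief.capacity - used(thief)
    free > 0 || return nothing
    for victim in queues
        victim.id == thief.id && continue
        # Scan from the back so the victim keeps the tasks at the front.
        for idx in reverse(eachindex(victim.tasks))
            task = victim.tasks[idx]
            if thief.id in task.scope && task.occupancy <= free
                deleteat!(victim.tasks, idx)
                push!(thief.tasks, task)
                return task
            end
        end
    end
    return nothing
end

# Processor 1 has three queued tasks; task "b" is scoped to processor 1 only.
busy = ProcQueue(1, 1.0, [SketchTask("a", 0.5, Set([1, 2])),
                          SketchTask("b", 0.5, Set([1])),
                          SketchTask("c", 0.5, Set([1, 2]))])
idle = ProcQueue(2, 1.0, SketchTask[])

stolen = try_steal!(idle, [busy, idle])
println(stolen.name)   # prints "c"
```

Repeated calls move "a" next and then stop: "b" never leaves processor 1 because its scope excludes processor 2, which is the sense in which wider scopes leave more room for balancing.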

src/Dagger.jl

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ include("chunks.jl")
 include("compute.jl")
 include("utils/clock.jl")
 include("utils/system_uuid.jl")
+include("utils/locked-object.jl")
 include("sch/Sch.jl"); using .Sch
 
 # Array computations

src/processor.jl

Lines changed: 6 additions & 5 deletions
@@ -1,5 +1,7 @@
 export OSProc, Context, addprocs!, rmprocs!
 
+import Base: @invokelatest
+
 """
     Processor
 
@@ -157,8 +159,9 @@ function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...))
     task = Task() do
         set_tls!(tls)
         TimespanLogging.prof_task_put!(tls.sch_handle.thunk_id.id)
-        f(args...)
+        @invokelatest f(args...)
     end
+    task.sticky = true
     ret = ccall(:jl_set_task_tid, Cint, (Any, Cint), task, proc.tid-1)
     if ret == 0
         error("jl_set_task_tid == 0")
@@ -310,8 +313,7 @@ get_tls() = (
     sch_uid=task_local_storage(:_dagger_sch_uid),
     sch_handle=task_local_storage(:_dagger_sch_handle),
     processor=thunk_processor(),
-    time_utilization=task_local_storage(:_dagger_time_utilization),
-    alloc_utilization=task_local_storage(:_dagger_alloc_utilization),
+    task_spec=task_local_storage(:_dagger_task_spec),
 )
 
 """
@@ -323,6 +325,5 @@ function set_tls!(tls)
     task_local_storage(:_dagger_sch_uid, tls.sch_uid)
     task_local_storage(:_dagger_sch_handle, tls.sch_handle)
     task_local_storage(:_dagger_processor, tls.processor)
-    task_local_storage(:_dagger_time_utilization, tls.time_utilization)
-    alloc_utilization, tls.alloc_utilization)
+    task_local_storage(:_dagger_task_spec, tls.task_spec)
 end
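
Reader's note on the `src/processor.jl` changes above: the diff wraps the user function in `@invokelatest`, marks the task as sticky, and carries a single `task_spec` entry in task-local storage. The standalone sketch below shows the Base mechanisms involved and is not taken from Dagger. It uses the function form `Base.invokelatest` rather than the macro, and `call_generated` and the `:_sketch_task_spec` key are hypothetical names chosen for illustration.

```julia
# A function created with `eval` while another function is running belongs
# to a newer world age than its caller, so a direct call fails with a
# MethodError; `Base.invokelatest` dispatches in the latest world instead.
function call_generated()
    f = eval(:(x -> x + 1))
    direct = try
        f(41)
    catch err
        err
    end
    latest = Base.invokelatest(f, 41)
    return direct, latest
end

direct, latest = call_generated()
println(direct isa MethodError)   # prints "true"
println(latest)                   # prints "42"

# Task-local storage is private to each task, which is how per-task
# metadata can be handed to code running inside that task.
t = Task() do
    task_local_storage(:_sketch_task_spec, (id = 7,))
    task_local_storage(:_sketch_task_spec).id
end
t.sticky = true   # keep the task on the thread it is first scheduled on
schedule(t)
tls_result = fetch(t)
println(tls_result)               # prints "7"
```

A plausible reading of the diff, not confirmed by the commit text, is that `@invokelatest` lets tasks call functions defined after the scheduler started running, and that `task.sticky = true` stops the task from migrating away from the thread selected through `jl_set_task_tid`.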
