
Commit b461012

Merge pull request #398 from JuliaParallel/jps/task-queues
Implement task queues for configurable task launch
2 parents 8a98558 + 1ec8563 commit b461012

17 files changed: +815, -264 lines changed

docs/make.jl

Lines changed: 2 additions & 1 deletion

```diff
@@ -17,12 +17,13 @@ makedocs(;
         "Data Management" => "data-management.md",
         "Scopes" => "scopes.md",
         "Processors" => "processors.md",
-        "Dynamic Scheduler Control" => "dynamic.md",
+        "Task Queues" => "task-queues.md",
         "Option Propagation" => "propagation.md",
         "Logging and Graphing" => "logging.md",
         "Checkpointing" => "checkpointing.md",
         "Scheduler Visualization" => "scheduler-visualization.md",
         "Benchmarking" => "benchmarking.md",
+        "Dynamic Scheduler Control" => "dynamic.md",
         "Scheduler Internals" => "scheduler-internals.md",
         "Dagger API" => [
             "Types" => "api-dagger/types.md",
```

docs/src/task-queues.md

Lines changed: 74 additions & 0 deletions

# Task Queues

By default, `@spawn`/`spawn` submit tasks immediately and directly into
Dagger's scheduler without modifications. However, sometimes you want to tweak
this behavior for a region of code; for example, when working with GPUs or
other in-place operations, you might want to emulate CUDA's stream semantics
by ensuring that tasks execute sequentially (to avoid one kernel reading from
an array while another kernel is actively writing to it). Or, you might want
to ensure that a set of Dagger tasks is submitted into the scheduler all at
once for benchmarking purposes or to emulate the behavior of `delayed`. This
and more is possible through a mechanism called "task queues".

A task queue in Dagger is an object that can be configured to accept unlaunched
tasks from `@spawn`/`spawn` and either modify them or delay their launching
arbitrarily. By default, Dagger tasks are enqueued through the
`EagerTaskQueue`, which submits tasks directly into the scheduler before
`@spawn`/`spawn` returns. However, Dagger also has an `InOrderTaskQueue`, which
ensures that tasks enqueued through it execute sequentially with respect to
each other. This queue is created by `Dagger.spawn_sequential`:

```julia
A = rand(16)
B = zeros(16)
C = zeros(16)
function vcopy!(B, A)
    B .= A .+ 1.0
    return
end
function vadd!(C, A, B)
    C .+= A .+ B
    return
end
wait(Dagger.spawn_sequential() do
    Dagger.@spawn vcopy!(B, A)
    Dagger.@spawn vadd!(C, A, B)
end)
```

In the above example, `vadd!` is guaranteed to wait until `vcopy!` has
completed, even though `vadd!` doesn't take the result of `vcopy!` as an
argument (which is how tasks are normally ordered).

What if we wanted to launch multiple `vcopy!` calls within a `spawn_sequential`
region and allow them to execute in parallel, but still ensure that the `vadd!`
happens after they all finish? In this case, we want to switch to another kind
of task queue: the `LazyTaskQueue`. This task queue batches up task submissions
into groups, so that all tasks enqueued with it are placed in the scheduler all
at once. But what happens if we use this task queue (via `spawn_bulk`) within a
region using `spawn_sequential`?

```julia
A = rand(16)
B1 = zeros(16)
B2 = zeros(16)
C = zeros(16)
wait(Dagger.spawn_sequential() do
    Dagger.spawn_bulk() do
        Dagger.@spawn vcopy!(B1, A)
        Dagger.@spawn vcopy!(B2, A)
    end
    Dagger.@spawn vadd!(C, B1, B2)
end)
```

Conveniently, Dagger's task queues can be nested to get the expected behavior;
the above example will submit the two `vcopy!` tasks as a group (and they can
execute concurrently), while still ensuring that those two tasks finish before
the `vadd!` task executes.

!!! warning
    Task queues do not propagate to nested tasks; if a Dagger task launches
    another task internally, the child task doesn't inherit the task queue that
    the parent task was enqueued in.
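
To make the admonition above concrete, here is a minimal sketch (not part of this diff) of how a task launched from inside another task falls back to the default eager behavior rather than inheriting the sequential queue:

```julia
using Dagger

# Hypothetical helper: spawns a child task from inside a Dagger task.
nested_work() = Dagger.@spawn sum(rand(4))

Dagger.spawn_sequential() do
    t1 = Dagger.@spawn nested_work()  # ordered by the InOrderTaskQueue
    t2 = Dagger.@spawn rand(4)        # also ordered; waits for t1 to finish
    # The task spawned inside `nested_work` is enqueued eagerly on its own,
    # so it is *not* ordered with respect to t1 or t2.
end
```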

src/Dagger.jl

Lines changed: 4 additions & 5 deletions

```diff
@@ -18,18 +18,17 @@ using Requires
 using MacroTools
 using TimespanLogging
 
-const PLUGINS = Dict{Symbol,Any}()
-const PLUGIN_CONFIGS = Dict{Symbol,String}(
-    :scheduler => "Dagger.Sch"
-)
-
 include("lib/util.jl")
+include("utils/dagdebug.jl")
 
 # Distributed data
 include("options.jl")
 include("processor.jl")
 include("scopes.jl")
+include("eager_thunk.jl")
+include("queue.jl")
 include("thunk.jl")
+include("submission.jl")
 include("chunks.jl")
 
 # Task scheduling
```

src/chunks.jl

Lines changed: 3 additions & 0 deletions

```diff
@@ -62,6 +62,8 @@ shouldpersist(p::Chunk) = t.persist
 processor(c::Chunk) = c.processor
 affinity(c::Chunk) = affinity(c.handle)
 
+is_task_or_chunk(c::Chunk) = true
+
 Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
 Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x)
 
@@ -272,6 +274,7 @@ function unwrap_weak_checked(c::WeakChunk)
     @assert c !== nothing
     return c
 end
+is_task_or_chunk(c::WeakChunk) = true
 
 Base.@deprecate_binding AbstractPart Union{Chunk, Thunk}
 Base.@deprecate_binding Part Chunk
```

src/compute.jl

Lines changed: 9 additions & 16 deletions

```diff
@@ -20,10 +20,7 @@ runs the scheduler with the specified options. Returns a Chunk which references
 the result.
 """
 function compute(ctx::Context, d::Thunk; options=nothing)
-    scheduler = get!(PLUGINS, :scheduler) do
-        get_type(PLUGIN_CONFIGS[:scheduler])
-    end
-    res = scheduler.compute_dag(ctx, d; options=options)
+    result = Sch.compute_dag(ctx, d; options=options)
     if ctx.log_file !== nothing
         if ctx.log_sink isa TimespanLogging.LocalEventLog
             logs = TimespanLogging.get_logs!(ctx.log_sink)
@@ -34,12 +31,12 @@ function compute(ctx::Context, d::Thunk; options=nothing)
             @warn "Context log_sink not set to LocalEventLog, skipping"
         end
     end
-    res
+    result
 end
 
 function debug_compute(ctx::Context, args...; profile=false, options=nothing)
-    @time res = compute(ctx, args...; options=options)
-    get_logs!(ctx.log_sink), res
+    @time result = compute(ctx, args...; options=options)
+    get_logs!(ctx.log_sink), result
 end
 
 function debug_compute(arg; profile=false, options=nothing)
@@ -53,11 +50,7 @@ Base.@deprecate gather(x) collect(x)
 
 cleanup() = cleanup(Context(global_context()))
 function cleanup(ctx::Context)
-    if :scheduler in keys(PLUGINS)
-        scheduler = PLUGINS[:scheduler]
-        (scheduler).cleanup(ctx)
-        delete!(PLUGINS, :scheduler)
-    end
+    Sch.cleanup(ctx)
     nothing
 end
 
@@ -92,7 +85,7 @@ function dependents(node::Thunk)
         if !haskey(deps, next)
             deps[next] = Set{Thunk}()
         end
-        for (_, inp) in next.inputs
+        for inp in next.syncdeps
             if istask(inp) || (inp isa Chunk)
                 s = get!(()->Set{Thunk}(), deps, inp)
                 push!(s, next)
@@ -103,7 +96,7 @@ function dependents(node::Thunk)
         end
         push!(visited, next)
     end
-    deps
+    return deps
 end
 
 """
@@ -133,7 +126,7 @@ function noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}})
         has_all || continue
         noff[next] = off
     end
-    noff
+    return noff
 end
 
 """
@@ -160,7 +153,7 @@ function order(node::Thunk, ndeps)
         haskey(output, next) && continue
         s += 1
         output[next] = s
-        parents = filter(istask, map(last, next.inputs))
+        parents = collect(filter(istask, next.syncdeps))
         if !isempty(parents)
             # If parents is empty, sort! should be a no-op, but raises an ambiguity error
             # when InlineStrings.jl is loaded (at least, version 1.1.0), because InlineStrings
```

src/eager_thunk.jl

Lines changed: 87 additions & 0 deletions

```julia
"A future holding the result of a `Thunk`."
struct ThunkFuture
    future::Future
end
ThunkFuture(x::Integer) = ThunkFuture(Future(x))
ThunkFuture() = ThunkFuture(Future())
Base.isready(t::ThunkFuture) = isready(t.future)
Base.wait(t::ThunkFuture) = Dagger.Sch.thunk_yield() do
    wait(t.future)
end
function Base.fetch(t::ThunkFuture; proc=OSProc(), raw=false)
    error, value = Dagger.Sch.thunk_yield() do
        fetch(t.future)
    end
    if error
        throw(value)
    end
    if raw
        return value
    else
        return move(proc, value)
    end
end
Base.put!(t::ThunkFuture, x; error=false) = put!(t.future, (error, x))

struct Options
    options::NamedTuple
end
Options(;options...) = Options((;options...))
Options(options...) = Options((;options...))

"""
    EagerThunk

Returned from `spawn`/`@spawn` calls. Represents a task that is in the
scheduler, potentially ready to execute, executing, or finished executing. May
be `fetch`'d or `wait`'d on at any time.
"""
mutable struct EagerThunk
    uid::UInt
    future::ThunkFuture
    finalizer_ref::DRef
    thunk_ref::DRef
    EagerThunk(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
end

Base.isready(t::EagerThunk) = isready(t.future)
function Base.wait(t::EagerThunk)
    if !isdefined(t, :thunk_ref)
        throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `EagerThunk`"))
    end
    wait(t.future)
end
function Base.fetch(t::EagerThunk; raw=false)
    if !isdefined(t, :thunk_ref)
        throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `EagerThunk`"))
    end
    return fetch(t.future; raw)
end
function Base.show(io::IO, t::EagerThunk)
    status = if isdefined(t, :thunk_ref)
        isready(t) ? "finished" : "running"
    else
        "not launched"
    end
    print(io, "EagerThunk ($status)")
end
istask(t::EagerThunk) = true

"When finalized, cleans-up the associated `EagerThunk`."
mutable struct EagerThunkFinalizer
    uid::UInt
    function EagerThunkFinalizer(uid)
        x = new(uid)
        finalizer(Sch.eager_cleanup, x)
        x
    end
end

const EAGER_ID_COUNTER = Threads.Atomic{UInt64}(1)
function eager_next_id()
    if myid() == 1
        Threads.atomic_add!(EAGER_ID_COUNTER, one(UInt64))
    else
        remotecall_fetch(eager_next_id, 1)
    end
end
```
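
For reference, a minimal usage sketch of the `EagerThunk` interface defined above (not part of this diff; it assumes Dagger's eager scheduler is available):

```julia
using Dagger

t = Dagger.@spawn 1 + 2    # an EagerThunk, launched through the active task queue
wait(t)                    # blocks until the task has finished
@assert fetch(t) == 3      # fetches the result, `move`ing it to the caller
raw = fetch(t; raw=true)   # skips the `move` and returns the stored value as-is
```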

src/queue.jl

Lines changed: 79 additions & 0 deletions

```julia
mutable struct EagerTaskSpec
    f
    args::Vector{Pair{Union{Symbol,Nothing},Any}}
    options::NamedTuple
end

abstract type AbstractTaskQueue end

function enqueue! end

struct EagerTaskQueue <: AbstractTaskQueue end
enqueue!(::EagerTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk}) =
    eager_launch!(spec)
enqueue!(::EagerTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
    eager_launch!(specs)

enqueue!(spec::Pair{EagerTaskSpec,EagerThunk}) =
    enqueue!(get_options(:task_queue, EagerTaskQueue()), spec)
enqueue!(specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
    enqueue!(get_options(:task_queue, EagerTaskQueue()), specs)

struct LazyTaskQueue <: AbstractTaskQueue
    tasks::Vector{Pair{EagerTaskSpec,EagerThunk}}
    LazyTaskQueue() = new(Pair{EagerTaskSpec,EagerThunk}[])
end
function enqueue!(queue::LazyTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
    push!(queue.tasks, spec)
end
function enqueue!(queue::LazyTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
    append!(queue.tasks, specs)
end
function spawn_bulk(f::Base.Callable)
    queue = LazyTaskQueue()
    result = with_options(f; task_queue=queue)
    if length(queue.tasks) > 0
        enqueue!(queue.tasks)
    end
    return result
end

struct InOrderTaskQueue <: AbstractTaskQueue
    upper_queue::AbstractTaskQueue
    prev_tasks::Set{EagerThunk}
    InOrderTaskQueue(upper_queue) = new(upper_queue,
                                        Set{EagerThunk}())
end
function _add_prev_deps!(queue::InOrderTaskQueue, spec::EagerTaskSpec)
    # Add previously-enqueued task(s) to this task's syncdeps
    opts = spec.options
    syncdeps = get(Set{Any}, opts, :syncdeps)
    for task in queue.prev_tasks
        push!(syncdeps, task)
    end
    spec.options = merge(opts, (;syncdeps,))
end
function enqueue!(queue::InOrderTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
    if length(queue.prev_tasks) > 0
        _add_prev_deps!(queue, first(spec))
        empty!(queue.prev_tasks)
    end
    push!(queue.prev_tasks, last(spec))
    enqueue!(queue.upper_queue, spec)
end
function enqueue!(queue::InOrderTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
    if length(queue.prev_tasks) > 0
        for (spec, task) in specs
            _add_prev_deps!(queue, spec)
        end
        empty!(queue.prev_tasks)
    end
    for (spec, task) in specs
        push!(queue.prev_tasks, task)
    end
    enqueue!(queue.upper_queue, specs)
end
function spawn_sequential(f::Base.Callable)
    queue = InOrderTaskQueue(get_options(:task_queue, EagerTaskQueue()))
    return with_options(f; task_queue=queue)
end
```
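
As a usage note, the `AbstractTaskQueue`/`enqueue!` API above can also back custom queues that forward to an upper queue, in the same way `InOrderTaskQueue` does. Below is a minimal sketch; the `CountingTaskQueue` type is hypothetical and not part of this diff, and it assumes access to Dagger's internal `EagerTaskSpec`/`EagerThunk` types and to `with_options`/`get_options`:

```julia
using Dagger

# Hypothetical example: a queue that counts submissions before forwarding
# them to the queue that was active when it was created.
struct CountingTaskQueue <: Dagger.AbstractTaskQueue
    upper_queue::Dagger.AbstractTaskQueue
    counter::Base.RefValue{Int}
end
CountingTaskQueue(upper) = CountingTaskQueue(upper, Ref(0))

function Dagger.enqueue!(queue::CountingTaskQueue,
                         spec::Pair{Dagger.EagerTaskSpec,Dagger.EagerThunk})
    queue.counter[] += 1                       # count a single submission
    Dagger.enqueue!(queue.upper_queue, spec)   # forward to the wrapped queue
end
function Dagger.enqueue!(queue::CountingTaskQueue,
                         specs::Vector{Pair{Dagger.EagerTaskSpec,Dagger.EagerThunk}})
    queue.counter[] += length(specs)           # count a bulk submission
    Dagger.enqueue!(queue.upper_queue, specs)
end

# Install it for a region of code, mirroring how spawn_bulk/spawn_sequential
# install their queues above:
queue = CountingTaskQueue(Dagger.get_options(:task_queue, Dagger.EagerTaskQueue()))
Dagger.with_options(; task_queue=queue) do
    Dagger.@spawn sum(rand(4))
end
@show queue.counter[]
```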
