
Commit c59e4cc

spawn: Add configurable task queueing
To allow Dagger to fulfill more use cases, it would be helpful if we could perform some basic transformations on tasks before they get submitted to the scheduler. For example, we would like to enable in-order execution semantics for regions of code that execute GPU kernels, as this matches GPU synchronization semantics and thus makes it easier to upgrade GPU code to use Dagger. Separately, to enable reliable DAG optimizations, we need to be able to guarantee that a region of user code can be seen as a whole within the scheduler. The one-at-a-time task submission semantics that we currently have are insufficient for this, as some tasks in the DAG region of interest may already have been launched before the optimization can see enough of the DAG to be useful.

To support these and other use cases, this commit adds a flexible pre-submit task queueing system, and also makes it possible to add extra tasks as synchronization dependencies (beyond the default set derived from task arguments). The task queueing system allows a custom task queue to be set in TLS, which will then be used by `@spawn`/`spawn` when submitting tasks. The task queue is provided one or more task specifications and `EagerThunk` handles, and is free to delay and/or batch task submission, as well as to modify the task specifications arbitrarily to match the desired semantics. Task queues are nestable; tasks submitted within a set of nested task queues inherit the semantics of the queue that most directly contains them (with further transformations occurring as tasks move upward through the nest of queues). The most upstream task queue submits tasks to the worker 1 eager scheduler, but this too is expected to be flexible, to allow unique task submission semantics.

To support the goal of predictable optimizations, a `LazyTaskQueue` is added (available via `spawn_bulk`) which batches multiple task submissions into just one, and locks the scheduler until all tasks have been submitted, allowing the scheduler to see the entire DAG structure at once. Nesting `spawn_bulk` queues combines multiple DAG regions into a single total region that is submitted all at once.

The ability to specify additional task synchronization dependencies is a key piece that is orthogonal to task queues. It makes in-order execution semantics possible through an `InOrderTaskQueue` (available via `spawn_sequential`), which tracks the last-submitted task or set of tasks, and adds those tasks as additional synchronization dependencies of the next submitted task or set of tasks, effectively serializing execution. Nesting `spawn_sequential` queues allows separate sequential chains of tasks to be specified, with deeper-nested chains sequencing after previously-submitted tasks or chains in shallower-nested queues.

Interestingly, nesting `spawn_bulk` within `spawn_sequential` allows entire DAG regions to explicitly synchronize against each other (such that one region executes before another), while still letting tasks within each region expose parallelism. The inverse nesting, `spawn_sequential` within `spawn_bulk`, allows a chain of sequential tasks to be submitted all at once, opening up interesting optimization opportunities.
Alongside these enhancements, the eager submission pipeline is optimized by removing the `eager_thunk` submission pathway (which funneled all tasks through a `Channel`), allowing tasks to be submitted directly into the scheduler without redirection. This is expected to improve task submission performance and reduce memory usage.
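The extra-syncdeps feature is easiest to see in isolation. A minimal sketch, assuming `syncdeps` is accepted as a per-task `@spawn` option holding a `Set` of tasks (the same shape that `InOrderTaskQueue` constructs in `src/queue.jl` below):

```julia
using Dagger

# Two independent producers with no data dependency between them.
t1 = Dagger.@spawn rand(4)
t2 = Dagger.@spawn rand(4)

# Force this task to also wait on t2, even though only t1 is an argument.
# (Assumes `syncdeps` is settable as a per-task option, matching how
# `InOrderTaskQueue._add_prev_deps!` merges it into the task's options.)
t3 = Dagger.@spawn syncdeps=Set([t2]) sum(t1)

fetch(t3)
```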
1 parent 2075371 · commit c59e4cc

17 files changed: +719 / -186 lines

docs/make.jl (2 additions, 1 deletion)

```diff
@@ -17,12 +17,13 @@ makedocs(;
         "Data Management" => "data-management.md",
         "Scopes" => "scopes.md",
         "Processors" => "processors.md",
-        "Dynamic Scheduler Control" => "dynamic.md",
+        "Task Queues" => "task-queues.md",
         "Option Propagation" => "propagation.md",
         "Logging and Graphing" => "logging.md",
         "Checkpointing" => "checkpointing.md",
         "Scheduler Visualization" => "scheduler-visualization.md",
         "Benchmarking" => "benchmarking.md",
+        "Dynamic Scheduler Control" => "dynamic.md",
         "Scheduler Internals" => "scheduler-internals.md",
         "Dagger API" => [
             "Types" => "api-dagger/types.md",
```

docs/src/task-queues.md (new file, 74 additions)

# Task Queues

By default, `@spawn`/`spawn` submit tasks immediately and directly into
Dagger's scheduler, without modification. However, sometimes you want to tweak
this behavior for a region of code; for example, when working with GPUs or
other operations that work in-place, you might want to emulate CUDA's stream
semantics by ensuring that tasks execute sequentially (to avoid one kernel
reading from an array while another kernel is actively writing to it). Or, you
might want to ensure that a set of Dagger tasks is submitted into the
scheduler all at once, for benchmarking purposes or to emulate the behavior of
`delayed`. This and more is possible through a mechanism called "task queues".

A task queue in Dagger is an object that can be configured to accept unlaunched
tasks from `@spawn`/`spawn` and either modify them or delay their launching
arbitrarily. By default, Dagger tasks are enqueued through the
`EagerTaskQueue`, which submits tasks directly into the scheduler before
`@spawn`/`spawn` returns. However, Dagger also has an `InOrderTaskQueue`, which
ensures that tasks enqueued through it execute sequentially with respect to
each other. This queue can be allocated with `Dagger.spawn_sequential`:

```julia
A = rand(16)
B = zeros(16)
C = zeros(16)
function vcopy!(B, A)
    B .= A .+ 1.0
    return
end
function vadd!(C, A, B)
    C .+= A .+ B
    return
end
wait(Dagger.spawn_sequential() do
    Dagger.@spawn vcopy!(B, A)
    Dagger.@spawn vadd!(C, A, B)
end)
```

In the above example, `vadd!` is guaranteed to wait until `vcopy!` has
completed, even though `vadd!` isn't taking the result of `vcopy!` as an
argument (which is how tasks are normally ordered).

What if we wanted to launch multiple `vcopy!` calls within a
`spawn_sequential` region and allow them to execute in parallel, while still
ensuring that the `vadd!` happens after they all finish? In this case, we want
to switch to another kind of task queue: the `LazyTaskQueue`. This task queue
batches task submissions into groups, so that all tasks enqueued with it are
placed into the scheduler all at once. What happens if we use this task queue
(via `spawn_bulk`) within a `spawn_sequential` region?

```julia
A = rand(16)
B1 = zeros(16)
B2 = zeros(16)
C = zeros(16)
wait(Dagger.spawn_sequential() do
    Dagger.spawn_bulk() do
        Dagger.@spawn vcopy!(B1, A)
        Dagger.@spawn vcopy!(B2, A)
    end
    Dagger.@spawn vadd!(C, B1, B2)
end)
```

Conveniently, Dagger's task queues can be nested to get the expected behavior:
the above example will submit the two `vcopy!` tasks as a group (and they may
execute concurrently), while still ensuring that those two tasks finish before
the `vadd!` task executes.

!!! warning
    Task queues do not propagate to nested tasks; if a Dagger task launches
    another task internally, the child task doesn't inherit the task queue that
    the parent task was enqueued in.
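A minimal sketch of this non-propagation caveat (illustrative; it assumes the child task falls back to the default `EagerTaskQueue`, as the warning describes):

```julia
using Dagger

function launch_child()
    # This nested task does NOT inherit the parent's InOrderTaskQueue;
    # it is enqueued through the default EagerTaskQueue instead.
    inner = Dagger.@spawn sum(rand(16))
    return fetch(inner)
end

Dagger.spawn_sequential() do
    t1 = Dagger.@spawn launch_child()
    t2 = Dagger.@spawn launch_child()  # ordered after t1...
    # ...but the `inner` tasks spawned inside each call are unordered
    # with respect to everything else.
    fetch(t2)
end
```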

src/Dagger.jl (4 additions, 1 deletion)

```diff
@@ -19,13 +19,16 @@ using MacroTools
 using TimespanLogging
 
 include("lib/util.jl")
+include("utils/dagdebug.jl")
 
 # Distributed data
 include("options.jl")
 include("processor.jl")
 include("scopes.jl")
+include("eager_thunk.jl")
+include("queue.jl")
 include("thunk.jl")
-include("utils/dagdebug.jl")
+include("submission.jl")
 include("chunks.jl")
 
 # Task scheduling
```

src/chunks.jl (3 additions, 0 deletions)

```diff
@@ -62,6 +62,8 @@ shouldpersist(p::Chunk) = t.persist
 processor(c::Chunk) = c.processor
 affinity(c::Chunk) = affinity(c.handle)
 
+is_task_or_chunk(c::Chunk) = true
+
 Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
 Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x)
 
@@ -272,6 +274,7 @@ function unwrap_weak_checked(c::WeakChunk)
     @assert c !== nothing
     return c
 end
+is_task_or_chunk(c::WeakChunk) = true
 
 Base.@deprecate_binding AbstractPart Union{Chunk, Thunk}
 Base.@deprecate_binding Part Chunk
```
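These two methods read like half of a small trait. A hypothetical sketch of how it could be used, assuming the `false` fallback and any other `true` methods live in parts of the commit not shown here:

```julia
# Hypothetical fallback; only the Chunk/WeakChunk methods appear in this hunk.
is_task_or_chunk(x) = false

# With the trait in place, submission code can pick out the arguments that
# carry data dependencies, e.g. to seed a task's default syncdeps:
default_syncdeps(args) = Set{Any}(x for x in args if is_task_or_chunk(x))
```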

src/compute.jl (4 additions, 4 deletions)

```diff
@@ -85,7 +85,7 @@ function dependents(node::Thunk)
         if !haskey(deps, next)
             deps[next] = Set{Thunk}()
         end
-        for (_, inp) in next.inputs
+        for inp in next.syncdeps
             if istask(inp) || (inp isa Chunk)
                 s = get!(()->Set{Thunk}(), deps, inp)
                 push!(s, next)
@@ -96,7 +96,7 @@
         end
         push!(visited, next)
     end
-    deps
+    return deps
 end
 
 """
@@ -126,7 +126,7 @@ function noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}})
         has_all || continue
         noff[next] = off
     end
-    noff
+    return noff
 end
 
 """
@@ -153,7 +153,7 @@ function order(node::Thunk, ndeps)
         haskey(output, next) && continue
         s += 1
         output[next] = s
-        parents = filter(istask, map(last, next.inputs))
+        parents = collect(filter(istask, next.syncdeps))
         if !isempty(parents)
             # If parents is empty, sort! should be a no-op, but raises an ambiguity error
             # when InlineStrings.jl is loaded (at least, version 1.1.0), because InlineStrings
```
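The shape change in these hunks is the interesting part: `inputs` held `position => argument` pairs (hence the `map(last, ...)`), while `syncdeps` is a flat collection that can also contain tasks injected by queues or the `syncdeps` option. A small illustrative comparison (the `task_a`/`task_b` variables are hypothetical):

```julia
# Old shape: positional/named inputs as pairs, so the value is last(pair)
inputs = Pair{Union{Symbol,Nothing},Any}[nothing => task_a, :n => 3]
old_parents = filter(istask, map(last, inputs))

# New shape: a flat set of dependencies; the task's arguments plus any
# extras added by task queues (e.g. InOrderTaskQueue) or the syncdeps option
syncdeps = Set{Any}([task_a, task_b])
new_parents = collect(filter(istask, syncdeps))
```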

src/eager_thunk.jl (new file, 87 additions)

```julia
"A future holding the result of a `Thunk`."
struct ThunkFuture
    future::Future
end
ThunkFuture(x::Integer) = ThunkFuture(Future(x))
ThunkFuture() = ThunkFuture(Future())
Base.isready(t::ThunkFuture) = isready(t.future)
Base.wait(t::ThunkFuture) = Dagger.Sch.thunk_yield() do
    wait(t.future)
end
function Base.fetch(t::ThunkFuture; proc=OSProc(), raw=false)
    error, value = Dagger.Sch.thunk_yield() do
        fetch(t.future)
    end
    if error
        throw(value)
    end
    if raw
        return value
    else
        return move(proc, value)
    end
end
Base.put!(t::ThunkFuture, x; error=false) = put!(t.future, (error, x))

struct Options
    options::NamedTuple
end
Options(;options...) = Options((;options...))
Options(options...) = Options((;options...))

"""
    EagerThunk

Returned from `spawn`/`@spawn` calls. Represents a task that is in the
scheduler, potentially ready to execute, executing, or finished executing. May
be `fetch`'d or `wait`'d on at any time.
"""
mutable struct EagerThunk
    uid::UInt
    future::ThunkFuture
    finalizer_ref::DRef
    thunk_ref::DRef
    EagerThunk(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
end

Base.isready(t::EagerThunk) = isready(t.future)
function Base.wait(t::EagerThunk)
    if !isdefined(t, :thunk_ref)
        throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `EagerThunk`"))
    end
    wait(t.future)
end
function Base.fetch(t::EagerThunk; raw=false)
    if !isdefined(t, :thunk_ref)
        throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `EagerThunk`"))
    end
    return fetch(t.future; raw)
end
function Base.show(io::IO, t::EagerThunk)
    status = if isdefined(t, :thunk_ref)
        isready(t) ? "finished" : "running"
    else
        "not launched"
    end
    print(io, "EagerThunk ($status)")
end
istask(t::EagerThunk) = true

"When finalized, cleans-up the associated `EagerThunk`."
mutable struct EagerThunkFinalizer
    uid::UInt
    function EagerThunkFinalizer(uid)
        x = new(uid)
        finalizer(Sch.eager_cleanup, x)
        x
    end
end

const EAGER_ID_COUNTER = Threads.Atomic{UInt64}(1)
function eager_next_id()
    if myid() == 1
        Threads.atomic_add!(EAGER_ID_COUNTER, one(UInt64))
    else
        remotecall_fetch(eager_next_id, 1)
    end
end
```
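A short usage sketch of the three `EagerThunk` states that `Base.show` reports above; the `spawn_bulk` half assumes handles are returned before the batch is launched, as the `LazyTaskQueue` in `src/queue.jl` implies:

```julia
using Dagger

t = Dagger.@spawn 1 + 1   # launched immediately via EagerTaskQueue
wait(t)                   # blocks until the task finishes
@assert fetch(t) == 2
println(t)                # prints "EagerThunk (finished)"

# Inside spawn_bulk, handles exist before launch; wait/fetch on an
# unlaunched handle throws a ConcurrencyViolationError.
Dagger.spawn_bulk() do
    u = Dagger.@spawn 2 + 2
    println(u)            # prints "EagerThunk (not launched)"
end
```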

src/queue.jl (new file, 79 additions)

```julia
mutable struct EagerTaskSpec
    f
    args::Vector{Pair{Union{Symbol,Nothing},Any}}
    options::NamedTuple
end

abstract type AbstractTaskQueue end

function enqueue! end

struct EagerTaskQueue <: AbstractTaskQueue end
enqueue!(::EagerTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk}) =
    eager_launch!(spec)
enqueue!(::EagerTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
    eager_launch!(specs)

enqueue!(spec::Pair{EagerTaskSpec,EagerThunk}) =
    enqueue!(get_options(:task_queue, EagerTaskQueue()), spec)
enqueue!(specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
    enqueue!(get_options(:task_queue, EagerTaskQueue()), specs)

struct LazyTaskQueue <: AbstractTaskQueue
    tasks::Vector{Pair{EagerTaskSpec,EagerThunk}}
    LazyTaskQueue() = new(Pair{EagerTaskSpec,EagerThunk}[])
end
function enqueue!(queue::LazyTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
    push!(queue.tasks, spec)
end
function enqueue!(queue::LazyTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
    append!(queue.tasks, specs)
end
function spawn_bulk(f::Base.Callable)
    queue = LazyTaskQueue()
    result = with_options(f; task_queue=queue)
    if length(queue.tasks) > 0
        enqueue!(queue.tasks)
    end
    return result
end

struct InOrderTaskQueue <: AbstractTaskQueue
    upper_queue::AbstractTaskQueue
    prev_tasks::Set{EagerThunk}
    InOrderTaskQueue(upper_queue) = new(upper_queue,
                                        Set{EagerThunk}())
end
function _add_prev_deps!(queue::InOrderTaskQueue, spec::EagerTaskSpec)
    # Add previously-enqueued task(s) to this task's syncdeps
    opts = spec.options
    syncdeps = get(Set{Any}, opts, :syncdeps)
    for task in queue.prev_tasks
        push!(syncdeps, task)
    end
    spec.options = merge(opts, (;syncdeps,))
end
function enqueue!(queue::InOrderTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
    if length(queue.prev_tasks) > 0
        _add_prev_deps!(queue, first(spec))
        empty!(queue.prev_tasks)
    end
    push!(queue.prev_tasks, last(spec))
    enqueue!(queue.upper_queue, spec)
end
function enqueue!(queue::InOrderTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
    if length(queue.prev_tasks) > 0
        for (spec, task) in specs
            _add_prev_deps!(queue, spec)
        end
        empty!(queue.prev_tasks)
    end
    for (spec, task) in specs
        push!(queue.prev_tasks, task)
    end
    enqueue!(queue.upper_queue, specs)
end
function spawn_sequential(f::Base.Callable)
    queue = InOrderTaskQueue(get_options(:task_queue, EagerTaskQueue()))
    return with_options(f; task_queue=queue)
end
```
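Since `AbstractTaskQueue` only requires `enqueue!` methods, third-party queues can wrap whatever queue is currently active, the same way `InOrderTaskQueue` holds an `upper_queue`. A hypothetical sketch (the `CountingTaskQueue` type is not part of this commit; the installation pattern mirrors `spawn_sequential` above):

```julia
# A pass-through queue that counts submissions before forwarding them
# to the queue that was active when it was created.
mutable struct CountingTaskQueue <: Dagger.AbstractTaskQueue
    upper_queue::Dagger.AbstractTaskQueue
    count::Int
end
CountingTaskQueue(upper) = CountingTaskQueue(upper, 0)

function Dagger.enqueue!(q::CountingTaskQueue,
                         spec::Pair{Dagger.EagerTaskSpec,Dagger.EagerThunk})
    q.count += 1
    Dagger.enqueue!(q.upper_queue, spec)
end
function Dagger.enqueue!(q::CountingTaskQueue,
                         specs::Vector{Pair{Dagger.EagerTaskSpec,Dagger.EagerThunk}})
    q.count += length(specs)
    Dagger.enqueue!(q.upper_queue, specs)
end

# Install it for a region of code, like spawn_sequential does:
q = CountingTaskQueue(Dagger.get_options(:task_queue, Dagger.EagerTaskQueue()))
Dagger.with_options(; task_queue=q) do
    Dagger.@spawn 1 + 1
    Dagger.@spawn 2 + 2
end
@show q.count  # 2
```

Because each queue only knows its `upper_queue`, nesting composes naturally: each layer applies its transformation and delegates the rest upward.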

src/sch/Sch.jl (3 additions, 2 deletions)

```diff
@@ -11,6 +11,7 @@ import Base: @invokelatest
 import ..Dagger
 import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
 import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, default_enabled, get_processors, get_parent, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
+import ..Dagger: @dagdebug
 import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek
 
 import ..Dagger
@@ -130,7 +131,7 @@ function start_state(deps::Dict, node_order, chan)
 
     for k in sort(collect(keys(deps)), by=node_order)
         if istask(k)
-            waiting = Set{Thunk}(Iterators.filter(istask, map(last, inputs(k))))
+            waiting = Set{Thunk}(Iterators.filter(istask, k.syncdeps))
             if isempty(waiting)
                 push!(state.ready, k)
             else
@@ -883,7 +884,7 @@ function finish_task!(ctx, state, node, thunk_failed)
     schedule_dependents!(state, node, thunk_failed)
     fill_registered_futures!(state, node, thunk_failed)
 
-    to_evict = cleanup_inputs!(state, node)
+    to_evict = cleanup_syncdeps!(state, node)
     if node.f isa Chunk
         # FIXME: Check the graph for matching chunks
         push!(to_evict, node.f)
```