Skip to content

Commit a2c284b

Browse files
committed
EagerTaskSpec->DTaskSpec, EagerTaskQueue->DefaultTaskQueue
1 parent da165d7 commit a2c284b

File tree

6 files changed

+34
-34
lines changed

6 files changed

+34
-34
lines changed

docs/src/task-queues.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ queues".
1414
A task queue in Dagger is an object that can be configured to accept unlaunched
1515
tasks from `@spawn`/`spawn` and either modify them or delay their launching
1616
arbitrarily. By default, Dagger tasks are enqueued through the
17-
`EagerTaskQueue`, which submits tasks directly into the scheduler before
17+
`DefaultTaskQueue`, which submits tasks directly into the scheduler before
1818
`@spawn`/`spawn` returns. However, Dagger also has an `InOrderTaskQueue`, which
1919
ensures that tasks enqueued through it execute sequentially with respect to
2020
each other. This queue can be allocated with `Dagger.spawn_sequential`:

src/datadeps.jl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ struct DataDepsTaskQueue <: AbstractTaskQueue
2626
# The fields following only apply when static==true
2727
static::Bool
2828
# The set of tasks that have already been seen
29-
seen_tasks::Union{Vector{Pair{EagerTaskSpec,DTask}},Nothing}
29+
seen_tasks::Union{Vector{Pair{DTaskSpec,DTask}},Nothing}
3030
# The data-dependency graph of all tasks
3131
g::Union{SimpleDiGraph{Int},Nothing}
3232
# The mapping from task to graph ID
@@ -38,7 +38,7 @@ struct DataDepsTaskQueue <: AbstractTaskQueue
3838
traversal::Symbol=:inorder)
3939
deps = IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, DTask}}}()
4040
if static
41-
seen_tasks = Pair{EagerTaskSpec,DTask}[]
41+
seen_tasks = Pair{DTaskSpec,DTask}[]
4242
g = SimpleDiGraph()
4343
task_to_id = Dict{DTask,Int}()
4444
else
@@ -51,7 +51,7 @@ struct DataDepsTaskQueue <: AbstractTaskQueue
5151
end
5252
end
5353

54-
function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,DTask})
54+
function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{DTaskSpec,DTask})
5555
# If static, record this task and its edges in the graph
5656
if queue.static
5757
g = queue.g
@@ -139,15 +139,15 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,DTask}
139139
spec.options = merge(opts, (;syncdeps, scope))
140140
end
141141
end
142-
function enqueue!(queue::DataDepsTaskQueue, spec::Pair{EagerTaskSpec,DTask})
142+
function enqueue!(queue::DataDepsTaskQueue, spec::Pair{DTaskSpec,DTask})
143143
_enqueue!(queue, spec)
144144
if queue.static
145145
push!(queue.seen_tasks, spec)
146146
else
147147
enqueue!(queue.upper_queue, spec)
148148
end
149149
end
150-
function enqueue!(queue::DataDepsTaskQueue, specs::Vector{Pair{EagerTaskSpec,DTask}})
150+
function enqueue!(queue::DataDepsTaskQueue, specs::Vector{Pair{DTaskSpec,DTask}})
151151
for spec in specs
152152
_enqueue!(queue, spec)
153153
end
@@ -467,7 +467,7 @@ is experimental and subject to change.
467467
function spawn_datadeps(f::Base.Callable; static::Bool=true,
468468
traversal::Symbol=:inorder)
469469
wait_all(; check_errors=true) do
470-
queue = DataDepsTaskQueue(get_options(:task_queue, EagerTaskQueue());
470+
queue = DataDepsTaskQueue(get_options(:task_queue, DefaultTaskQueue());
471471
static, traversal)
472472
result = with_options(f; task_queue=queue)
473473
if queue.static

src/queue.jl

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mutable struct EagerTaskSpec
1+
mutable struct DTaskSpec
22
f
33
args::Vector{Pair{Union{Symbol,Nothing},Any}}
44
options::NamedTuple
@@ -8,25 +8,25 @@ abstract type AbstractTaskQueue end
88

99
function enqueue! end
1010

11-
struct EagerTaskQueue <: AbstractTaskQueue end
12-
enqueue!(::EagerTaskQueue, spec::Pair{EagerTaskSpec,DTask}) =
11+
struct DefaultTaskQueue <: AbstractTaskQueue end
12+
enqueue!(::DefaultTaskQueue, spec::Pair{DTaskSpec,DTask}) =
1313
eager_launch!(spec)
14-
enqueue!(::EagerTaskQueue, specs::Vector{Pair{EagerTaskSpec,DTask}}) =
14+
enqueue!(::DefaultTaskQueue, specs::Vector{Pair{DTaskSpec,DTask}}) =
1515
eager_launch!(specs)
1616

17-
enqueue!(spec::Pair{EagerTaskSpec,DTask}) =
18-
enqueue!(get_options(:task_queue, EagerTaskQueue()), spec)
19-
enqueue!(specs::Vector{Pair{EagerTaskSpec,DTask}}) =
20-
enqueue!(get_options(:task_queue, EagerTaskQueue()), specs)
17+
enqueue!(spec::Pair{DTaskSpec,DTask}) =
18+
enqueue!(get_options(:task_queue, DefaultTaskQueue()), spec)
19+
enqueue!(specs::Vector{Pair{DTaskSpec,DTask}}) =
20+
enqueue!(get_options(:task_queue, DefaultTaskQueue()), specs)
2121

2222
struct LazyTaskQueue <: AbstractTaskQueue
23-
tasks::Vector{Pair{EagerTaskSpec,DTask}}
24-
LazyTaskQueue() = new(Pair{EagerTaskSpec,DTask}[])
23+
tasks::Vector{Pair{DTaskSpec,DTask}}
24+
LazyTaskQueue() = new(Pair{DTaskSpec,DTask}[])
2525
end
26-
function enqueue!(queue::LazyTaskQueue, spec::Pair{EagerTaskSpec,DTask})
26+
function enqueue!(queue::LazyTaskQueue, spec::Pair{DTaskSpec,DTask})
2727
push!(queue.tasks, spec)
2828
end
29-
function enqueue!(queue::LazyTaskQueue, specs::Vector{Pair{EagerTaskSpec,DTask}})
29+
function enqueue!(queue::LazyTaskQueue, specs::Vector{Pair{DTaskSpec,DTask}})
3030
append!(queue.tasks, specs)
3131
end
3232
function spawn_bulk(f::Base.Callable)
@@ -44,7 +44,7 @@ struct InOrderTaskQueue <: AbstractTaskQueue
4444
InOrderTaskQueue(upper_queue) = new(upper_queue,
4545
Set{DTask}())
4646
end
47-
function _add_prev_deps!(queue::InOrderTaskQueue, spec::EagerTaskSpec)
47+
function _add_prev_deps!(queue::InOrderTaskQueue, spec::DTaskSpec)
4848
# Add previously-enqueued task(s) to this task's syncdeps
4949
opts = spec.options
5050
syncdeps = get(Set{Any}, opts, :syncdeps)
@@ -53,15 +53,15 @@ function _add_prev_deps!(queue::InOrderTaskQueue, spec::EagerTaskSpec)
5353
end
5454
spec.options = merge(opts, (;syncdeps,))
5555
end
56-
function enqueue!(queue::InOrderTaskQueue, spec::Pair{EagerTaskSpec,DTask})
56+
function enqueue!(queue::InOrderTaskQueue, spec::Pair{DTaskSpec,DTask})
5757
if length(queue.prev_tasks) > 0
5858
_add_prev_deps!(queue, first(spec))
5959
empty!(queue.prev_tasks)
6060
end
6161
push!(queue.prev_tasks, last(spec))
6262
enqueue!(queue.upper_queue, spec)
6363
end
64-
function enqueue!(queue::InOrderTaskQueue, specs::Vector{Pair{EagerTaskSpec,DTask}})
64+
function enqueue!(queue::InOrderTaskQueue, specs::Vector{Pair{DTaskSpec,DTask}})
6565
if length(queue.prev_tasks) > 0
6666
for (spec, task) in specs
6767
_add_prev_deps!(queue, spec)
@@ -74,26 +74,26 @@ function enqueue!(queue::InOrderTaskQueue, specs::Vector{Pair{EagerTaskSpec,DTas
7474
enqueue!(queue.upper_queue, specs)
7575
end
7676
function spawn_sequential(f::Base.Callable)
77-
queue = InOrderTaskQueue(get_options(:task_queue, EagerTaskQueue()))
77+
queue = InOrderTaskQueue(get_options(:task_queue, DefaultTaskQueue()))
7878
return with_options(f; task_queue=queue)
7979
end
8080

8181
struct WaitAllQueue <: AbstractTaskQueue
8282
upper_queue::AbstractTaskQueue
8383
tasks::Vector{DTask}
8484
end
85-
function enqueue!(queue::WaitAllQueue, spec::Pair{EagerTaskSpec,DTask})
85+
function enqueue!(queue::WaitAllQueue, spec::Pair{DTaskSpec,DTask})
8686
push!(queue.tasks, spec[2])
8787
enqueue!(queue.upper_queue, spec)
8888
end
89-
function enqueue!(queue::WaitAllQueue, specs::Vector{Pair{EagerTaskSpec,DTask}})
89+
function enqueue!(queue::WaitAllQueue, specs::Vector{Pair{DTaskSpec,DTask}})
9090
for (_, task) in specs
9191
push!(queue.tasks, task)
9292
end
9393
enqueue!(queue.upper_queue, specs)
9494
end
9595
function wait_all(f; check_errors::Bool=false)
96-
queue = WaitAllQueue(get_options(:task_queue, EagerTaskQueue()), DTask[])
96+
queue = WaitAllQueue(get_options(:task_queue, DefaultTaskQueue()), DTask[])
9797
result = with_options(f; task_queue=queue)
9898
for task in queue.tasks
9999
if check_errors

src/submission.jl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,13 @@ function eager_process_elem_submission_to_local(id_map, x)
193193
end
194194
end
195195
# TODO: This can probably operate in-place
196-
function eager_process_args_submission_to_local(id_map, spec::Pair{EagerTaskSpec,DTask})
196+
function eager_process_args_submission_to_local(id_map, spec::Pair{DTaskSpec,DTask})
197197
return Base.mapany(first(spec).args) do pos_x
198198
pos, x = pos_x
199199
return pos => eager_process_elem_submission_to_local(id_map, x)
200200
end
201201
end
202-
function eager_process_args_submission_to_local(id_map, specs::Vector{Pair{EagerTaskSpec,DTask}})
202+
function eager_process_args_submission_to_local(id_map, specs::Vector{Pair{DTaskSpec,DTask}})
203203
return Base.mapany(specs) do spec
204204
eager_process_args_submission_to_local(id_map, spec)
205205
end
@@ -217,7 +217,7 @@ function eager_process_options_submission_to_local(id_map, options::NamedTuple)
217217
return options
218218
end
219219
end
220-
function eager_spawn(spec::EagerTaskSpec)
220+
function eager_spawn(spec::DTaskSpec)
221221
# Generate new DTask
222222
uid = eager_next_id()
223223
future = ThunkFuture()
@@ -226,7 +226,7 @@ function eager_spawn(spec::EagerTaskSpec)
226226
# Return unlaunched DTask
227227
return DTask(uid, future, finalizer_ref)
228228
end
229-
function eager_launch!((spec, task)::Pair{EagerTaskSpec,DTask})
229+
function eager_launch!((spec, task)::Pair{DTaskSpec,DTask})
230230
# Lookup DTask -> ThunkID
231231
local args, options
232232
lock(Sch.EAGER_ID_MAP) do id_map
@@ -240,7 +240,7 @@ function eager_launch!((spec, task)::Pair{EagerTaskSpec,DTask})
240240
spec.f, args, options)
241241
task.thunk_ref = thunk_id.ref
242242
end
243-
function eager_launch!(specs::Vector{Pair{EagerTaskSpec,DTask}})
243+
function eager_launch!(specs::Vector{Pair{DTaskSpec,DTask}})
244244
ntasks = length(specs)
245245

246246
uids = [task.uid for (_, task) in specs]

src/thunk.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,13 +430,13 @@ function spawn(f, args...; kwargs...)
430430
args_kwargs = args_kwargs_to_pairs(args, kwargs)
431431

432432
# Get task queue, and don't let it propagate
433-
task_queue = get_options(:task_queue, EagerTaskQueue())
433+
task_queue = get_options(:task_queue, DefaultTaskQueue())
434434
options = NamedTuple(filter(opt->opt[1] != :task_queue, Base.pairs(options)))
435435
propagates = filter(prop->prop != :task_queue, propagates)
436436
options = merge(options, (;propagates))
437437

438438
# Construct task spec and handle
439-
spec = EagerTaskSpec(f, args_kwargs, options)
439+
spec = DTaskSpec(f, args_kwargs, options)
440440
task = eager_spawn(spec)
441441

442442
# Enqueue the task into the task queue

test/task-queues.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ function task_queue_wait_update(w, r, op, x, y)
88
return op(x, y)
99
end
1010

11-
@testset "EagerTaskQueue" begin
11+
@testset "DefaultTaskQueue" begin
1212
r = Ref(0)
1313
R = Dagger.@mutable r
1414
d = begin

0 commit comments

Comments
 (0)