Skip to content

Commit 8424403

Browse files
AkhilAkkapellijpsamaroo
authored andcommitted
Add task affinity support with compute_scope and result_scope in Dagger.jl's @Spawn macro
- Enhanced the Thunk struct to include compute_scope and result_scope for better task execution control. - Updated the Thunk constructor to accept new scope parameters. - Modified the spawn function to handle the new scope parameters appropriately. - Introduced a new test suite for task affinity, covering various scenarios with scope interactions. - Added comprehensive documentation for task affinity, detailing the usage of scope, compute_scope, and result_scope. - Implemented tests to validate behavior when using chunks as inputs in tasks, ensuring correct scope handling. - Updated documentation for the `@spawn` macro to clarify the usage of `scope`, `compute_scope`, and `result_scope`, including examples with the new syntax. - Improved error messages in the scheduling logic to provide clearer feedback when scopes are incompatible. - Refactored test cases for task affinity to ensure they align with the new scope handling and provide better coverage for edge cases. - Removed deprecated comments and cleaned up the code for better readability.
1 parent f106c74 commit 8424403

File tree

12 files changed

+688
-80
lines changed

12 files changed

+688
-80
lines changed

docs/make.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ makedocs(;
2020
"Parallel Nested Loops" => "use-cases/parallel-nested-loops.md",
2121
],
2222
"Task Spawning" => "task-spawning.md",
23+
"Task Affinity" => "task-affinity.md",
2324
"Data Management" => "data-management.md",
2425
"Distributed Arrays" => "darray.md",
2526
"Streaming Tasks" => "streaming.md",

docs/src/task-affinity.md

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# Task Affinity
2+
3+
Dagger's allows for precise control over task placement and result availability using scopes. Tasks are assigned based on the combination of multiple scopes: `scope`/`compute_scope`, and `result_scope` (which can all be specified with `@spawn`), and additionally the scopes of any arguments to the task (in the form of a scope attached to a `Chunk` argument). Let's take a look at how to configure these scopes, and how they work together to direct task placement.
4+
5+
For more information on how scopes work, see [Scopes](@ref).
6+
7+
---
8+
9+
## Task Scopes
10+
11+
### Scope
12+
13+
`scope` defines the general set of locations where a Dagger task can execute. If `scope` is not specified, the task falls back to `DefaultScope()`, allowing it to run wherever execution is possible. Execution occurs on any worker within the defined scope.
14+
15+
**Example:**
16+
```julia
17+
g = Dagger.@spawn scope=Dagger.scope(worker=3) f(x,y)
18+
```
19+
Task `g` executes only on worker 3. Its result can be accessed by any worker.
20+
21+
---
22+
23+
### Compute Scope
24+
25+
Like `scope`, `compute_scope` also specifies where a Dagger task can execute. The key difference is if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If neither is specified, then they default to `DefaultScope()`.
26+
27+
**Example:**
28+
```julia
29+
g1 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
30+
g2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
31+
```
32+
Tasks `g1` and `g2` execute on either thread 2 of worker 1, or thread 1 of worker 3. The `scope` argument to `g1` is ignored. Their result can be accessed by any worker.
33+
34+
---
35+
36+
### Result Scope
37+
38+
The result_scope limits the processors from which a task's result can be accessed. This can be useful for managing data locality and minimizing transfers. If `result_scope` is not specified, it defaults to `AnyScope()`, meaning the result can be accessed by any processor (including those not default enabled for task execution, such as GPUs).
39+
40+
**Example:**
41+
```julia
42+
g = Dagger.@spawn result_scope=Dagger.scope(worker=3, threads=[1, 3, 4]) f(x,y)
43+
```
44+
The result of `g` is accessible only from threads 1, 3 and 4 of worker process 3. The task's execution may happen anywhere on threads 1, 3 and 4 of worker 3.
45+
46+
---
47+
48+
## Interaction of `compute_scope` and `result_scope`
49+
50+
When `scope`/`compute_scope` and `result_scope` are specified, the scheduler executes the task on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty, then the scheduler throws a `Dagger.Sch.SchedulerException` error.
51+
52+
**Example:**
53+
```julia
54+
g = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) f(x,y)
55+
```
56+
The task `g` computes on thread 2 of worker 2 (as it's the intersection of compute and result scopes), but accessng its result is restricted to thread 2 of worker 2 and thread 2 of worker 4.
57+
58+
---
59+
60+
## Function as a Chunk
61+
62+
This section explains how `scope`/`compute_scope` and `result_scope` affect tasks when a `Chunk` is used to specify the function to be executed by `@spawn` (e.g. created via `Dagger.tochunk(...)` or by calling `fetch(task; raw=true)` on a task). This may seem strange (to use a `Chunk` to specify the function to be executed), but it can be useful with working with callable structs, such as closures or Flux.jl models.
63+
64+
Assume `g` is some function, e.g. `g(x, y) = x * 2 + y * 3`, and `chunk_scope` is its defined affinity.
65+
66+
When `Dagger.tochunk(...)` is used to pass a `Chunk` as the function to be executed by `@spawn`:
67+
- The result is accessible only on processors in `chunk_scope`.
68+
- Dagger validates that there is an intersection between `chunk_scope`, the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`), and the `result_scope`. If no intersection exists, the scheduler throws an exception.
69+
70+
!!! info While `chunk_proc` is currently required when constructing a chunk, it is only used to pick the most optimal processor for accessing the chunk; it does not affect which set of processors the task may execute on.
71+
72+
**Usage:**
73+
```julia
74+
chunk_scope = Dagger.scope(worker=3)
75+
chunk_proc = Dagger.OSProc(3) # not important, just needs to be a valid processor
76+
g(x, y) = x * 2 + y * 3
77+
g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope)
78+
h1 = Dagger.@spawn scope=Dagger.scope(worker=3) g_chunk(10, 11)
79+
h2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(20, 21)
80+
h3 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(30, 31)
81+
h4 = Dagger.@spawn result_scope=Dagger.scope(worker=3) g_chunk(40, 41)
82+
h5 = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=3) result_scope=Dagger.scope(worker=3,threads=[2,3]) g_chunk(50, 51)
83+
```
84+
In all these cases (`h1` through `h5`), the tasks get executed on any processor within `chunk_scope` and its result is accessible only within `chunk_scope`.
85+
86+
---
87+
88+
## Chunk arguments
89+
90+
This section details behavior when some or all of a task's arguments are `Chunk`s.
91+
92+
Assume `g(x, y) = x * 2 + y * 3`, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)`, where `arg_scope` is the argument's defined scope. Assume `arg_scope = Dagger.scope(worker=2)`.
93+
94+
### Scope
95+
If `arg_scope` and `scope` do not intersect, the scheduler throws an exception. Execution occurs on the intersection of `scope` and `arg_scope`.
96+
97+
```julia
98+
h = Dagger.@spawn scope=Dagger.scope(worker=2) g(arg, 11)
99+
```
100+
Task `h` executes on any worker within the intersection of `scope` and `arg_scope`. The result is accessible from any processor.
101+
102+
---
103+
104+
### Compute scope and Chunk argument scopes interaction
105+
If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`.
106+
107+
```julia
108+
h1 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 11)
109+
h2 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 21)
110+
```
111+
Tasks `h1` and `h2` execute on any processor within the intersection of the `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is accessible from any processor.
112+
113+
---
114+
115+
### Result scope and Chunk argument scopes interaction
116+
If only `result_scope` is specified, computation happens on any processor within the intersection of `arg_scope` and `result_scope`, and the result is only accessible within `result_scope`.
117+
118+
```julia
119+
h = Dagger.@spawn result_scope=Dagger.scope(worker=2) g(arg, 11)
120+
```
121+
Task `h` executes on any processor within the intersection of `arg_scope` and `result_scope`. The result is accessible from only within `result_scope`.
122+
123+
---
124+
125+
### Compute, result, and chunk argument scopes interaction
126+
When `scope`/`compute_scope`, `result_scope`, and `Chunk` argument scopes are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception.
127+
128+
```julia
129+
h = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) g(arg, 31)
130+
```
131+
Task `h` computes on thread 2 of worker 2 (as it's the intersection of `arg_scope`, `compute_scope`, and `result_scope`), and its result access is restricted to thread 2 of worker 2 or thread 2 of worker 4.

src/sch/Sch.jl

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import Random: randperm
1414
import Base: @invokelatest
1515

1616
import ..Dagger
17-
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
17+
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, InvalidScope, LockedObject
1818
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime
1919
import ..Dagger: @dagdebug, @safe_lock_spin1
2020
import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek
@@ -726,16 +726,25 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
726726
sig = signature(state, task)
727727

728728
# Calculate scope
729-
scope = if task.f isa Chunk
730-
task.f.scope
731-
else
732-
if task.options.proclist !== nothing
733-
# proclist overrides scope selection
734-
AnyScope()
735-
else
736-
DefaultScope()
729+
scope = constrain(task.compute_scope, task.result_scope)
730+
if scope isa InvalidScope
731+
ex = SchedulingException("compute_scope and result_scope are not compatible: $(scope.x), $(scope.y)")
732+
state.cache[task] = ex
733+
state.errored[task] = true
734+
set_failed!(state, task)
735+
@goto pop_task
736+
end
737+
if task.f isa Chunk
738+
scope = constrain(scope, task.f.scope)
739+
if scope isa InvalidScope
740+
ex = SchedulingException("Current scope and function Chunk Scope are not compatible: $(scope.x), $(scope.y)")
741+
state.cache[task] = ex
742+
state.errored[task] = true
743+
set_failed!(state, task)
744+
@goto pop_task
737745
end
738746
end
747+
739748
for (_,input) in task.inputs
740749
input = unwrap_weak_checked(input)
741750
chunk = if istask(input)
@@ -747,8 +756,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
747756
end
748757
chunk isa Chunk || continue
749758
scope = constrain(scope, chunk.scope)
750-
if scope isa Dagger.InvalidScope
751-
ex = SchedulingException("Scopes are not compatible: $(scope.x), $(scope.y)")
759+
if scope isa InvalidScope
760+
ex = SchedulingException("Current scope and argument Chunk scope are not compatible: $(scope.x), $(scope.y)")
752761
state.cache[task] = ex
753762
state.errored[task] = true
754763
set_failed!(state, task)
@@ -1086,7 +1095,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
10861095
thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options,
10871096
propagated, ids, positions,
10881097
(log_sink=ctx.log_sink, profile=ctx.profile),
1089-
sch_handle, state.uid])
1098+
sch_handle, state.uid, thunk.result_scope])
10901099
end
10911100
# N.B. We don't batch these because we might get a deserialization
10921101
# error due to something not being defined on the worker, and then we don't
@@ -1305,7 +1314,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13051314
task = task_spec[]
13061315
scope = task[5]
13071316
if !isa(constrain(scope, Dagger.ExactScope(to_proc)),
1308-
Dagger.InvalidScope) &&
1317+
InvalidScope) &&
13091318
typemax(UInt32) - proc_occupancy_cached >= occupancy
13101319
# Compatible, steal this task
13111320
return dequeue_pair!(queue)
@@ -1488,7 +1497,7 @@ function do_task(to_proc, task_desc)
14881497
scope, Tf, data,
14891498
send_result, persist, cache, meta,
14901499
options, propagated, ids, positions,
1491-
ctx_vars, sch_handle, sch_uid = task_desc
1500+
ctx_vars, sch_handle, sch_uid, result_scope = task_desc
14921501
ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile)
14931502

14941503
from_proc = OSProc()
@@ -1696,7 +1705,7 @@ function do_task(to_proc, task_desc)
16961705

16971706
# Construct result
16981707
# TODO: We should cache this locally
1699-
send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache,
1708+
send_result || meta ? res : tochunk(res, to_proc, result_scope; device, persist, cache=persist ? true : cache,
17001709
tag=options.storage_root_tag,
17011710
leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()),
17021711
retain=options.storage_retain)

src/sch/util.jl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@ function get_propagated_options(thunk)
4242
nt = NamedTuple()
4343
for key in thunk.propagates
4444
value = if key == :scope
45-
isa(thunk.f, Chunk) ? thunk.f.scope : DefaultScope()
46-
elseif key == :processor
47-
isa(thunk.f, Chunk) ? thunk.f.processor : OSProc()
45+
thunk.compute_scope
4846
elseif key in fieldnames(Thunk)
4947
getproperty(thunk, key)
5048
elseif key in fieldnames(ThunkOptions)
@@ -340,7 +338,7 @@ function can_use_proc(state, task, gproc, proc, opts, scope)
340338
scope = constrain(scope, Dagger.ExactScope(proc))
341339
elseif opts.proclist isa Vector
342340
if !(typeof(proc) in opts.proclist)
343-
@dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist)"
341+
@dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist) ($(opts.proclist))"
344342
return false, scope
345343
end
346344
scope = constrain(scope,

src/scopes.jl

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,17 @@ constrain(x::ProcessScope, y::ExactScope) =
240240
constrain(x::NodeScope, y::ExactScope) =
241241
x == y.parent.parent ? y : InvalidScope(x, y)
242242

243+
244+
function constrain(scope1, scope2, scopes...)
245+
scope1 = constrain(scope1, scope2)
246+
scope1 isa InvalidScope && return scope1
247+
for s in scopes
248+
scope1 = constrain(scope1, s)
249+
scope1 isa InvalidScope && return scope1
250+
end
251+
return scope1
252+
end
253+
243254
### Scopes helper
244255

245256
"""
@@ -412,3 +423,26 @@ to_scope(::Val{key}, sc::NamedTuple) where key =
412423

413424
# Base case for all Dagger-owned keys
414425
scope_key_precedence(::Val) = 0
426+
427+
### Scope comparison helpers
428+
429+
function Base.issetequal(scopes::AbstractScope...)
430+
scope1 = scopes[1]
431+
scope1_procs = Dagger.compatible_processors(scope1)
432+
for scope2 in scopes[2:end]
433+
scope2_procs = Dagger.compatible_processors(scope2)
434+
if !issetequal(scope1_procs, scope2_procs)
435+
return false
436+
end
437+
end
438+
return true
439+
end
440+
441+
function Base.issubset(scope1::AbstractScope, scope2::AbstractScope)
442+
scope1_procs = compatible_processors(scope1)
443+
scope2_procs = compatible_processors(scope2)
444+
for proc in scope1_procs
445+
proc in scope2_procs || return false
446+
end
447+
return true
448+
end

src/thunk.jl

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ mutable struct Thunk
7373
eager_ref::Union{DRef,Nothing}
7474
options::Any # stores scheduler-specific options
7575
propagates::Tuple # which options we'll propagate
76+
compute_scope::AbstractScope
77+
result_scope::AbstractScope
7678
function Thunk(f, xs...;
7779
syncdeps=nothing,
7880
id::Int=next_id(),
@@ -84,16 +86,14 @@ mutable struct Thunk
8486
affinity=nothing,
8587
eager_ref=nothing,
8688
processor=nothing,
87-
scope=nothing,
89+
scope=DefaultScope(),
90+
compute_scope=scope,
91+
result_scope=AnyScope(),
8892
options=nothing,
8993
propagates=(),
9094
kwargs...
9195
)
92-
if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope))
93-
f = tochunk(f,
94-
something(processor, OSProc()),
95-
something(scope, DefaultScope()))
96-
end
96+
9797
xs = Base.mapany(identity, xs)
9898
syncdeps_set = Set{Any}(filterany(is_task_or_chunk, Base.mapany(last, xs)))
9999
if syncdeps !== nothing
@@ -105,11 +105,11 @@ mutable struct Thunk
105105
if options !== nothing
106106
@assert isempty(kwargs)
107107
new(f, xs, syncdeps_set, id, get_result, meta, persist, cache,
108-
cache_ref, affinity, eager_ref, options, propagates)
108+
cache_ref, affinity, eager_ref, options, propagates, compute_scope, result_scope)
109109
else
110110
new(f, xs, syncdeps_set, id, get_result, meta, persist, cache,
111111
cache_ref, affinity, eager_ref, Sch.ThunkOptions(;kwargs...),
112-
propagates)
112+
propagates, compute_scope, result_scope)
113113
end
114114
end
115115
end
@@ -476,15 +476,6 @@ function spawn(f, args...; kwargs...)
476476
args = args[2:end]
477477
end
478478

479-
# Wrap f in a Chunk if necessary
480-
processor = haskey(options, :processor) ? options.processor : nothing
481-
scope = haskey(options, :scope) ? options.scope : nothing
482-
if !isnothing(processor) || !isnothing(scope)
483-
f = tochunk(f,
484-
something(processor, get_options(:processor, OSProc())),
485-
something(scope, get_options(:scope, DefaultScope())))
486-
end
487-
488479
# Process the args and kwargs into Pair form
489480
args_kwargs = args_kwargs_to_pairs(args, kwargs)
490481

test/options.jl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ end
2828
for (option, default, value, value2) in [
2929
# Special handling
3030
(:scope, AnyScope(), ProcessScope(first_wid), ProcessScope(last_wid)),
31-
(:processor, OSProc(), Dagger.ThreadProc(first_wid, 1), Dagger.ThreadProc(last_wid, 1)),
3231
# ThunkOptions field
3332
(:single, 0, first_wid, last_wid),
3433
# Thunk field
@@ -80,7 +79,7 @@ end
8079
@test fetch(Dagger.@spawn sf(obj)) == 0
8180
@test fetch(Dagger.@spawn sf(obj)) == 0
8281
end
83-
Dagger.with_options(;scope=Dagger.ExactScope(Dagger.ThreadProc(1,1)), processor=OSProc(1), meta=true) do
82+
Dagger.with_options(;scope=Dagger.ExactScope(Dagger.ThreadProc(1,1)), meta=true) do
8483
@test fetch(Dagger.@spawn sf(obj)) == 43
8584
@test fetch(Dagger.@spawn sf(obj)) == 43
8685
end

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ tests = [
1818
("Options", "options.jl"),
1919
("Mutation", "mutation.jl"),
2020
("Task Queues", "task-queues.jl"),
21+
("Task Affinity", "task-affinity.jl"),
2122
("Datadeps", "datadeps.jl"),
2223
("Streaming", "streaming.jl"),
2324
("Domain Utilities", "domain.jl"),

test/scheduler.jl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import Dagger: Chunk
12
import Dagger.Sch: SchedulerOptions, ThunkOptions, SchedulerHaltedException, ComputeState, ThunkID, sch_handle
23

34
@everywhere begin
@@ -162,10 +163,8 @@ end
162163
@test Dagger.default_enabled(Dagger.ThreadProc(1,1)) == true
163164
@test Dagger.default_enabled(FakeProc()) == false
164165

165-
opts = Dagger.Sch.ThunkOptions(;proclist=[Dagger.ThreadProc])
166-
as = [delayed(identity; options=opts)(i) for i in 1:5]
167-
opts = Dagger.Sch.ThunkOptions(;proclist=[FakeProc])
168-
b = delayed(fakesum; options=opts)(as...)
166+
as = [delayed(identity; proclist=[Dagger.ThreadProc])(i) for i in 1:5]
167+
b = delayed(fakesum; proclist=[FakeProc], compute_scope=Dagger.AnyScope())(as...)
169168

170169
@test collect(Context(), b) == FakeVal(57)
171170
end

0 commit comments

Comments
 (0)