Commit 44543d6

Add task affinity support with compute_scope and result_scope in Dagger.jl's @spawn macro
- Enhanced the Thunk struct to include compute_scope and result_scope for better task execution control.
- Updated the Thunk constructor to accept new scope parameters.
- Modified the spawn function to handle the new scope parameters appropriately.
- Introduced a new test suite for task affinity, covering various scenarios with scope interactions.
- Added comprehensive documentation for task affinity, detailing the usage of scope, compute_scope, and result_scope.
- Implemented tests to validate behavior when using chunks as inputs in tasks, ensuring correct scope handling.
1 parent c0eec21 commit 44543d6

6 files changed: +757 -30 lines changed


docs/make.jl

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ makedocs(;
         "Parallel Nested Loops" => "use-cases/parallel-nested-loops.md",
     ],
     "Task Spawning" => "task-spawning.md",
+    "Task Affinity" => "task-affinity.md",
     "Data Management" => "data-management.md",
     "Distributed Arrays" => "darray.md",
     "Streaming Tasks" => "streaming.md",

docs/src/task-affinity.md

Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
# Task Affinity

Dagger.jl's `@spawn` macro gives precise control over where a task executes and where its result can be accessed, through the `scope`, `compute_scope`, and `result_scope` options.

---

## Key Terms

### Scope
`scope` defines the general set of locations where a Dagger task can execute. If `compute_scope` and `result_scope` are not explicitly set, the task's `compute_scope` defaults to its `scope`, and its `result_scope` defaults to `AnyScope()`, meaning the result can be accessed by any processor. Execution occurs on any processor within the defined scope.

**Example:**
```julia
g = Dagger.@spawn scope=ExactScope(Dagger.OSProc(3)) f(x,y)
```
Task `g` executes only on worker process 3. Its result can be accessed by any processor.

---
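
The defaulting rule described for `scope` can be sketched with plain Julia keyword arguments. This is a toy model rather than Dagger's implementation: `resolve_scopes` and the symbols `:default_scope` and `:any_scope` are hypothetical stand-ins for `DefaultScope()` and `AnyScope()`. It only illustrates that `compute_scope` falls back to `scope`, while `result_scope` falls back to an unrestricted scope.

```julia
# Toy model of the defaulting rule. The names below are illustrative
# stand-ins and are not part of Dagger's API.
function resolve_scopes(; scope=:default_scope,
                          compute_scope=scope,      # falls back to `scope`
                          result_scope=:any_scope)  # unrestricted unless given
    return (compute=compute_scope, result=result_scope)
end

only_scope  = resolve_scopes(scope=:worker3)
both        = resolve_scopes(scope=:worker2, compute_scope=:worker1)
only_result = resolve_scopes(result_scope=:worker3)
```

In this sketch `only_scope.compute` follows `scope`, `both.compute` follows the explicit `compute_scope`, and `only_result.compute` stays at the default.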

### Compute Scope
`compute_scope` also specifies where a Dagger task can execute. The key difference is that if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If `result_scope` isn't specified, it defaults to `AnyScope()`, allowing the result to be accessed by any processor.

**Example:**
```julia
g1 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(2, 3)) compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) f(x,y)
g2 = Dagger.@spawn compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) f(x,y)
```
Tasks `g1` and `g2` execute on either thread 2 of worker 1 or thread 1 of worker 3; for `g1`, the `scope` value is ignored because `compute_scope` is given. Their results can be accessed by any processor.

---

### Result Scope

`result_scope` restricts where a task's result can be fetched or moved. This is crucial for managing data locality and minimizing transfers. If only `result_scope` is specified, the `compute_scope` defaults to `Dagger.DefaultScope()`, meaning computation may happen on any processor.

**Example:**
```julia
g = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(3)) f(x,y)
```
The result of `g` is accessible only from worker process 3. No compute restriction is given, although the scheduler still constrains the default compute scope by `result_scope` when placing the task (see the `schedule!` change in `src/sch/Sch.jl`).

---

## Interaction of compute_scope and result_scope

When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of the effective compute scope (`compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty, the scheduler fails the task with a `SchedulingException`.

**Example:**
```julia
g = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(3, 2)) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(2, 2)), ExactScope(Dagger.ThreadProc(4, 2))) f(x,y)
```
Task `g` computes on `Dagger.ThreadProc(2, 2)`, since that is the only processor in both the compute scope and the result scope; `scope` is ignored because `compute_scope` is given. Access to its result is also restricted to `Dagger.ThreadProc(2, 2)`.

---
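
The intersection rule can be illustrated with a toy model in which a scope is just a `Set` of `(worker, thread)` pairs. None of these helpers are Dagger API: `process_scope`, `exact_scope`, and `placement` are hypothetical names, and the assumption of four threads per worker is made up for the example. The sketch mirrors the example above.

```julia
# Toy model: a scope is a Set of (worker, thread) pairs. These helpers are
# illustrative only and are not Dagger's scope types.
process_scope(worker, nthreads) = Set((worker, t) for t in 1:nthreads)
exact_scope(worker, thread) = Set([(worker, thread)])

# The effective compute scope is `compute_scope` if given, otherwise `scope`.
function placement(; scope, result_scope, compute_scope=scope)
    candidates = intersect(compute_scope, result_scope)
    isempty(candidates) && error("compute and result scopes do not intersect")
    return candidates
end

# Assumes 4 threads per worker (an arbitrary choice for this sketch).
scope         = exact_scope(3, 2)
compute_scope = process_scope(2, 4)
result_scope  = union(exact_scope(2, 2), exact_scope(4, 2))

chosen   = placement(; scope, compute_scope, result_scope)
fallback = placement(; scope=exact_scope(4, 2), result_scope)
```

Here `chosen` contains only `(2, 2)`, and `fallback` shows the case where `compute_scope` is omitted and `scope` is used instead. A call whose scopes share no processor raises an error, which stands in for the scheduler's `SchedulingException`.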

## Chunk Inputs to Tasks

This section explains how `scope`, `compute_scope`, and `result_scope` affect tasks when a `Chunk` is the primary input to `@spawn` (e.g., `Dagger.tochunk(...)`).

Assume `g` is some function, e.g., `g(x, y) = x * 2 + y * 3`, `chunk_proc` is the chunk's processor, and `chunk_scope` is the scope within which the chunk is accessible.

When `Dagger.tochunk(...)` is directly spawned:
- The task executes on `chunk_proc`.
- The result is accessible only within `chunk_scope`.
- This behavior occurs irrespective of the `scope`, `compute_scope`, and `result_scope` values provided in the `@spawn` macro.
- Dagger validates that there is an intersection between the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`) and the `result_scope`. If no intersection exists, the scheduler throws an exception.

**Usage:**
```julia
h1 = Dagger.@spawn scope=ExactScope(Dagger.OSProc(3)) Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope)
h2 = Dagger.@spawn compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope)
h3 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(2, 3)) compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope)
h4 = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(3)) Dagger.tochunk(g(40, 41), chunk_proc, chunk_scope)
h5 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(3, 2)) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(2, 2)), ExactScope(Dagger.ThreadProc(4, 2))) Dagger.tochunk(g(50, 51), chunk_proc, chunk_scope)
```
In all these cases (`h1` through `h5`), the task executes on `chunk_proc`, and its result is accessible only within `chunk_scope`.

---

## Function with Chunked Arguments as Tasks

This section details the behavior when `scope`, `compute_scope`, and `result_scope` are used with tasks where a function is the input and its arguments include `Chunk`s.

Assume `g(x, y) = x * 2 + y * 3` is a function, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)` is a chunked argument, where `arg_proc` is the chunk's processor and `arg_scope` is its defined scope.

### Scope
If `arg_scope` and `scope` do not intersect, the scheduler throws an exception. Otherwise, `compute_scope` defaults to `scope`, and `result_scope` defaults to `AnyScope()`. Execution occurs on the intersection of `scope` and `arg_scope`.

```julia
h = Dagger.@spawn scope=ExactScope(Dagger.OSProc(3)) g(arg, 11)
```
Task `h` executes on any processor within the intersection of `scope` and `arg_scope`. The result is stored and accessible from anywhere.

---

### Compute Scope
If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (`compute_scope` if provided, otherwise `scope`) and `arg_scope`. `result_scope` defaults to `AnyScope()`.

```julia
h1 = Dagger.@spawn compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) g(arg, 11)
h2 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(2, 3)) compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) g(arg, 21)
```
Tasks `h1` and `h2` execute on any processor within the intersection of `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is stored and accessible from anywhere.

---

### Result Scope
If only `result_scope` is specified, computation happens on any processor within `arg_scope`, and the result is only accessible from `result_scope`.

```julia
h = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(3)) g(arg, 11)
```
Task `h` executes on any processor within `arg_scope`. The result is accessible only from `result_scope`.

---

### Compute and Result Scope
When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (`compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception.

```julia
h = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(3, 2)) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(2, 2)), ExactScope(Dagger.ThreadProc(4, 2))) g(arg, 31)
```
Task `h` computes on `Dagger.ThreadProc(2, 2)` (as it's the intersection of the `arg`, `compute`, and `result` scopes), and access to its result is also restricted to `Dagger.ThreadProc(2, 2)`.
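
When a task has several chunked arguments, the scope is narrowed by each argument's scope in turn, and the first incompatible pair is reported. The following toy sketch shows that fold over plain `Set`s of `(worker, thread)` pairs. `narrow` is a hypothetical helper, not Dagger API, and the error text only imitates the style of the scheduler's message.

```julia
# Toy model of narrowing a scope by each chunk argument in turn. Scopes are
# Sets of (worker, thread) pairs; `narrow` is a hypothetical helper.
function narrow(initial_scope, arg_scopes)
    current = initial_scope
    for arg_scope in arg_scopes
        narrowed = intersect(current, arg_scope)
        # Report the two scopes that failed to overlap.
        isempty(narrowed) && error("Scopes are not compatible: $current, $arg_scope")
        current = narrowed
    end
    return current
end

start_scope = Set([(2, 1), (2, 2), (2, 3)])
arg_scopes  = [Set([(2, 2), (2, 3)]), Set([(2, 2), (4, 2)])]
final_scope = narrow(start_scope, arg_scopes)
```

In this sketch `final_scope` contains only `(2, 2)`, the one processor shared by the starting scope and both argument scopes.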

src/sch/Sch.jl

Lines changed: 33 additions & 15 deletions
@@ -14,7 +14,7 @@ import Random: randperm
 import Base: @invokelatest

 import ..Dagger
-import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
+import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, InvalidScope, LockedObject
 import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime
 import ..Dagger: @dagdebug, @safe_lock_spin1
 import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek
@@ -726,16 +726,32 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
 sig = signature(state, task)

 # Calculate scope
-scope = if task.f isa Chunk
-    task.f.scope
-else
-    if task.options.proclist !== nothing
-        # proclist overrides scope selection
-        AnyScope()
-    else
-        DefaultScope()
+scope = constrain(task.compute_scope, task.result_scope)
+if scope isa InvalidScope
+    ex = SchedulingException("Compute and Result Scopes are not compatible: $(scope.x), $(scope.y)")
+    state.cache[task] = ex
+    state.errored[task] = true
+    set_failed!(state, task)
+    @goto pop_task
+end
+if task.f isa Chunk
+    scope = constrain(scope, task.f.scope)
+    if scope isa InvalidScope
+        ex = SchedulingException("Compute and Chunk Scopes are not compatible: $(scope.x), $(scope.y)")
+        state.cache[task] = ex
+        state.errored[task] = true
+        set_failed!(state, task)
+        @goto pop_task
     end
 end
+
+# if task.options.proclist !== nothing
+# # proclist overrides scope selection
+# AnyScope()
+# else
+# DefaultScope()
+# end
+
 for (_,input) in task.inputs
     input = unwrap_weak_checked(input)
     chunk = if istask(input)
@@ -747,8 +763,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
     end
     chunk isa Chunk || continue
     scope = constrain(scope, chunk.scope)
-    if scope isa Dagger.InvalidScope
-        ex = SchedulingException("Scopes are not compatible: $(scope.x), $(scope.y)")
+    if scope isa InvalidScope
+        ex = SchedulingException("Final Compute and Argument Chunk Scopes are not compatible: $(scope.x), $(scope.y)")
         state.cache[task] = ex
         state.errored[task] = true
         set_failed!(state, task)
@@ -1086,7 +1102,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
     thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options,
     propagated, ids, positions,
     (log_sink=ctx.log_sink, profile=ctx.profile),
-    sch_handle, state.uid])
+    sch_handle, state.uid, thunk.result_scope])
 end
 # N.B. We don't batch these because we might get a deserialization
 # error due to something not being defined on the worker, and then we don't
@@ -1305,7 +1321,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
 task = task_spec[]
 scope = task[5]
 if !isa(constrain(scope, Dagger.ExactScope(to_proc)),
-        Dagger.InvalidScope) &&
+        InvalidScope) &&
    typemax(UInt32) - proc_occupancy_cached >= occupancy
     # Compatible, steal this task
     return dequeue_pair!(queue)
@@ -1488,7 +1504,7 @@ function do_task(to_proc, task_desc)
 scope, Tf, data,
     send_result, persist, cache, meta,
     options, propagated, ids, positions,
-    ctx_vars, sch_handle, sch_uid = task_desc
+    ctx_vars, sch_handle, sch_uid, result_scope = task_desc
 ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile)

 from_proc = OSProc()
@@ -1696,7 +1712,7 @@ function do_task(to_proc, task_desc)

 # Construct result
 # TODO: We should cache this locally
-send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache,
+send_result || meta ? res : tochunk(res, to_proc, result_scope; device, persist, cache=persist ? true : cache,
     tag=options.storage_root_tag,
     leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()),
     retain=options.storage_retain)
@@ -1705,6 +1721,8 @@ function do_task(to_proc, task_desc)
     RemoteException(myid(), CapturedException(ex, bt))
 end

+# @dagdebug thunk_id :scope "Result scope is $result_scope"
+
 threadtime = cputhreadtime() - threadtime_start
 # FIXME: This is not a realistic measure of max. required memory
 #gc_allocd = min(max(UInt64(Base.gc_num().allocd) - UInt64(gcnum_start.allocd), UInt64(0)), UInt64(1024^4))

src/thunk.jl

Lines changed: 22 additions & 15 deletions
@@ -73,6 +73,8 @@ mutable struct Thunk
     eager_ref::Union{DRef,Nothing}
     options::Any # stores scheduler-specific options
     propagates::Tuple # which options we'll propagate
+    compute_scope::AbstractScope
+    result_scope::AbstractScope
     function Thunk(f, xs...;
             syncdeps=nothing,
             id::Int=next_id(),
@@ -84,16 +86,19 @@ mutable struct Thunk
             affinity=nothing,
             eager_ref=nothing,
             processor=nothing,
-            scope=nothing,
+            scope=DefaultScope(),
+            compute_scope=scope,
+            result_scope=AnyScope(),
             options=nothing,
             propagates=(),
             kwargs...
         )
-        if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope))
-            f = tochunk(f,
-                something(processor, OSProc()),
-                something(scope, DefaultScope()))
-        end
+        # if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope))
+        # f = tochunk(f,
+        # something(processor, OSProc()),
+        # something(scope, DefaultScope()))
+        # end
+
         xs = Base.mapany(identity, xs)
         syncdeps_set = Set{Any}(filterany(is_task_or_chunk, Base.mapany(last, xs)))
         if syncdeps !== nothing
@@ -105,11 +110,11 @@ mutable struct Thunk
         if options !== nothing
             @assert isempty(kwargs)
             new(f, xs, syncdeps_set, id, get_result, meta, persist, cache,
-                cache_ref, affinity, eager_ref, options, propagates)
+                cache_ref, affinity, eager_ref, options, propagates, compute_scope, result_scope)
         else
             new(f, xs, syncdeps_set, id, get_result, meta, persist, cache,
                 cache_ref, affinity, eager_ref, Sch.ThunkOptions(;kwargs...),
-                propagates)
+                propagates, compute_scope, result_scope)
         end
     end
 end
@@ -477,13 +482,15 @@ function spawn(f, args...; kwargs...)
     end

     # Wrap f in a Chunk if necessary
-    processor = haskey(options, :processor) ? options.processor : nothing
-    scope = haskey(options, :scope) ? options.scope : nothing
-    if !isnothing(processor) || !isnothing(scope)
-        f = tochunk(f,
-            something(processor, get_options(:processor, OSProc())),
-            something(scope, get_options(:scope, DefaultScope())))
-    end
+    # processor = haskey(options, :processor) ? options.processor : nothing
+    # compute_scope = haskey(options, :compute_scope) ? options.compute_scope : (haskey(options, :scope) ? options.scope : nothing)
+    # result_scope = haskey(options, :result_scope) ? options.result_scope : nothing
+
+    # if !isnothing(processor) || !isnothing(scope)
+    # f = tochunk(f,
+    # something(processor, get_options(:processor, OSProc())),
+    # something(scope, get_options(:scope, DefaultScope())))
+    # end

     # Process the args and kwargs into Pair form
     args_kwargs = args_kwargs_to_pairs(args, kwargs)

test/runtests.jl

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ tests = [
     ("Options", "options.jl"),
     ("Mutation", "mutation.jl"),
     ("Task Queues", "task-queues.jl"),
+    ("Task Affinity", "task-affinity.jl"),
     ("Datadeps", "datadeps.jl"),
     ("Streaming", "streaming.jl"),
     ("Domain Utilities", "domain.jl"),
