Skip to content

Commit 6ca66cd

Browse files
Enhance task affinity support in Dagger.jl
- Updated documentation for the `@spawn` macro to clarify the usage of `scope`, `compute_scope`, and `result_scope`, including examples with the new syntax. - Improved error messages in the scheduling logic to provide clearer feedback when scopes are incompatible. - Refactored test cases for task affinity to ensure they align with the new scope handling and provide better coverage for edge cases. - Removed deprecated comments and cleaned up the code for better readability.
1 parent 44543d6 commit 6ca66cd

File tree

4 files changed

+112
-141
lines changed

4 files changed

+112
-141
lines changed

docs/src/task-affinity.md

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,123 +1,127 @@
11
# Task Affinity
22

3-
Dagger.jl's `@spawn` macro allows precise control over task execution and result access using `scope`, `compute_scope`, and `result_scope`.
3+
Dagger.jl's `@spawn` macro allows precise control over task execution and result accessibility using `scope`, `compute_scope`, and `result_scope`, which specify various chunk scopes of the task.
4+
5+
For more information on how these scopes work, see [Scopes](scopes.md#Scopes).
46

57
---
68

79
## Key Terms
810

911
### Scope
10-
`scope` defines the general set of locations where a Dagger task can execute. If `compute_scope` and `result_scope` are not explicitly set, the task's `compute_scope` defaults to its `scope`, and its `result_scope` defaults to `AnyScope()`, meaning the result can be accessed by any processor. Execution occurs on any processor within the defined scope.
12+
`scope` defines the general set of locations where a Dagger task can execute. If `scope` is not explicitly set, the task runs within the `compute_scope`. If both `scope` and `compute_scope` both are unspecified, the task falls back to `DefaultScope()`, allowing it to run wherever execution is possible. Execution occurs on any worker within the defined scope.
1113

1214
**Example:**
1315
```julia
14-
g = Dagger.@spawn scope=ExactScope(Dagger.OSProc(3)) f(x,y)
16+
g = Dagger.@spawn scope=Dagger.scope(worker=3) f(x,y)
1517
```
16-
Task `g` executes only on Processor 3. Its result can be accessed by any processor.
18+
Task `g` executes only on worker 3. Its result can be accessed by any worker.
1719

1820
---
1921

2022
### Compute Scope
21-
`compute_scope` also specifies where a Dagger task can execute. The key difference is if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If `result_scope` isn't specified, it defaults to `AnyScope()`, allowing the result to be accessed by any processor.
23+
Like `scope`,`compute_scope` also specifies where a Dagger task can execute. The key difference is if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If neither is specified, the they default to `DefaultScope()`.
2224

2325
**Example:**
2426
```julia
25-
g1 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(2, 3)) compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) f(x,y)
26-
g2 = Dagger.@spawn compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) f(x,y)
27+
g1 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
28+
g2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
2729
```
28-
Task `g1` and `g2` execute on either thread 2 of processor 1, or thread 1 of processor 3. Their result can be accessed by any processor.
30+
Tasks `g1` and `g2` execute on either thread 2 of worker 1, or thread 1 of worker 3. The `scope` argument to `g1` is ignored. Their result can be accessed by any worker.
2931

3032
---
3133

3234
### Result Scope
3335

34-
`result_scope` restricts where a task's result can be fetched or moved. This is crucial for managing data locality and minimizing transfers. If only `result_scope` is specified, the `compute_scope` defaults to `Dagger.DefaultScope()`, meaning computation may happen on any processor.
36+
The result_scope limits the workers from which a task's result can be accessed. This is crucial for managing data locality and minimizing transfers. If `result_scope` is not specified, it defaults to `AnyScope()`, meaning the result can be accessed by any worker.
3537

3638
**Example:**
3739
```julia
38-
g = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(3)) f(x,y)
40+
g = Dagger.@spawn result_scope=Dagger.scope(worker=3, threads=[1,3, 4]) f(x,y)
3941
```
40-
The result of `g` is accessible only from worker process 3. The task's execution may happen anywhere.
42+
The result of `g` is accessible only from threads 1, 3 and 4 of worker process 3. The task's execution may happen anywhere on threads 1, 3 and 4 of worker 3.
4143

4244
---
4345

44-
## Interaction of compute_scope and result_scope
46+
## Interaction of `compute_scope` and `result_scope`
4547

46-
When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and the `result_scope`. If intersection does not exist then Scheduler throws Exception error.
48+
When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty then the scheduler throws a `Dagger.Sch.SchedulerException` error.
4749

4850
**Example:**
4951
```julia
50-
g = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(3, 2)) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(2, 2)), ExactScope(Dagger.ThreadProc(4, 2))) f(x,y)
52+
g = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) f(x,y)
5153
```
52-
The task `g` computes on `Dagger.ThreadProc(2, 2)` (as it's the intersection of compute and result scopes), and its result access is also restricted to `Dagger.ThreadProc(2, 2)`.
54+
The task `g` computes on thread 2 of worker 2 (as it's the intersection of compute and result scopes), and its result access is also restricted to thread 2 of worker 2.
5355

5456
---
5557

5658
## Chunk Inputs to Tasks
5759

58-
This section explains how `scope`, `compute_scope`, and `result_scope` affect tasks when a `Chunk` is the primary input to `@spawn` (e.g., `Dagger.tochunk(...)`).
60+
This section explains how `scope`, `compute_scope`, and `result_scope` affect tasks when a `Chunk` is the primary input to `@spawn` (e.g. created via `Dagger.tochunk(...)` or by calling `fetch(task; raw=true)` on a task).
5961

60-
Assume `g` is some function, e.g., `g(x, y) = x * 2 + y * 3` and . `chunk_proc` is the chunk's processor, and `chunk_scope` is its defined accessibility.
62+
Assume `g` is some function, e.g. `g(x, y) = x * 2 + y * 3`, `chunk_proc` is the chunk's processor, and `chunk_scope` is its defined accessibility.
6163

6264
When `Dagger.tochunk(...)` is directly spawned:
6365
- The task executes on `chunk_proc`.
6466
- The result is accessible only within `chunk_scope`.
6567
- This behavior occurs irrespective of the `scope`, `compute_scope`, and `result_scope` values provided in the `@spawn` macro.
66-
- Dagger validates that there is an intersection between the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`) and the `result_scope`. If no intersection exists, the Scheduler throws an exception.
68+
- Dagger validates that there is an intersection between the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`) and the `result_scope`. If no intersection exists, the scheduler throws an exception.
69+
70+
!!! info While `chunk_proc` is currently required when constructing a chunk, it is largely unused in actual scheduling logic. It exists primarily for backward compatibility and may be deprecated in the future.
6771

6872
**Usage:**
6973
```julia
70-
h1 = Dagger.@spawn scope=ExactScope(Dagger.OSProc(3)) Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope)
71-
h2 = Dagger.@spawn compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope)
72-
h3 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(2, 3)) compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope)
73-
h4 = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(3)) Dagger.tochunk(g(40, 41), chunk_proc, chunk_scope)
74-
h5 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(3, 2)) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(2, 2)), ExactScope(Dagger.ThreadProc(4, 2))) Dagger.tochunk(g(50, 51), chunk_proc, chunk_scope)
74+
h1 = Dagger.@spawn scope=Dagger.scope(worker=3) Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope)
75+
h2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope)
76+
h3 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope)
77+
h4 = Dagger.@spawn result_scope=Dagger.scope(worker=3) Dagger.tochunk(g(40, 41), chunk_proc, chunk_scope)
78+
h5 = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.scope(worker=2,threads=[2,3]) Dagger.tochunk(g(50, 51), chunk_proc, chunk_scope)
7579
```
76-
In all these cases (`h1` through `h5`), the task gets executed on `chunk_proc`, and its result is accessible only within `chunk_scope`.
80+
In all these cases (`h1` through `h5`), the tasks get executed on processor `chunk_proc` of chunk, and its result is accessible only within `chunk_scope`.
7781

7882
---
7983

80-
## Function with Chunked Arguments as Tasks
84+
## Function with Chunk Arguments as Tasks
8185

82-
This section details behavior when `scope`, `compute_scope`, and `result_scope` are used with tasks where a function is the input, and its arguments include `Chunks`.
86+
This section details behavior when `scope`, `compute_scope`, and `result_scope` are used with tasks where a function is the input, and its arguments include `Chunk`s.
8387

84-
Assume `g(x, y) = x * 2 + y * 3` is a function, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)` is a chunked argument, where `arg_proc` is the chunk's processor and `arg_scope` is its defined scope.
88+
Assume `g(x, y) = x * 2 + y * 3` is a function, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)` is a chunk argument, where `arg_proc` is the chunk's processor and `arg_scope` is its defined scope.
8589

8690
### Scope
87-
If `arg_scope` and `scope` do not intersect, the Scheduler throws an exception. Otherwise, `compute_scope` defaults to `scope`, and `result_scope` defaults to `AnyScope()`. Execution occurs on the intersection of `scope` and `arg_scope`.
91+
If `arg_scope` and `scope` do not intersect, the scheduler throws an exception. Execution occurs on the intersection of `scope` and `arg_scope`.
8892

8993
```julia
90-
h = Dagger.@spawn scope=ExactScope(Dagger.OSProc(3)) g(arg, 11)
94+
h = Dagger.@spawn scope=Dagger.scope(worker=3) g(arg, 11)
9195
```
92-
Task `h` executes on any processor within the intersection of `scope` and `arg_scope`. The result is stored and accessible from anywhere.
96+
Task `h` executes on any worker within the intersection of `scope` and `arg_scope`. The result is accessible from any worker.
9397

9498
---
9599

96-
### Compute Scope
97-
If `arg_scope` and `compute_scope` do not intersect, the Scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`. `result_scope` defaults to `AnyScope()`.
100+
### Compute scope and Chunk argument scopes interaction
101+
If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`. `result_scope` defaults to `AnyScope()`.
98102

99103
```julia
100-
h1 = Dagger.@spawn compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) g(arg, 11)
101-
h2 = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(2, 3)) compute_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(1, 2)), ExactScope(Dagger.ThreadProc(3, 1))) g(arg, 21)
104+
h1 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g(arg, 11)
105+
h2 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g(arg, 21)
102106
```
103-
Task `h1` and `h2` execute on any processor within the intersection of the `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is stored and accessible from anywhere.
107+
Tasks `h1` and `h2` execute on any worker within the intersection of the `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is stored and accessible from anywhere.
104108

105109
---
106110

107-
### Result Scope
108-
If only `result_scope` is specified, computation happens on any processor within `arg_scope`, and the result is only accessible from `result_scope`.
111+
### Result scope and Chunk argument scopes interaction
112+
If only `result_scope` is specified, computation happens on any worker within `arg_scope`, and the result is only accessible from `result_scope`.
109113

110114
```julia
111-
h = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(3)) g(arg, 11)
115+
h = Dagger.@spawn result_scope=Dagger.scope(worker=3) g(arg, 11)
112116
```
113-
Task `h` executes on any processor within `arg_scope`. The result is accessible from `result_scope`.
117+
Task `h` executes on any worker within `arg_scope`. The result is accessible from `result_scope`.
114118

115119
---
116120

117-
### Compute and Result Scope
118-
When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the Scheduler throws an exception.
121+
### Compute, result, and chunk argument scopes interaction
122+
When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception.
119123

120124
```julia
121-
h = Dagger.@spawn scope=ExactScope(Dagger.ThreadProc(3, 2)) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.UnionScope(ExactScope(Dagger.ThreadProc(2, 2)), ExactScope(Dagger.ThreadProc(4, 2))) g(arg, 31)
125+
h = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) g(arg, 31)
122126
```
123-
Task `h` computes on `Dagger.ThreadProc(2, 2)` (as it's the intersection of `arg`, `compute`, and `result` scopes), and its result access is also restricted to `Dagger.ThreadProc(2, 2)`.
127+
Task `h` computes on thread 2 of worker 2 (as it's the intersection of `arg`, `compute`, and `result` scopes), and its result access is also restricted to thread 2 of worker 2.

src/sch/Sch.jl

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
728728
# Calculate scope
729729
scope = constrain(task.compute_scope, task.result_scope)
730730
if scope isa InvalidScope
731-
ex = SchedulingException("Compute and Result Scopes are not compatible: $(scope.x), $(scope.y)")
731+
ex = SchedulingException("compute_scope and result_scope are not compatible: $(scope.x), $(scope.y)")
732732
state.cache[task] = ex
733733
state.errored[task] = true
734734
set_failed!(state, task)
@@ -737,21 +737,14 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
737737
if task.f isa Chunk
738738
scope = constrain(scope, task.f.scope)
739739
if scope isa InvalidScope
740-
ex = SchedulingException("Compute and Chunk Scopes are not compatible: $(scope.x), $(scope.y)")
740+
ex = SchedulingException("Current scope and function Chunk Scope are not compatible: $(scope.x), $(scope.y)")
741741
state.cache[task] = ex
742742
state.errored[task] = true
743743
set_failed!(state, task)
744744
@goto pop_task
745745
end
746746
end
747747

748-
# if task.options.proclist !== nothing
749-
# # proclist overrides scope selection
750-
# AnyScope()
751-
# else
752-
# DefaultScope()
753-
# end
754-
755748
for (_,input) in task.inputs
756749
input = unwrap_weak_checked(input)
757750
chunk = if istask(input)
@@ -764,7 +757,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
764757
chunk isa Chunk || continue
765758
scope = constrain(scope, chunk.scope)
766759
if scope isa InvalidScope
767-
ex = SchedulingException("Final Compute and Argument Chunk Scopes are not compatible: $(scope.x), $(scope.y)")
760+
ex = SchedulingException("Current scope and argument Chunk scope are not compatible: $(scope.x), $(scope.y)")
768761
state.cache[task] = ex
769762
state.errored[task] = true
770763
set_failed!(state, task)
@@ -1721,8 +1714,6 @@ function do_task(to_proc, task_desc)
17211714
RemoteException(myid(), CapturedException(ex, bt))
17221715
end
17231716

1724-
# @dagdebug thunk_id :scope "Result scope is $result_scope"
1725-
17261717
threadtime = cputhreadtime() - threadtime_start
17271718
# FIXME: This is not a realistic measure of max. required memory
17281719
#gc_allocd = min(max(UInt64(Base.gc_num().allocd) - UInt64(gcnum_start.allocd), UInt64(0)), UInt64(1024^4))

src/thunk.jl

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,6 @@ mutable struct Thunk
9393
propagates=(),
9494
kwargs...
9595
)
96-
# if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope))
97-
# f = tochunk(f,
98-
# something(processor, OSProc()),
99-
# something(scope, DefaultScope()))
100-
# end
10196

10297
xs = Base.mapany(identity, xs)
10398
syncdeps_set = Set{Any}(filterany(is_task_or_chunk, Base.mapany(last, xs)))
@@ -481,17 +476,6 @@ function spawn(f, args...; kwargs...)
481476
args = args[2:end]
482477
end
483478

484-
# Wrap f in a Chunk if necessary
485-
# processor = haskey(options, :processor) ? options.processor : nothing
486-
# compute_scope = haskey(options, :compute_scope) ? options.compute_scope : (haskey(options, :scope) ? options.scope : nothing)
487-
# result_scope = haskey(options, :result_scope) ? options.result_scope : nothing
488-
489-
# if !isnothing(processor) || !isnothing(scope)
490-
# f = tochunk(f,
491-
# something(processor, get_options(:processor, OSProc())),
492-
# something(scope, get_options(:scope, DefaultScope())))
493-
# end
494-
495479
# Process the args and kwargs into Pair form
496480
args_kwargs = args_kwargs_to_pairs(args, kwargs)
497481

0 commit comments

Comments
 (0)