1 change: 1 addition & 0 deletions docs/make.jl
@@ -20,6 +20,7 @@ makedocs(;
"Parallel Nested Loops" => "use-cases/parallel-nested-loops.md",
],
"Task Spawning" => "task-spawning.md",
"Task Affinity" => "task-affinity.md",
"Data Management" => "data-management.md",
"Distributed Arrays" => "darray.md",
"Streaming Tasks" => "streaming.md",
131 changes: 131 additions & 0 deletions docs/src/task-affinity.md
@@ -0,0 +1,131 @@
# Task Affinity

Dagger allows for precise control over task placement and result availability using scopes. Tasks are assigned based on the combination of multiple scopes: `scope`/`compute_scope` and `result_scope` (all of which can be specified with `@spawn`), plus the scopes of any arguments to the task (in the form of a scope attached to a `Chunk` argument). Let's take a look at how to configure these scopes and how they work together to direct task placement.

For more information on how scopes work, see [Scopes](@ref).
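
As a quick preview, all of these options can be combined on a single `@spawn` call. The following is a minimal sketch, assuming a worker with ID 2 is available in your cluster:

```julia
f(x, y) = x + y

# Compute on worker 2, and only allow the result to be accessed from worker 2:
t = Dagger.@spawn compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope(worker=2) f(1, 2)
```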

---

## Task Scopes

### Scope

`scope` defines the general set of locations where a Dagger task can execute. If `scope` is not specified, the task falls back to `DefaultScope()`, allowing it to run wherever execution is possible. The task may execute on any processor within the defined scope.

**Example:**
```julia
g = Dagger.@spawn scope=Dagger.scope(worker=3) f(x,y)
```
Task `g` executes only on worker 3. Its result can be accessed by any worker.
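
Because no `result_scope` is set, the result can be fetched from anywhere, including the calling process. A small sketch:

```julia
# Moves the result to the caller; allowed since no result_scope was specified.
fetch(g)
```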

---

### Compute Scope

Like `scope`, `compute_scope` also specifies where a Dagger task can execute. The key difference is that if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If neither is specified, they default to `DefaultScope()`.

**Example:**
```julia
g1 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
g2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
```
Tasks `g1` and `g2` execute on either thread 2 of worker 1, or thread 1 of worker 3. The `scope` argument to `g1` is ignored. Their results can be accessed by any worker.

---

### Result Scope

The `result_scope` limits the processors from which a task's result can be accessed. This can be useful for managing data locality and minimizing transfers. If `result_scope` is not specified, it defaults to `AnyScope()`, meaning the result can be accessed by any processor (including those not enabled by default for task execution, such as GPUs).

**Example:**
```julia
g = Dagger.@spawn result_scope=Dagger.scope(worker=3, threads=[1, 3, 4]) f(x,y)
```
The result of `g` is accessible only from threads 1, 3, and 4 of worker 3. The task itself may execute on any of those threads.
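
To see exactly which processors a given scope resolves to on your cluster, you can query Dagger's processor-compatibility helper. This is a sketch, assuming the one-argument `Dagger.compatible_processors` form used internally by the scheduler:

```julia
rs = Dagger.scope(worker=3, threads=[1, 3, 4])

# Returns the set of processors satisfying this scope, e.g. thread
# processors 1, 3, and 4 on worker 3 (assuming worker 3 exists).
Dagger.compatible_processors(rs)
```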

---

## Interaction of `compute_scope` and `result_scope`

When `scope`/`compute_scope` and `result_scope` are specified, the scheduler executes the task on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty, the scheduler throws a `Dagger.Sch.SchedulingException` error.

**Example:**
```julia
g = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) f(x,y)
```
The task `g` computes on thread 2 of worker 2 (the intersection of the compute and result scopes), but accessing its result is restricted to thread 2 of worker 2 and thread 2 of worker 4.

---

## Function as a Chunk

This section explains how `scope`/`compute_scope` and `result_scope` affect tasks when a `Chunk` is used to specify the function to be executed by `@spawn` (e.g. created via `Dagger.tochunk(...)` or by calling `fetch(task; raw=true)` on a task). This may seem strange (using a `Chunk` to specify the function to be executed), but it can be useful when working with callable structs, such as closures or Flux.jl models.

Assume `g` is some function, e.g. `g(x, y) = x * 2 + y * 3`, and `chunk_scope` is its defined affinity.

When `Dagger.tochunk(...)` is used to pass a `Chunk` as the function to be executed by `@spawn`:
- The result is accessible only on processors in `chunk_scope`.
- Dagger validates that there is an intersection between `chunk_scope`, the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`), and the `result_scope`. If no intersection exists, the scheduler throws an exception.

!!! info
    While `chunk_proc` is currently required when constructing a chunk, it is only used to pick the optimal processor for accessing the chunk; it does not affect which set of processors the task may execute on.

**Usage:**
```julia
chunk_scope = Dagger.scope(worker=3)
chunk_proc = Dagger.OSProc(3) # not important, just needs to be a valid processor
g(x, y) = x * 2 + y * 3
g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope)
h1 = Dagger.@spawn scope=Dagger.scope(worker=3) g_chunk(10, 11)
h2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(20, 21)
h3 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(30, 31)
h4 = Dagger.@spawn result_scope=Dagger.scope(worker=3) g_chunk(40, 41)
h5 = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=3) result_scope=Dagger.scope(worker=3,threads=[2,3]) g_chunk(50, 51)
```
In all these cases (`h1` through `h5`), the tasks execute on a processor within `chunk_scope`, and their results are accessible only within `chunk_scope`.
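
Since these results live only within `chunk_scope`, downstream tasks that take them as arguments are constrained to that scope by the `Chunk`-argument rules described in the next section. A small sketch, reusing the definitions above:

```julia
consume(x) = x + 1

# h1's result acts as a Chunk argument scoped to `chunk_scope` (worker 3),
# so this task will also be scheduled on worker 3.
h6 = Dagger.@spawn consume(h1)
```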

---

## Chunk arguments

This section details behavior when some or all of a task's arguments are `Chunk`s.

Assume `g(x, y) = x * 2 + y * 3` and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)`, where `arg_scope = Dagger.scope(worker=2)` is the argument's defined scope (see the sketch below).
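
For concreteness, that setup might look like the following sketch (the choice of `arg_proc` is arbitrary; as with `chunk_proc` above, it only hints at the preferred processor for accessing the chunk):

```julia
g(x, y) = x * 2 + y * 3

arg_scope = Dagger.scope(worker=2)
arg_proc = Dagger.OSProc(2)  # hypothetical choice; any valid processor works

# Wrap the *value* g(1, 2) in a Chunk pinned to arg_scope:
arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)
```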

### Scope
Execution occurs on the intersection of `scope` and `arg_scope`. If `arg_scope` and `scope` do not intersect, the scheduler throws an exception.

```julia
h = Dagger.@spawn scope=Dagger.scope(worker=2) g(arg, 11)
```
Task `h` executes on any processor within the intersection of `scope` and `arg_scope`. The result is accessible from any processor.

---

### Compute scope and Chunk argument scopes interaction
If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`.

```julia
h1 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 11)
h2 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 21)
```
Tasks `h1` and `h2` execute on any processor within the intersection of the `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is accessible from any processor.

---

### Result scope and Chunk argument scopes interaction
If only `result_scope` is specified, computation happens on any processor within the intersection of `arg_scope` and `result_scope`, and the result is only accessible within `result_scope`.

```julia
h = Dagger.@spawn result_scope=Dagger.scope(worker=2) g(arg, 11)
```
Task `h` executes on any processor within the intersection of `arg_scope` and `result_scope`. The result is accessible only from within `result_scope`.

---

### Compute, result, and chunk argument scopes interaction
When `scope`/`compute_scope`, `result_scope`, and `Chunk` argument scopes are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception.

```julia
h = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) g(arg, 31)
```
Task `h` computes on thread 2 of worker 2 (the intersection of `arg_scope`, `compute_scope`, and `result_scope`), and access to its result is restricted to thread 2 of worker 2 and thread 2 of worker 4.
39 changes: 24 additions & 15 deletions src/sch/Sch.jl
@@ -14,7 +14,7 @@ import Random: randperm
import Base: @invokelatest

import ..Dagger
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, InvalidScope, LockedObject
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime
import ..Dagger: @dagdebug, @safe_lock_spin1
import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek
@@ -726,16 +726,25 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
sig = signature(state, task)

# Calculate scope
scope = if task.f isa Chunk
task.f.scope
else
if task.options.proclist !== nothing
# proclist overrides scope selection
AnyScope()
else
DefaultScope()
scope = constrain(task.compute_scope, task.result_scope)
if scope isa InvalidScope
ex = SchedulingException("compute_scope and result_scope are not compatible: $(scope.x), $(scope.y)")
state.cache[task] = ex
state.errored[task] = true
set_failed!(state, task)
@goto pop_task
end
if task.f isa Chunk
scope = constrain(scope, task.f.scope)
if scope isa InvalidScope
ex = SchedulingException("Current scope and function Chunk Scope are not compatible: $(scope.x), $(scope.y)")
state.cache[task] = ex
state.errored[task] = true
set_failed!(state, task)
@goto pop_task
end
end

for (_,input) in task.inputs
input = unwrap_weak_checked(input)
chunk = if istask(input)
@@ -747,8 +756,8 @@ end
end
chunk isa Chunk || continue
scope = constrain(scope, chunk.scope)
if scope isa Dagger.InvalidScope
ex = SchedulingException("Scopes are not compatible: $(scope.x), $(scope.y)")
if scope isa InvalidScope
ex = SchedulingException("Current scope and argument Chunk scope are not compatible: $(scope.x), $(scope.y)")
state.cache[task] = ex
state.errored[task] = true
set_failed!(state, task)
@@ -1086,7 +1095,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options,
propagated, ids, positions,
(log_sink=ctx.log_sink, profile=ctx.profile),
sch_handle, state.uid])
sch_handle, state.uid, thunk.result_scope])
end
# N.B. We don't batch these because we might get a deserialization
# error due to something not being defined on the worker, and then we don't
@@ -1305,7 +1314,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
task = task_spec[]
scope = task[5]
if !isa(constrain(scope, Dagger.ExactScope(to_proc)),
Dagger.InvalidScope) &&
InvalidScope) &&
typemax(UInt32) - proc_occupancy_cached >= occupancy
# Compatible, steal this task
return dequeue_pair!(queue)
@@ -1488,7 +1497,7 @@ function do_task(to_proc, task_desc)
scope, Tf, data,
send_result, persist, cache, meta,
options, propagated, ids, positions,
ctx_vars, sch_handle, sch_uid = task_desc
ctx_vars, sch_handle, sch_uid, result_scope = task_desc
ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile)

from_proc = OSProc()
@@ -1696,7 +1705,7 @@

# Construct result
# TODO: We should cache this locally
send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache,
send_result || meta ? res : tochunk(res, to_proc, result_scope; device, persist, cache=persist ? true : cache,
tag=options.storage_root_tag,
leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()),
retain=options.storage_retain)
6 changes: 2 additions & 4 deletions src/sch/util.jl
@@ -42,9 +42,7 @@ function get_propagated_options(thunk)
nt = NamedTuple()
for key in thunk.propagates
value = if key == :scope
isa(thunk.f, Chunk) ? thunk.f.scope : DefaultScope()
elseif key == :processor
isa(thunk.f, Chunk) ? thunk.f.processor : OSProc()
thunk.compute_scope
elseif key in fieldnames(Thunk)
getproperty(thunk, key)
elseif key in fieldnames(ThunkOptions)
@@ -340,7 +338,7 @@ function can_use_proc(state, task, gproc, proc, opts, scope)
scope = constrain(scope, Dagger.ExactScope(proc))
elseif opts.proclist isa Vector
if !(typeof(proc) in opts.proclist)
@dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist)"
@dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist) ($(opts.proclist))"
return false, scope
end
scope = constrain(scope,
34 changes: 34 additions & 0 deletions src/scopes.jl
@@ -240,6 +240,17 @@ constrain(x::ProcessScope, y::ExactScope) =
constrain(x::NodeScope, y::ExactScope) =
x == y.parent.parent ? y : InvalidScope(x, y)


function constrain(scope1, scope2, scopes...)
scope1 = constrain(scope1, scope2)
scope1 isa InvalidScope && return scope1
for s in scopes
scope1 = constrain(scope1, s)
scope1 isa InvalidScope && return scope1
end
return scope1
end

### Scopes helper

"""
@@ -412,3 +423,26 @@ to_scope(::Val{key}, sc::NamedTuple) where key =

# Base case for all Dagger-owned keys
scope_key_precedence(::Val) = 0

### Scope comparison helpers

function Base.issetequal(scopes::AbstractScope...)
scope1 = scopes[1]
scope1_procs = Dagger.compatible_processors(scope1)
for scope2 in scopes[2:end]
scope2_procs = Dagger.compatible_processors(scope2)
if !issetequal(scope1_procs, scope2_procs)
return false
end
end
return true
end

function Base.issubset(scope1::AbstractScope, scope2::AbstractScope)
scope1_procs = compatible_processors(scope1)
scope2_procs = compatible_processors(scope2)
for proc in scope1_procs
proc in scope2_procs || return false
end
return true
end
21 changes: 16 additions & 5 deletions src/stream.jl
@@ -289,6 +289,20 @@ struct StreamingFunction{F, S}
new{F, S}(f, stream, max_evals)
end

struct DestPostMigration
thunk_id::Int
cancel_token::CancelToken
f
DestPostMigration(thunk_id, tls, f) = new(thunk_id, tls.cancel_token, f)
end
function (dpm::DestPostMigration)(store, unsent)
STREAM_THUNK_ID[] = dpm.thunk_id
@assert !in_task()
tls = DTaskTLS(OSProc(), typemax(UInt64), nothing, [], dpm.cancel_token)
set_tls!(tls)
return dpm.f(store, unsent)
end

function migrate_stream!(stream::Stream, w::Integer=myid())
# Perform migration of the StreamStore
# MemPool will block access to the new ref until the migration completes
@@ -318,11 +332,8 @@ function migrate_stream!(stream::Stream, w::Integer=myid())
empty!(store.output_buffers)
return (unsent_inputs, unsent_outputs)
end,
dest_post_migration=(store, unsent)->begin
dest_post_migration=DestPostMigration(thunk_id, tls, (store, unsent)->begin
# Initialize the StreamStore on the destination with the unsent inputs/outputs.
STREAM_THUNK_ID[] = thunk_id
@assert !in_task()
set_tls!(tls)
#get_tls().cancel_token = MemPool.access_ref(identity, remote_cancel_token; local_only=true)
unsent_inputs, unsent_outputs = unsent
for (input_uid, inputs) in unsent_inputs
@@ -342,7 +353,7 @@
# Reset the state of this new store
store.open = true
store.migrating = false
end,
end),
post_migration=store->begin
# Indicate that this store has migrated
store.migrating = true