Empty file added .gitmodules
Empty file.
1 change: 1 addition & 0 deletions docs/make.jl
@@ -20,6 +20,7 @@ makedocs(;
"Parallel Nested Loops" => "use-cases/parallel-nested-loops.md",
],
"Task Spawning" => "task-spawning.md",
"Task Affinity" => "task-affinity.md",
"Data Management" => "data-management.md",
"Distributed Arrays" => "darray.md",
"Streaming Tasks" => "streaming.md",
338 changes: 337 additions & 1 deletion docs/src/darray.md

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions docs/src/task-affinity.md
@@ -0,0 +1,127 @@
# Task Affinity

Dagger.jl's `@spawn` macro allows precise control over where a task executes and where its result is accessible, through three keyword arguments: `scope`, `compute_scope`, and `result_scope`.

For more information on how these scopes work, see [Scopes](scopes.md#Scopes).
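
For quick reference, the `Dagger.scope` constructor used throughout this page accepts a single placement as keyword arguments, or several placements as tuples whose union forms the scope. A minimal sketch mirroring the examples below:

```julia
s1 = Dagger.scope(worker=2)              # anywhere on worker 2
s2 = Dagger.scope(worker=2, thread=3)    # exactly thread 3 of worker 2
s3 = Dagger.scope((worker=1, thread=2),  # union: either of two placements
                  (worker=3, thread=1))
```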

---

## Key Terms

### Scope
`scope` defines the general set of locations where a Dagger task can execute. If `scope` is not explicitly set, the task runs within the `compute_scope`. If neither `scope` nor `compute_scope` is specified, the task falls back to `DefaultScope()`, allowing it to run wherever execution is possible. Execution occurs on any worker within the resulting scope.

**Example:**
```julia
g = Dagger.@spawn scope=Dagger.scope(worker=3) f(x, y)
```
Task `g` executes only on worker 3. Its result can be accessed by any worker.

---

### Compute Scope
Like `scope`, `compute_scope` also specifies where a Dagger task can execute. The key difference is that if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If neither is specified, they default to `DefaultScope()`.

**Example:**
```julia
g1 = Dagger.@spawn scope=Dagger.scope(worker=2, thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x, y)
g2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x, y)
```
Tasks `g1` and `g2` execute on either thread 2 of worker 1 or thread 1 of worker 3. The `scope` argument to `g1` is ignored. Their results can be accessed by any worker.

---

### Result Scope

`result_scope` limits the workers from which a task's result can be accessed, which is crucial for managing data locality and minimizing transfers. If `result_scope` is not specified, it defaults to `AnyScope()`, meaning the result can be accessed by any worker.

**Example:**
```julia
g = Dagger.@spawn result_scope=Dagger.scope(worker=3, threads=[1, 3, 4]) f(x, y)
```
The result of `g` is accessible only from threads 1, 3, and 4 of worker 3. Because no compute scope is specified, execution is likewise restricted to those threads.

---

## Interaction of `compute_scope` and `result_scope`

When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of the effective compute scope (`compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty, the scheduler throws a `Dagger.Sch.SchedulerException`.

**Example:**
```julia
g = Dagger.@spawn scope=Dagger.scope(worker=3, thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) f(x, y)
```
The task `g` computes on thread 2 of worker 2 (the intersection of the compute and result scopes), and its result access is likewise restricted to thread 2 of worker 2.
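
By contrast, here is a minimal failure sketch (an assumption-laden example, not part of the API: it presumes a session with at least three workers and some function `f`). When the effective compute scope and `result_scope` share no processors, scheduling fails:

```julia
# Hypothetical: computation restricted to worker 1, result restricted to
# worker 3. The two scopes do not intersect, so the scheduler raises a
# Dagger.Sch.SchedulerException, which surfaces when the result is fetched.
bad = Dagger.@spawn compute_scope=Dagger.scope(worker=1) result_scope=Dagger.scope(worker=3) f(x, y)
fetch(bad)  # expected to error
```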

---

## Chunk Inputs to Tasks

This section explains how `scope`, `compute_scope`, and `result_scope` affect tasks when a `Chunk` is the primary input to `@spawn` (e.g. created via `Dagger.tochunk(...)` or by calling `fetch(task; raw=true)` on a task).

Assume `g` is some function, e.g. `g(x, y) = x * 2 + y * 3`, `chunk_proc` is the chunk's processor, and `chunk_scope` is the scope within which the chunk is accessible.

When `Dagger.tochunk(...)` is directly spawned:
- The task executes on `chunk_proc`.
- The result is accessible only within `chunk_scope`.
- This behavior occurs irrespective of the `scope`, `compute_scope`, and `result_scope` values provided in the `@spawn` macro.
- Dagger validates that there is an intersection between the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`) and the `result_scope`. If no intersection exists, the scheduler throws an exception.

!!! info
    While `chunk_proc` is currently required when constructing a chunk, it is largely unused in actual scheduling logic. It exists primarily for backward compatibility and may be deprecated in the future.
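
For concreteness, a hypothetical setup for the usage lines below; the placement values are assumptions, not fixed API (`Dagger.ThreadProc(owner, tid)` identifies one thread of one worker):

```julia
# Hypothetical placement: the chunk's data lives on thread 1 of worker 2
# and is accessible only within worker 2.
chunk_scope = Dagger.scope(worker=2)
chunk_proc  = Dagger.ThreadProc(2, 1)
```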

**Usage:**
```julia
h1 = Dagger.@spawn scope=Dagger.scope(worker=3) Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope)
h2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope)
h3 = Dagger.@spawn scope=Dagger.scope(worker=2, thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope)
h4 = Dagger.@spawn result_scope=Dagger.scope(worker=3) Dagger.tochunk(g(40, 41), chunk_proc, chunk_scope)
h5 = Dagger.@spawn scope=Dagger.scope(worker=3, thread=2) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.scope(worker=2, threads=[2, 3]) Dagger.tochunk(g(50, 51), chunk_proc, chunk_scope)
```
In all these cases (`h1` through `h5`), the task executes on the chunk's processor `chunk_proc`, and its result is accessible only within `chunk_scope`.

---

## Function with Chunk Arguments as Tasks

This section details the behavior of `scope`, `compute_scope`, and `result_scope` when the spawned task is a function call whose arguments include `Chunk`s.

Assume `g(x, y) = x * 2 + y * 3` is a function, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)` is a chunk argument, where `arg_proc` is the chunk's processor and `arg_scope` is its defined scope.
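
For concreteness, a hypothetical construction of `arg` (the placement values are assumptions; this presumes worker 3 exists):

```julia
g(x, y) = x * 2 + y * 3

# Hypothetical placement for the argument chunk: thread 1 of worker 3,
# accessible only within worker 3.
arg_scope = Dagger.scope(worker=3)
arg_proc  = Dagger.ThreadProc(3, 1)
arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)
```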

### Scope and Chunk argument scopes interaction
Execution occurs on the intersection of `scope` and `arg_scope`. If they do not intersect, the scheduler throws an exception.

```julia
h = Dagger.@spawn scope=Dagger.scope(worker=3) g(arg, 11)
```
Task `h` executes on any worker within the intersection of `scope` and `arg_scope`. The result is accessible from any worker.

---

### Compute scope and Chunk argument scopes interaction
If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`. `result_scope` defaults to `AnyScope()`.

```julia
h1 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g(arg, 11)
h2 = Dagger.@spawn scope=Dagger.scope(worker=2, thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g(arg, 21)
```
Tasks `h1` and `h2` execute on any worker within the intersection of `compute_scope` and `arg_scope`. `scope` is ignored when `compute_scope` is specified. The result is stored and accessible from anywhere.

---

### Result scope and Chunk argument scopes interaction
If only `result_scope` is specified, computation happens on any worker within `arg_scope`, and the result is only accessible from `result_scope`.

```julia
h = Dagger.@spawn result_scope=Dagger.scope(worker=3) g(arg, 11)
```
Task `h` executes on any worker within `arg_scope`. The result is accessible from `result_scope`.

---

### Compute, result, and Chunk argument scopes interaction
When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception.

```julia
h = Dagger.@spawn scope=Dagger.scope(worker=3, thread=2) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) g(arg, 31)
```
Task `h` computes on thread 2 of worker 2 (the intersection of `arg_scope`, the compute scope, and `result_scope`), and its result access is likewise restricted to thread 2 of worker 2.
2 changes: 1 addition & 1 deletion src/Dagger.jl
@@ -7,7 +7,7 @@ import SparseArrays: sprand, SparseMatrixCSC
import MemPool
import MemPool: DRef, FileRef, poolget, poolset

import Base: collect, reduce
import Base: collect, reduce, view

import LinearAlgebra
import LinearAlgebra: Adjoint, BLAS, Diagonal, Bidiagonal, Tridiagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, UnitLowerTriangular, UnitUpperTriangular, diagind, ishermitian, issymmetric
148 changes: 98 additions & 50 deletions src/array/alloc.jl
@@ -8,10 +8,51 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N}
want_index::Bool
domain::ArrayDomain{N}
domainchunks
partitioning::AbstractBlocks
partitioning::AbstractBlocks{N}
procgrid::Union{AbstractArray{<:Processor, N}, Nothing}
end
size(a::AllocateArray) = size(a.domain)

function AllocateArray(assignment::AssignmentType{N}, eltype::Type{T}, f, want_index::Bool, d::ArrayDomain{N}, domainchunks, p::AbstractBlocks{N}) where {T,N}
sizeA = map(length, d.indexes)
procgrid = nothing
availprocs = collect(Dagger.all_processors())
sort!(availprocs, by = x -> (x.owner, x.tid))
if assignment isa Symbol
if assignment == :arbitrary
procgrid = nothing
elseif assignment == :blockrow
q = ntuple(i -> i == 1 ? Int(ceil(sizeA[1] / p.blocksize[1])) : 1, N)
rows_per_proc, extra = divrem(Int(ceil(sizeA[1] / p.blocksize[1])), num_processors())
counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
procgrid = reshape(vcat(fill.(availprocs, counts)...), q)
elseif assignment == :blockcol
q = ntuple(i -> i == N ? Int(ceil(sizeA[N] / p.blocksize[N])) : 1, N)
cols_per_proc, extra = divrem(Int(ceil(sizeA[N] / p.blocksize[N])), num_processors())
counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
procgrid = reshape(vcat(fill.(availprocs, counts)...), q)
elseif assignment == :cyclicrow
q = ntuple(i -> i == 1 ? num_processors() : 1, N)
procgrid = reshape(availprocs, q)
elseif assignment == :cycliccol
q = ntuple(i -> i == N ? num_processors() : 1, N)
procgrid = reshape(availprocs, q)
else
error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol")
end
elseif assignment isa AbstractArray{<:Int, N}
missingprocs = filter(q -> q ∉ procs(), assignment)
isempty(missingprocs) || error("Missing processors: $missingprocs")
procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment]
elseif assignment isa AbstractArray{<:Processor, N}
missingprocs = filter(q -> q ∉ availprocs, assignment)
isempty(missingprocs) || error("Missing processors: $missingprocs")
procgrid = assignment
end

return AllocateArray{T,N}(eltype, f, want_index, d, domainchunks, p, procgrid)
end

function _cumlength(len, step)
nice_pieces = div(len, step)
extra = rem(len, step)
@@ -34,79 +75,86 @@ function allocate_array(f, T, sz)
end
allocate_array_func(::Processor, f) = f
function stage(ctx, a::AllocateArray)
if a.want_index
thunks = [Dagger.@spawn allocate_array(a.f, a.eltype, i, size(x)) for (i, x) in enumerate(a.domainchunks)]
else
thunks = [Dagger.@spawn allocate_array(a.f, a.eltype, size(x)) for (i, x) in enumerate(a.domainchunks)]
chunks = map(CartesianIndices(a.domainchunks)) do I
x = a.domainchunks[I]
i = LinearIndices(a.domainchunks)[I]
args = a.want_index ? (i, size(x)) : (size(x),)
scope = isnothing(a.procgrid) ? nothing : ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)])

if isnothing(scope)
Dagger.@spawn allocate_array(a.f, a.eltype, args...)
else
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...)
end
end
return DArray(a.eltype, a.domain, a.domainchunks, thunks, a.partitioning)
return DArray(a.eltype, a.domain, a.domainchunks, chunks, a.partitioning)
end

const BlocksOrAuto = Union{Blocks{N} where N, AutoBlocks}

function Base.rand(p::Blocks, eltype::Type, dims::Dims)
function Base.rand(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, rand, false, d, partition(p, d), p)
a = AllocateArray(assignment, eltype, rand, false, d, partition(p, d), p)
return _to_darray(a)
end
Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...) = rand(p, T, dims)
Base.rand(p::BlocksOrAuto, T::Type, dims::Dims) = rand(p, T, dims)
Base.rand(p::BlocksOrAuto, dims::Integer...) = rand(p, Float64, dims)
Base.rand(p::BlocksOrAuto, dims::Dims) = rand(p, Float64, dims)
Base.rand(::AutoBlocks, eltype::Type, dims::Dims) =
rand(auto_blocks(dims), eltype, dims)
Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment)
Base.rand(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment)
Base.rand(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment)
Base.rand(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment)
Base.rand(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
rand(auto_blocks(dims), eltype, dims; assignment=assignment)

function Base.randn(p::Blocks, eltype::Type, dims::Dims)
function Base.randn(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, randn, false, d, partition(p, d), p)
a = AllocateArray(assignment, eltype, randn, false, d, partition(p, d), p)
return _to_darray(a)
end
Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...) = randn(p, T, dims)
Base.randn(p::BlocksOrAuto, T::Type, dims::Dims) = randn(p, T, dims)
Base.randn(p::BlocksOrAuto, dims::Integer...) = randn(p, Float64, dims)
Base.randn(p::BlocksOrAuto, dims::Dims) = randn(p, Float64, dims)
Base.randn(::AutoBlocks, eltype::Type, dims::Dims) =
randn(auto_blocks(dims), eltype, dims)
Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment)
Base.randn(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment)
Base.randn(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment)
Base.randn(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment)
Base.randn(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
randn(auto_blocks(dims), eltype, dims; assignment=assignment)

function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat)
function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p)
a = AllocateArray(assignment, eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p)
return _to_darray(a)
end
sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...) =
sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end])
sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat) =
sprand(p, T, dims, sparsity)
sprand(p::BlocksOrAuto, dims_and_sparsity::Real...) =
sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end])
sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat) =
sprand(p, Float64, dims, sparsity)
sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat) =
sprand(auto_blocks(dims), eltype, dims, sparsity)
sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) =
sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment)
sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
sprand(p, T, dims, sparsity; assignment=assignment)
sprand(p::BlocksOrAuto, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) =
sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment)
sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
sprand(p, Float64, dims, sparsity; assignment=assignment)
sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
sprand(auto_blocks(dims), eltype, dims, sparsity; assignment=assignment)

function Base.ones(p::Blocks, eltype::Type, dims::Dims)
function Base.ones(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, ones, false, d, partition(p, d), p)
a = AllocateArray(assignment, eltype, ones, false, d, partition(p, d), p)
return _to_darray(a)
end
Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...) = ones(p, T, dims)
Base.ones(p::BlocksOrAuto, T::Type, dims::Dims) = ones(p, T, dims)
Base.ones(p::BlocksOrAuto, dims::Integer...) = ones(p, Float64, dims)
Base.ones(p::BlocksOrAuto, dims::Dims) = ones(p, Float64, dims)
Base.ones(::AutoBlocks, eltype::Type, dims::Dims) =
ones(auto_blocks(dims), eltype, dims)
Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment)
Base.ones(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment)
Base.ones(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment)
Base.ones(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment)
Base.ones(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
ones(auto_blocks(dims), eltype, dims; assignment=assignment)

function Base.zeros(p::Blocks, eltype::Type, dims::Dims)
function Base.zeros(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, zeros, false, d, partition(p, d), p)
a = AllocateArray(assignment, eltype, zeros, false, d, partition(p, d), p)
return _to_darray(a)
end
Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...) = zeros(p, T, dims)
Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims) = zeros(p, T, dims)
Base.zeros(p::BlocksOrAuto, dims::Integer...) = zeros(p, Float64, dims)
Base.zeros(p::BlocksOrAuto, dims::Dims) = zeros(p, Float64, dims)
Base.zeros(::AutoBlocks, eltype::Type, dims::Dims) =
zeros(auto_blocks(dims), eltype, dims)
Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment)
Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment)
Base.zeros(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment)
Base.zeros(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment)
Base.zeros(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
zeros(auto_blocks(dims), eltype, dims; assignment=assignment)

function Base.zero(x::DArray{T,N}) where {T,N}
dims = ntuple(i->x.domain.indexes[i].stop, N)
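
A usage sketch for the `assignment` keyword added in this diff (hedged: it assumes a Distributed session where three extra workers have been added, and the sizes are illustrative):

```julia
using Distributed
addprocs(3)  # hypothetical: workers 2, 3, 4
using Dagger

# :blockrow pins contiguous groups of block-rows to processors in order;
# :cycliccol deals block-columns out round-robin across processors.
A = rand(Blocks(2, 2), 8, 8; assignment=:blockrow)
B = zeros(Blocks(2, 2), 8, 8; assignment=:cycliccol)

# An explicit grid of worker IDs also works; stage() maps blocks onto the
# grid cyclically (via mod1 indexing), so this 2x2 grid covers the full
# 4x4 grid of blocks.
C = ones(Blocks(2, 2), 8, 8; assignment=[2 3; 4 2])
```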