diff --git a/docs/src/darray.md b/docs/src/darray.md index 715a6cbe8..2b74c7be7 100644 --- a/docs/src/darray.md +++ b/docs/src/darray.md @@ -211,6 +211,165 @@ across the workers in the Julia cluster in a relatively even distribution; future operations on a `DArray` may produce a different distribution from the one chosen by previous calls. + + +### Explicit Processor Mapping of DArray Blocks + +This feature allows you to control how `DArray` blocks (chunks) are assigned to specific processors or threads within the cluster. Fine-grained control over data locality can be crucial for optimizing the performance of certain distributed algorithms. + +You specify the mapping using the optional `assignment` keyword argument in the `DArray` constructor functions (`DArray`, `DVector`, and `DMatrix`) and the `distribute` function. + +The `assignment` argument accepts the following values: + +* `:arbitrary` (Default): + + * If `assignment` is not provided or is set to symbol `:arbitrary`, Dagger's scheduler assigns blocks to processors automatically. This is the default behavior. +* `:blockcyclic`: + + * If `assignment` is set to `:blockcyclic`, `DArray` blocks are assigned to processors in a block-cyclic manner. Blocks are distributed cyclically across processors, iterating through the processors in increasing rank along the *last* dimension of the block distribution. + * Any other symbol used for `assignment` results in an error. +* `AbstractArray{<:Int, N}`: + + * Provide an N-dimensional array of integer worker IDs. The dimension `N` must match the number of dimensions of the `DArray`. + * Dagger maps blocks to worker IDs in a block-cyclic manner. The block at index `(i, j, ...)` is assigned to the first thread of the processor with ID `assignment[i, j, ...]`. This pattern repeats in a block-cyclic fashion to assign all blocks. +* `AbstractArray{<:Processor, N}`: + + * Provide an N-dimensional array of `Processor` objects. The dimension `N` must match the number of dimensions of the `DArray` blocks. + * Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the `assignment` array. The block at index `(i, j, ...)` is assigned to the processor at `assignment[i, j, ...]`. This pattern repeats in a block-cyclic fashion to assign all blocks. + +#### Examples and Usage + +The `assignment` argument works similarly for `DArray`, `DVector`, and `DMatrix`, as well as the `distribute` function. The key difference lies in the dimensionality of the resulting distributed array: + +* `DArray`: For N-dimensional distributed arrays. + +* `DVector`: Specifically for 1-dimensional distributed arrays. + +* `DMatrix`: Specifically for 2-dimensional distributed arrays. + +* `distribute`: General function to distribute arrays. + +Here are some examples using a setup with one processor and three worker processors. + +First, let's create some sample arrays: + +```julia +A = rand(7, 11) # 2D array +v = rand(15) # 1D array +M = rand(5, 5, 5) # 3D array +``` + +1. **Arbitrary Assignment:** + + ```julia + Ad = distribute(A, Blocks(2, 2), :arbitrary) + # DMatrix(A, Blocks(2, 2), :arbitrary) + + vd = distribute(v, Blocks(3), :arbitrary) + # DVector(v, Blocks(3), :arbitrary) + + Md = distribute(M, Blocks(2, 2, 2), :arbitrary) + # DArray(M, Blocks(2,2,2), :arbitrary) + ``` + + This creates distributed arrays with the specified block sizes, and Dagger assigns the blocks to processors arbitrarily. 
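+   To see where the blocks actually landed, you can inspect the processor that holds each chunk. The helper below is only a sketch: it assumes each entry of the (internal) `chunks` field can be `fetch`ed with `raw=true` to obtain the underlying `Chunk`, and that `Dagger.processor` reports that chunk's processor — neither is a stable public API.
+
+   ```julia
+   # Sketch (relies on DArray internals): map each block of a DArray to the
+   # processor holding it, preserving the shape of the block grid.
+   chunk_procs(X) = map(c -> Dagger.processor(fetch(c; raw=true)), X.chunks)
+
+   chunk_procs(Ad)
+   ```
+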
+   For example, the assignment for `Ad` might look like this:
+
+   ```julia
+   4×6 Matrix{Dagger.ThreadProc}:
+    ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(2, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+    ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+    ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)
+    ThreadProc(2, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)
+   ```
+
+2. **Block-Cyclic Assignment:**
+
+   ```julia
+   Ad = distribute(A, Blocks(2, 2), :blockcyclic)
+   # DMatrix(A, Blocks(2, 2), :blockcyclic)
+
+   vd = distribute(v, Blocks(3), :blockcyclic)
+   # DVector(v, Blocks(3), :blockcyclic)
+
+   Md = distribute(M, Blocks(2, 2, 2), :blockcyclic)
+   # DArray(M, Blocks(2, 2, 2), :blockcyclic)
+   ```
+
+   This assigns blocks to the available processors cyclically, in order of increasing rank, along the last dimension. For the 2D case (`Ad`), the assignment will look like this:
+
+   ```julia
+   4×6 Matrix{Dagger.ThreadProc}:
+    ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+    ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+    ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+    ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+   ```
+
+3. **Block-Cyclic Assignment with Integer Array:**
+
+   ```julia
+   assignment_2d = [2 1; 4 3]
+   Ad = distribute(A, Blocks(2, 2), assignment_2d)
+   # DMatrix(A, Blocks(2, 2), [2 1; 4 3])
+
+   assignment_1d = [2,3,1,4]
+   vd = distribute(v, Blocks(3), assignment_1d)
+   # DVector(v, Blocks(3), [2,3,1,4])
+
+   assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
+   Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
+   # DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))
+   ```
+
+   Here, the assignment arrays define how workers are arranged into a processor grid. For example, `assignment_2d` creates a 2×2 processor grid for the 2D array, which is then tiled block-cyclically over the block grid.
+
+   The assignment for `Ad` would be:
+
+   ```julia
+   4×6 Matrix{Dagger.ThreadProc}:
+    ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)
+    ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+    ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)
+    ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+   ```
+
+4. 
**Block-Cyclic Assignment with Processor Array:** + + ```julia + assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1); + Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)] + Ad = distribute(A, Blocks(2, 2), assignment_2d) + # DMatrix(A, Blocks(2, 2), assignment_2d) + + assignment_1d = [Dagger.ThreadProc(2,1), Dagger.ThreadProc(3,1), Dagger.ThreadProc(1,1), Dagger.ThreadProc(4,1)] + vd = distribute(v, Blocks(3), assignment_1d) + # DVector(v, Blocks(3), assignment_1d) + + assignment_3d = cat([Dagger.ThreadProc(1,1) Dagger.ThreadProc(2,1); Dagger.ThreadProc(3,1) Dagger.ThreadProc(4,1)], + [Dagger.ThreadProc(4,1) Dagger.ThreadProc(3,1); Dagger.ThreadProc(2,1) Dagger.ThreadProc(1,1)], dims=3) + Md = distribute(M, Blocks(2, 2, 2), assignment_3d) + # DArray(M, Blocks(2, 2, 2), assignment_3d) + + ``` + + If the assignment is a matrix of `Processor` objects, the blocks are assigned as follows: + For `Ad`: + + ```julia + 4×6 Matrix{Dagger.ThreadProc}: + ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) + ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) + ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) + ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) + + ``` + + + ## Broadcasting As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's diff --git a/docs/src/task-affinity.md b/docs/src/task-affinity.md new file mode 100644 index 000000000..16ecb8e9b --- /dev/null +++ b/docs/src/task-affinity.md @@ -0,0 +1,47 @@ +```@meta +CurrentModule = Dagger +``` + +# Task Affinity + + +Dagger.jl's `@spawn` macro offers fine-grained control over task execution by using the `compute_scope` and `result_scope` options to precisely control where tasks run and where their results can be accessed. + +## Compute Scope + +`compute_scope` defines exactly where a task's computation must occur. This option overrides the standard `scope` option if both are provided. + +```julia +g = Dagger.@spawn compute_scope=ExactScope(Dagger.ThreadProc(3, 1)) f(x,y) +``` + +In this example, task `f(x,y)` is scheduled to run specifically on thread 1 of processor 3. + + +## Result Scope + +`result_scope` restricts the locations from which a task's result can be fetched. This is useful for managing data locality and access patterns. + +```julia +g = Dagger.@spawn result_scope=ExactScope(Dagger.OSProc(2)) f(x,y) +``` + +Here, the result of `f(x,y)` (referenced by `g`) will be primarily accessible from worker process 2. Fetching from other locations might require data movement. + +## Interaction of compute_scope and result_scope + +When both `compute_scope` and `result_scope` are specified for a task, Scheduler determines the execution location based on their intersection: + +- **Intersection Exists:** If there is an intersection between the compute_scope and result_scope, the task's computation will be scheduled to occur within this intersection. This is the preferred scenario. + +- **No Intersection:** If there is no intersection, the task's computation will occur in the compute_scope. However, the result_scope will still be respected for accessing the result. + +### Syntax: +```julia +g = Dagger.@spawn compute_scope=ExactScope(Dagger.ThreadProc(3, 1)) result_scope=ExactScope(Dagger.ThreadProc(2, 2)) f(x,y) +``` + +In this case, the task computes on `Dagger.ThreadProc(3, 1)`. 
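+Since `ExactScope(Dagger.ThreadProc(3, 1))` and `ExactScope(Dagger.ThreadProc(2, 2))` do not intersect, execution falls back to the compute scope. You can check such intersections yourself with `Dagger.constrain`, the scope-intersection helper the scheduler uses; the snippet below is a sketch and assumes both workers exist:
+
+```julia
+using Dagger
+
+compute = Dagger.ExactScope(Dagger.ThreadProc(3, 1))
+result  = Dagger.ExactScope(Dagger.ThreadProc(2, 2))
+
+# `constrain` returns the intersection of two scopes, or an `InvalidScope`
+# when they cannot both be satisfied.
+Dagger.constrain(compute, result) isa Dagger.InvalidScope  # expected: true
+```
+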
Result access is restricted to `Dagger.ThreadProc(2, 2)`. + +!!! note "Chunk Inputs" + If the input to `Dagger.@spawn` is already a `Dagger.tochunk`, the `compute_scope` and `result_scope` options will have no effect on the task's execution or result accessibility. diff --git a/src/array/darray.jl b/src/array/darray.jl index 37c61a936..6e82af3f6 100644 --- a/src/array/darray.jl +++ b/src/array/darray.jl @@ -419,6 +419,7 @@ struct Distribute{T,N,B<:AbstractBlocks} <: ArrayOp{T, N} domainchunks partitioning::B data::AbstractArray{T,N} + procgrid::Union{AbstractArray{<:Processor, N}, Nothing} end size(x::Distribute) = size(domain(x.data)) @@ -426,19 +427,18 @@ size(x::Distribute) = size(domain(x.data)) Base.@deprecate BlockPartition Blocks -Distribute(p::Blocks, data::AbstractArray) = - Distribute(partition(p, domain(data)), p, data) +Distribute(p::Blocks, data::AbstractArray, procgrid::Union{AbstractArray{<:Processor},Nothing} = nothing) = + Distribute(partition(p, domain(data)), p, data, procgrid) -function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}) where {T,N} +function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N} p = Blocks(ntuple(i->first(domainchunks.cumlength[i]), N)) - Distribute(domainchunks, p, data) + Distribute(domainchunks, p, data, procgrid) end -function Distribute(data::AbstractArray{T,N}) where {T,N} - nprocs = sum(w->length(Dagger.get_processors(OSProc(w))), - procs()) +function Distribute(data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N} + nprocs = sum(w->length(get_processors(OSProc(w))),procs()) p = Blocks(ntuple(i->max(cld(size(data, i), nprocs), 1), N)) - return Distribute(partition(p, domain(data)), p, data) + return Distribute(partition(p, domain(data)), p, data, procgrid) end function stage(ctx::Context, d::Distribute) @@ -451,7 +451,8 @@ function stage(ctx::Context, d::Distribute) Nd = ndims(x) T = eltype(d.data) concat = x.concat - cs = map(d.domainchunks) do idx + cs = map(CartesianIndices(d.domainchunks)) do I + idx = d.domainchunks[I] chunks = stage(ctx, x[idx]).chunks shape = size(chunks) # TODO: fix hashing @@ -466,12 +467,20 @@ function stage(ctx::Context, d::Distribute) end end else - cs = map(d.domainchunks) do c + cs = map(CartesianIndices(d.domainchunks)) do I # TODO: fix hashing #hash = uhash(c, Base.hash(Distribute, Base.hash(d.data))) - Dagger.@spawn identity(d.data[c]) + c = d.domainchunks[I] + if isnothing(d.procgrid) + Dagger.@spawn identity(d.data[c]) + else + proc = d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)] + scope = ExactScope(proc) + Dagger.@spawn compute_scope=scope identity(d.data[c]) + end end end + return DArray(eltype(d.data), domain(d.data), d.domainchunks, @@ -494,29 +503,65 @@ function auto_blocks(dims::Dims{N}) where N end auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A)) -distribute(A::AbstractArray) = distribute(A, AutoBlocks()) -distribute(A::AbstractArray{T,N}, dist::Blocks{N}) where {T,N} = - _to_darray(Distribute(dist, A)) -distribute(A::AbstractArray, ::AutoBlocks) = distribute(A, auto_blocks(A)) -function distribute(x::AbstractArray{T,N}, n::NTuple{N}) where {T,N} +distribute(A::AbstractArray, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = distribute(A, AutoBlocks(), assignment) +function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::Union{Symbol, 
AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} = :arbitrary) where {T,N} + procgrid = nothing + availprocs = [proc for i in procs() for proc in get_processors(OSProc(i))] + sort!(availprocs, by = x -> (x.owner, x.tid)) + if assignment isa Symbol + if assignment == :arbitrary + procgrid = nothing + elseif assignment == :blockrow + p = ntuple(i -> i == 1 ? Int(ceil(size(A,1) / dist.blocksize[1])) : 1, N) + rows_per_proc, extra = divrem(Int(ceil(size(A,1) / dist.blocksize[1])), num_processors()) + counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + elseif assignment == :blockcol + p = ntuple(i -> i == N ? Int(ceil(size(A,N) / dist.blocksize[N])) : 1, N) + cols_per_proc, extra = divrem(Int(ceil(size(A,N) / dist.blocksize[N])), num_processors()) + counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + elseif assignment == :cyclicrow + p = ntuple(i -> i == 1 ? num_processors() : 1, N) + procgrid = reshape(availprocs, p) + elseif assignment == :cycliccol + p = ntuple(i -> i == N ? num_processors() : 1, N) + procgrid = reshape(availprocs, p) + else + error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol") + end + elseif assignment isa AbstractArray{<:Int, N} + missingprocs = filter(p -> p ∉ procs(), assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment] + elseif assignment isa AbstractArray{<:Processor, N} + missingprocs = filter(p -> p ∉ availprocs, assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = assignment + end + + return _to_darray(Distribute(dist, A, procgrid)) +end + +distribute(A::AbstractArray, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = distribute(A, auto_blocks(A), assignment) +function distribute(x::AbstractArray{T,N}, n::NTuple{N}, assignment::Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} = :arbitrary) where {T,N} p = map((d, dn)->ceil(Int, d / dn), size(x), n) - distribute(x, Blocks(p)) + distribute(x, Blocks(p), assignment) end -distribute(x::AbstractVector, n::Int) = distribute(x, (n,)) -distribute(x::AbstractVector, n::Vector{<:Integer}) = - distribute(x, DomainBlocks((1,), (cumsum(n),))) +distribute(x::AbstractVector, n::Int, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) = distribute(x, (n,), assignment) +distribute(x::AbstractVector, n::Vector{<:Integer}, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) = distribute(x, DomainBlocks((1,), (cumsum(n),)), assignment) -DVector(A::AbstractVector{T}, part::Blocks{1}) where T = distribute(A, part) -DMatrix(A::AbstractMatrix{T}, part::Blocks{2}) where T = distribute(A, part) -DArray(A::AbstractArray{T,N}, part::Blocks{N}) where {T,N} = distribute(A, part) +DVector(A::AbstractVector{T}, part::Blocks{1}, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) where T = distribute(A, part, assignment) +DMatrix(A::AbstractMatrix{T}, part::Blocks{2}, assignment::Union{Symbol, AbstractArray{<:Int, 2}, AbstractArray{<:Processor, 2}} = :arbitrary) where T = distribute(A, part, assignment) +DArray(A::AbstractArray{T,N}, part::Blocks{N}, 
assignment::Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} = :arbitrary) where {T,N} = distribute(A, part, assignment) -DVector(A::AbstractVector{T}) where T = DVector(A, AutoBlocks()) -DMatrix(A::AbstractMatrix{T}) where T = DMatrix(A, AutoBlocks()) -DArray(A::AbstractArray) = DArray(A, AutoBlocks()) +DVector(A::AbstractVector{T}, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) where T = DVector(A, AutoBlocks(), assignment) +DMatrix(A::AbstractMatrix{T}, assignment::Union{Symbol, AbstractArray{<:Int, 2}, AbstractArray{<:Processor, 2}} = :arbitrary) where T = DMatrix(A, AutoBlocks(), assignment) +DArray(A::AbstractArray, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = DArray(A, AutoBlocks(), assignment) -DVector(A::AbstractVector{T}, ::AutoBlocks) where T = DVector(A, auto_blocks(A)) -DMatrix(A::AbstractMatrix{T}, ::AutoBlocks) where T = DMatrix(A, auto_blocks(A)) -DArray(A::AbstractArray, ::AutoBlocks) = DArray(A, auto_blocks(A)) +DVector(A::AbstractVector{T}, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int, 1}, AbstractArray{<:Processor, 1}} = :arbitrary) where T = DVector(A, auto_blocks(A), assignment) +DMatrix(A::AbstractMatrix{T}, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int, 2}, AbstractArray{<:Processor, 2}} = :arbitrary) where T = DMatrix(A, auto_blocks(A), assignment) +DArray(A::AbstractArray, ::AutoBlocks, assignment::Union{Symbol, AbstractArray{<:Int}, AbstractArray{<:Processor}} = :arbitrary) = DArray(A, auto_blocks(A), assignment) function Base.:(==)(x::ArrayOp{T,N}, y::AbstractArray{S,N}) where {T,S,N} collect(x) == y diff --git a/src/chunks.jl b/src/chunks.jl index 1eb56714e..887aaf1aa 100644 --- a/src/chunks.jl +++ b/src/chunks.jl @@ -264,7 +264,7 @@ be used. All other kwargs are passed directly to `MemPool.poolset`. """ -function tochunk(x::X, proc::P=OSProc(), scope::S=AnyScope(); persist=false, cache=false, device=nothing, kwargs...) where {X,P,S} +function tochunk(x::X, proc::P=OSProc(), scope::S=DefaultScope(); persist=false, cache=false, device=nothing, kwargs...) 
where {X,P,S} if device === nothing device = if Sch.walk_storage_safe(x) MemPool.GLOBAL_DEVICE[] @@ -284,7 +284,7 @@ function savechunk(data, dir, f) end fr = FileRef(f, sz) proc = OSProc() - scope = AnyScope() # FIXME: Scoped to this node + scope = DefaultScope() # FIXME: Scoped to this node Chunk{typeof(data),typeof(fr),typeof(proc),typeof(scope)}(typeof(data), domain(data), fr, proc, scope, true) end diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index b894f4526..0ee2b48a8 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -14,7 +14,7 @@ import Random: randperm import Base: @invokelatest import ..Dagger -import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject +import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, InvalidScope, LockedObject import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime import ..Dagger: @dagdebug, @safe_lock_spin1 import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek @@ -733,7 +733,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) # proclist overrides scope selection AnyScope() else - DefaultScope() + !(constrain(task.compute_scope, task.result_scope) isa InvalidScope) ? constrain(task.compute_scope, task.result_scope) : task.compute_scope end end for (_,input) in task.inputs @@ -744,7 +744,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) input else nothing - end + end chunk isa Chunk || continue scope = constrain(scope, chunk.scope) if scope isa Dagger.InvalidScope @@ -1086,7 +1086,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options, propagated, ids, positions, (log_sink=ctx.log_sink, profile=ctx.profile), - sch_handle, state.uid]) + sch_handle, state.uid, thunk.result_scope]) end # N.B. We don't batch these because we might get a deserialization # error due to something not being defined on the worker, and then we don't @@ -1488,7 +1488,7 @@ function do_task(to_proc, task_desc) scope, Tf, data, send_result, persist, cache, meta, options, propagated, ids, positions, - ctx_vars, sch_handle, sch_uid = task_desc + ctx_vars, sch_handle, sch_uid, result_scope = task_desc ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile) from_proc = OSProc() @@ -1693,10 +1693,10 @@ function do_task(to_proc, task_desc) end timespan_finish(ctx, :storage_safe_scan, (;thunk_id, processor=to_proc), (;T=typeof(res))) end - + # Construct result # TODO: We should cache this locally - send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache, + send_result || meta ? res : tochunk(res, to_proc, result_scope; device, persist, cache=persist ? 
true : cache, tag=options.storage_root_tag, leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()), retain=options.storage_retain) diff --git a/src/thunk.jl b/src/thunk.jl index b24806f85..05446acf2 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -73,6 +73,8 @@ mutable struct Thunk eager_ref::Union{DRef,Nothing} options::Any # stores scheduler-specific options propagates::Tuple # which options we'll propagate + compute_scope::AbstractScope + result_scope::AbstractScope function Thunk(f, xs...; syncdeps=nothing, id::Int=next_id(), @@ -85,6 +87,8 @@ mutable struct Thunk eager_ref=nothing, processor=nothing, scope=nothing, + compute_scope=DefaultScope(), + result_scope=AnyScope(), options=nothing, propagates=(), kwargs... @@ -102,14 +106,14 @@ mutable struct Thunk end end @assert all(x->x isa Pair, xs) - if options !== nothing + if options !== nothing @assert isempty(kwargs) new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, - cache_ref, affinity, eager_ref, options, propagates) + cache_ref, affinity, eager_ref, options, propagates, compute_scope, result_scope) else new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, cache_ref, affinity, eager_ref, Sch.ThunkOptions(;kwargs...), - propagates) + propagates, compute_scope, result_scope) end end end @@ -479,6 +483,9 @@ function spawn(f, args...; kwargs...) # Wrap f in a Chunk if necessary processor = haskey(options, :processor) ? options.processor : nothing scope = haskey(options, :scope) ? options.scope : nothing + compute_scope = haskey(options, :compute_scope) ? options.compute_scope : nothing + result_scope = haskey(options, :result_scope) ? options.result_scope : nothing + if !isnothing(processor) || !isnothing(scope) f = tochunk(f, something(processor, get_options(:processor, OSProc())), diff --git a/test/array/allocation.jl b/test/array/allocation.jl index a95ef2efc..3475f4301 100644 --- a/test/array/allocation.jl +++ b/test/array/allocation.jl @@ -201,6 +201,265 @@ end end end +@testset "Constructor with assignment" begin + + availprocs = [proc for i in procs() for proc in Dagger.get_processors(Dagger.OSProc(i))] + sort!(availprocs, by = x -> (x.owner, x.tid)) + numprocs = length(availprocs) + + + function chunk_processors(Ad::DArray) + [Dagger.processor(Ad.chunks[idx].future.future.v.value[2]) for idx in CartesianIndices(size(Dagger.domainchunks(Ad)))] + end + + function tile_processors(proc_grid::AbstractArray{<:Dagger.Processor,N}, block_grid::Tuple{Vararg{Int,N}}) where N + reps = Int.(ceil.(block_grid ./ size(proc_grid))) + tiled = repeat(proc_grid, reps...) + idx_slices = [1:block_grid[d] for d in 1:length(block_grid)] + return tiled[idx_slices...] + end + + function get_default_blockgrid(data, numprocs) + ndims_data = ndims(data) + size_data = size(data) + ntuple(i->i == ndims_data ? 
cld( size_data[ndims_data], cld(size_data[ndims_data], numprocs) ) : 1, ndims_data) + end + + + A = rand(41, 35, 12) + v = rand(23) + M = rand(76,118) + + t_blocks_a = (4,3,2) + d_blocks_a = Dagger.Blocks(t_blocks_a) + blocks_a = cld.(size(A), t_blocks_a) + + n_blocks_v = 3 + t_blocks_v = (n_blocks_v,) + v_blocks_v = [n_blocks_v] + d_blocks_v = Dagger.Blocks(t_blocks_v) + blocks_v = cld.(size(v), t_blocks_v) + blocks_nv = blocks_v[1] + + t_blocks_m = (2,3) + d_blocks_m = Dagger.Blocks(t_blocks_m) + blocks_m = cld.(size(M), t_blocks_m) + + + @testset "Arbitrary Assignment (:arbitrary)" begin + assignment = :arbitrary + + @testset "Auto Blocks" begin + + @test distribute(A, assignment) isa DArray && distribute(A, AutoBlocks(), assignment) isa DArray + @test distribute(v, assignment) isa DVector && distribute(v, AutoBlocks(), assignment) isa DVector + @test distribute(M, assignment) isa DMatrix && distribute(M, AutoBlocks(), assignment) isa DMatrix + + @test DArray( A, assignment) isa DArray && DArray( A, AutoBlocks(), assignment) isa DArray + @test DVector(v, assignment) isa DVector && DVector( v, AutoBlocks(), assignment) isa DVector + @test DMatrix(M, assignment) isa DMatrix && DMatrix( M, AutoBlocks(), assignment) isa DMatrix + + end + + @testset "Explicit Blocks" begin + + @test distribute(A, d_blocks_a, assignment) isa DArray && distribute(A, blocks_a, assignment) isa DArray + @test distribute(v, d_blocks_v, assignment) isa DVector && distribute(v, blocks_v, assignment) isa DVector + @test distribute(v, n_blocks_v, assignment) isa DVector + # @test distribute(v, v_blocks_v, assignment) isa DVector ## )distribute(::Vector{Float64}, ::DomainBlocks{1}, ::Symbol) + @test distribute(M, d_blocks_m, assignment) isa DMatrix && distribute(M, blocks_m, assignment) isa DMatrix + + @test DArray( A, d_blocks_a, assignment) isa DArray + @test DVector(v, d_blocks_v, assignment) isa DVector + @test DMatrix(M, d_blocks_m, assignment) isa DMatrix + + end + + end + + + @testset "Blockcyclic Assignment (:blockcyclic)" begin + assignment = :blockcyclic + + function get_default_procgrid(data, numprocs) + ndims_data = ndims(data) + reshape(availprocs, ntuple(i -> i == ndims_data ? 
numprocs : 1, ndims_data)) + end + + @testset "Auto Blocks" begin + + dist_A_def_auto = distribute(A, assignment); wait(dist_A_def_auto) + dist_A_auto_def = distribute(A, AutoBlocks(), assignment); wait(dist_A_auto_def) + dist_v_def_auto = distribute(v, assignment); wait(dist_v_def_auto) + dist_v_auto_def = distribute(v, AutoBlocks(), assignment); wait(dist_v_auto_def) + dist_M_def_auto = distribute(M, assignment); wait(dist_M_def_auto) + dist_M_auto_def = distribute(M, AutoBlocks(), assignment); wait(dist_M_auto_def) + + darr_A_def_auto = DArray( A, assignment); wait(darr_A_def_auto) + darr_A_auto_def = DArray( A, AutoBlocks(), assignment); wait(darr_A_auto_def) + dvec_v_def_auto = DVector( v, assignment); wait(dvec_v_def_auto) + dvec_v_auto_def = DVector( v, AutoBlocks(), assignment); wait(dvec_v_auto_def) + dmat_M_def_auto = DMatrix( M, assignment); wait(dmat_M_def_auto) + dmat_M_auto_def = DMatrix( M, AutoBlocks(), assignment); wait(dmat_M_auto_def) + + @test chunk_processors(dist_A_def_auto) == chunk_processors(dist_A_auto_def) == chunk_processors(darr_A_def_auto) == chunk_processors(darr_A_auto_def) == tile_processors(get_default_procgrid(A, numprocs), get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_def_auto) == chunk_processors(dist_v_auto_def) == chunk_processors(dvec_v_def_auto) == chunk_processors(dvec_v_auto_def) == tile_processors(get_default_procgrid(v, numprocs), get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_def_auto) == chunk_processors(dist_M_auto_def) == chunk_processors(dmat_M_def_auto) == chunk_processors(dmat_M_auto_def) == tile_processors(get_default_procgrid(M, numprocs), get_default_blockgrid(M, numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_def = distribute(A, d_blocks_a, assignment); wait(dist_A_exp_def) + dist_A_blocks_exp = distribute(A, blocks_a, assignment); wait(dist_A_blocks_exp) + dist_v_exp_def = distribute(v, d_blocks_v, assignment); wait(dist_v_exp_def) + dist_v_blocks_exp = distribute(v, blocks_v, assignment); wait(dist_v_blocks_exp) + dist_v_nblocks_exp = distribute(v, blocks_nv, assignment); wait(dist_v_nblocks_exp) + # dist_v_vblocks_exp = distribute(v, v_blocks_v, assignment); wait(dist_v_vblocks_exp) + dist_M_exp_def = distribute(M, d_blocks_m, assignment); wait(dist_M_exp_def) + dist_M_blocks_exp = distribute(M, blocks_m, assignment); wait(dist_M_blocks_exp) + + darr_A_exp_def = DArray( A, d_blocks_a, assignment); wait(darr_A_exp_def) + dvec_v_exp_def = DVector( v, d_blocks_v, assignment); wait(dvec_v_exp_def) + dmat_M_exp_def = DMatrix( M, d_blocks_m, assignment); wait(dmat_M_exp_def) + + + @test chunk_processors(dist_A_exp_def) == chunk_processors(dist_A_blocks_exp) == chunk_processors(darr_A_exp_def) == tile_processors(get_default_procgrid(A, numprocs), blocks_a) + @test chunk_processors(dist_v_exp_def) == chunk_processors(dist_v_blocks_exp) == chunk_processors(dvec_v_exp_def) == tile_processors(get_default_procgrid(v, numprocs), blocks_v) + @test chunk_processors(dist_v_nblocks_exp) == tile_processors(get_default_procgrid(v, numprocs), blocks_v) + # @test chunk_processors(dist_v_vblocks_exp) == tile_processors(get_default_procgrid(v, numprocs), blocksv) ## Failed: no method matching distribute(::Vector{Float64}, ::DomainBlocks{1}, ::Symbol) + @test chunk_processors(dist_M_exp_def) == chunk_processors(dist_M_blocks_exp) == chunk_processors(dmat_M_exp_def) == tile_processors(get_default_procgrid(M, numprocs), blocks_m) + + end + + end + + + @testset "OSProc ID Array Assignment 
(AbstractArray{<:Int, N})" begin + + function get_random_osproc_ids(data) + ndims_data = ndims(data) + if ndims_data == 3 + return rand(Dagger.procs(), 3, 2, 2) + elseif ndims_data == 1 + return rand(Dagger.procs(), 11) + elseif ndims_data == 2 + return rand(Dagger.procs(), 2, 5) + end + end + + function get_random_osprocs(proc_ids) + [Dagger.ThreadProc(proc, 1) for proc in proc_ids] + end + + rand_osproc_ids_A = rand(Dagger.procs(), 3, 2, 2) + rand_osproc_ids_v = rand(Dagger.procs(), 11) + rand_osproc_ids_M = rand(Dagger.procs(), 2, 5) + + @testset "Auto Blocks" begin + + dist_A_rand_osproc_auto = distribute(A, rand_osproc_ids_A); wait(dist_A_rand_osproc_auto) + dist_A_auto_rand_osproc = distribute(A, AutoBlocks(), rand_osproc_ids_A); wait(dist_A_auto_rand_osproc) + # dist_v_rand_osproc_auto = distribute(v, rand_osproc_ids_v); wait(dist_v_rand_osproc_auto) + dist_v_auto_rand_osproc = distribute(v, AutoBlocks(), rand_osproc_ids_v); wait(dist_v_auto_rand_osproc) + dist_M_rand_osproc_auto = distribute(M, rand_osproc_ids_M); wait(dist_M_rand_osproc_auto) + dist_M_auto_rand_osproc = distribute(M, AutoBlocks(), rand_osproc_ids_M); wait(dist_M_auto_rand_osproc) + + darr_A_rand_osproc_auto = DArray( A, rand_osproc_ids_A); wait(darr_A_rand_osproc_auto) + darr_A_auto_rand_osproc = DArray( A, AutoBlocks(), rand_osproc_ids_A); wait(darr_A_auto_rand_osproc) + dvec_v_rand_osproc_auto = DVector( v, rand_osproc_ids_v); wait(dvec_v_rand_osproc_auto) + dvec_v_auto_rand_osproc = DVector( v, AutoBlocks(), rand_osproc_ids_v); wait(dvec_v_auto_rand_osproc) + dmat_M_rand_osproc_auto = DMatrix( M, rand_osproc_ids_M); wait(dmat_M_rand_osproc_auto) + dmat_M_auto_rand_osproc = DMatrix( M, AutoBlocks(), rand_osproc_ids_M); wait(dmat_M_auto_rand_osproc) + + @test chunk_processors(dist_A_rand_osproc_auto) == chunk_processors(dist_A_auto_rand_osproc) == chunk_processors(darr_A_rand_osproc_auto) == chunk_processors(darr_A_auto_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_A), get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_auto_rand_osproc) == chunk_processors(dvec_v_rand_osproc_auto) == chunk_processors(dvec_v_auto_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_v), get_default_blockgrid(v, numprocs)) + # @test chunk_processors(dist_v_rand_osproc_auto) == tile_processors(rand_osproc_ids_v, get_default_blockgrid(v, numprocs)) ## Failed: no method matching distribute(::Vector{Float64}, ::DomainBlocks{1}, ::Symbol) + @test chunk_processors(dist_M_rand_osproc_auto) == chunk_processors(dist_M_auto_rand_osproc) == chunk_processors(dmat_M_rand_osproc_auto) == chunk_processors(dmat_M_auto_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_M), get_default_blockgrid(M, numprocs)) + end + + @testset "Explicit Blocks" begin + + dist_A_exp_rand_osproc = distribute(A, d_blocks_a, rand_osproc_ids_A); wait(dist_A_exp_rand_osproc) + dist_A_blocks_rand_osproc = distribute(A, blocks_a, rand_osproc_ids_A); wait(dist_A_blocks_rand_osproc) + dist_v_exp_rand_osproc = distribute(v, d_blocks_v, rand_osproc_ids_v); wait(dist_v_exp_rand_osproc) + dist_v_blocks_rand_osproc = distribute(v, blocks_v, rand_osproc_ids_v); wait(dist_v_blocks_rand_osproc) + dist_v_nblocks_rand_osproc = distribute(v, blocks_nv, rand_osproc_ids_v); wait(dist_v_nblocks_rand_osproc) + # dist_v_vblocks_rand_osproc = distribute(v, v_blocks_v, rand_osproc_ids_v); wait(dist_v_vblocks_rand_osproc) + dist_M_exp_rand_osproc = distribute(M, d_blocks_m, rand_osproc_ids_M); wait(dist_M_exp_rand_osproc) + 
dist_M_blocks_rand_osproc = distribute(M, blocks_m, rand_osproc_ids_M); wait(dist_M_blocks_rand_osproc) + + darr_A_exp_rand_osproc = DArray( A, d_blocks_a, rand_osproc_ids_A); wait(darr_A_exp_rand_osproc) + dvec_v_exp_rand_osproc = DVector( v, d_blocks_v, rand_osproc_ids_v); wait(dvec_v_exp_rand_osproc) + dmat_M_exp_rand_osproc = DMatrix( M, d_blocks_m, rand_osproc_ids_M); wait(dmat_M_exp_rand_osproc) + + @test chunk_processors(dist_A_exp_rand_osproc) == chunk_processors(dist_A_blocks_rand_osproc) == chunk_processors(darr_A_exp_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_A), blocks_a) + @test chunk_processors(dist_v_exp_rand_osproc) == chunk_processors(dist_v_blocks_rand_osproc) == chunk_processors(dvec_v_exp_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_v), blocks_v) + @test chunk_processors(dist_v_nblocks_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_v), blocks_v) + # @test chunk_processors(dist_v_vblocks_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_v), blocksv) ## Failed: no method matching distribute(::Vector{Float64}, ::DomainBlocks{1}, ::Symbol) + @test chunk_processors(dist_M_exp_rand_osproc) == chunk_processors(dist_M_blocks_rand_osproc) == chunk_processors(dmat_M_exp_rand_osproc) == tile_processors(get_random_osprocs(rand_osproc_ids_M), blocks_m) + + end + + end + + + @testset "Explicit Processor Array Assignment (AbstractArray{<:Processor, N})" begin + + rand_procs_A = reshape(availprocs[ rand(Dagger.procs(), 6) ], 2, 3, 1) + rand_procs_v = reshape(availprocs[ rand(Dagger.procs(), 5) ], 5) + rand_procs_M = reshape(availprocs[ rand(Dagger.procs(), 14) ], 2, 7) + + + @testset "Auto Blocks" begin + + dist_A_rand_procs_auto = distribute(A, rand_procs_A); wait(dist_A_rand_procs_auto) + dist_A_auto_rand_procs = distribute(A, AutoBlocks(), rand_procs_A); wait(dist_A_auto_rand_procs) + dist_v_rand_procs_auto = distribute(v, rand_procs_v); wait(dist_v_rand_procs_auto) + dist_v_auto_rand_procs = distribute(v, AutoBlocks(), rand_procs_v); wait(dist_v_auto_rand_procs) + dist_M_rand_procs_auto = distribute(M, rand_procs_M); wait(dist_M_rand_procs_auto) + dist_M_auto_rand_procs = distribute(M, AutoBlocks(), rand_procs_M); wait(dist_M_auto_rand_procs) + + darr_A_rand_procs_auto = DArray( A, rand_procs_A); wait(darr_A_rand_procs_auto) + darr_A_auto_rand_procs = DArray( A, AutoBlocks(), rand_procs_A); wait(darr_A_auto_rand_procs) + dvec_v_rand_procs_auto = DVector( v, rand_procs_v); wait(dvec_v_rand_procs_auto) + dvec_v_auto_rand_procs = DVector( v, AutoBlocks(), rand_procs_v); wait(dvec_v_auto_rand_procs) + dmat_M_rand_procs_auto = DMatrix( M, rand_procs_M); wait(dmat_M_rand_procs_auto) + dmat_M_auto_rand_procs = DMatrix( M, AutoBlocks(), rand_procs_M); wait(dmat_M_auto_rand_procs) + + @test chunk_processors(dist_A_rand_procs_auto) == chunk_processors(dist_A_auto_rand_procs) == chunk_processors(darr_A_rand_procs_auto) == chunk_processors(darr_A_auto_rand_procs) == tile_processors(rand_procs_A, get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_rand_procs_auto) == chunk_processors(dist_v_auto_rand_procs) == chunk_processors(dvec_v_rand_procs_auto) == chunk_processors(dvec_v_auto_rand_procs) == tile_processors(rand_procs_v, get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_rand_procs_auto) == chunk_processors(dist_M_auto_rand_procs) == chunk_processors(dmat_M_rand_procs_auto) == chunk_processors(dmat_M_auto_rand_procs) == tile_processors(rand_procs_M, get_default_blockgrid(M, 
numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_rand_procs = distribute(A, d_blocks_a, rand_procs_A); wait(dist_A_exp_rand_procs) + dist_A_blocks_rand_procs = distribute(A, blocks_a, rand_procs_A); wait(dist_A_blocks_rand_procs) + dist_v_exp_rand_procs = distribute(v, d_blocks_v, rand_procs_v); wait(dist_v_exp_rand_procs) + dist_v_blocks_rand_procs = distribute(v, blocks_v, rand_procs_v); wait(dist_v_blocks_rand_procs) + dist_v_nblocks_rand_procs = distribute(v, blocks_nv, rand_procs_v); wait(dist_v_nblocks_rand_procs) + # dist_v_vblocks_rand_procs = distribute(v, v_blocks_v, rand_procs_v); wait(dist_v_vblocks_rand_procs) + dist_M_exp_rand_procs = distribute(M, d_blocks_m, rand_procs_M); wait(dist_M_exp_rand_procs) + dist_M_blocks_rand_procs = distribute(M, blocks_m, rand_procs_M); wait(dist_M_blocks_rand_procs) + + darr_A_exp_rand_procs = DArray( A, d_blocks_a, rand_procs_A); wait(darr_A_exp_rand_procs) + dvec_v_exp_rand_procs = DVector( v, d_blocks_v, rand_procs_v); wait(dvec_v_exp_rand_procs) + dmat_M_exp_rand_procs = DMatrix( M, d_blocks_m, rand_procs_M); wait(dmat_M_exp_rand_procs) + + @test chunk_processors(dist_A_exp_rand_procs) == chunk_processors(dist_A_blocks_rand_procs) == chunk_processors(darr_A_exp_rand_procs) == tile_processors(rand_procs_A, blocks_a) + @test chunk_processors(dist_v_exp_rand_procs) == chunk_processors(dist_v_blocks_rand_procs) == chunk_processors(dvec_v_exp_rand_procs) == tile_processors(rand_procs_v, blocks_v) + @test chunk_processors(dist_v_nblocks_rand_procs) == tile_processors(rand_procs_v, blocks_v) + # @test chunk_processors(dist_v_vblocks_rand_procs) == tile_processors(rand_procs_v, blocks_v) ## Failed: no method matching distribute(::Vector{Float64}, ::DomainBlocks{1}, ::Symbol) + @test chunk_processors(dist_M_exp_rand_procs) == chunk_processors(dist_M_blocks_rand_procs) == chunk_processors(dmat_M_exp_rand_procs) == tile_processors(rand_procs_M, blocks_m) + + end + + end + +end + @testset "view" begin A = rand(64, 64) DA = view(A, Blocks(8, 8))