Skip to content

Commit 56643dc

Browse files
committed
fixup! Enhance DArray Distribution with Processor Assignment
1 parent 14913e3 commit 56643dc

File tree

4 files changed

+370
-425
lines changed

4 files changed

+370
-425
lines changed

src/array/alloc.jl

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N}
1414
function AllocateArray(eltype::Type{T}, f, want_index::Bool, d::ArrayDomain{N}, domainchunks, p::AbstractBlocks{N}, assignment::Union{AssignmentType{N},Nothing} = nothing) where {T,N}
1515
sizeA = map(length, d.indexes)
1616
procgrid = nothing
17-
availprocs = collect(Dagger.all_processors())
17+
availprocs = collect(Dagger.compatible_processors())
1818
sort!(availprocs, by = x -> (x.owner, x.tid))
1919
if assignment isa Symbol
2020
if assignment == :arbitrary
@@ -40,15 +40,15 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N}
4040
end
4141
elseif assignment isa AbstractArray{<:Int, N}
4242
missingprocs = filter(q -> q procs(), assignment)
43-
isempty(missingprocs) || error("Missing processors: $missingprocs")
43+
isempty(missingprocs) || error("Specified workers are not available: $missingprocs")
4444
procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment]
4545
elseif assignment isa AbstractArray{<:Processor, N}
4646
missingprocs = filter(q -> q availprocs, assignment)
47-
isempty(missingprocs) || error("Missing processors: $missingprocs")
47+
isempty(missingprocs) || error("Specified processors are not available: $missingprocs")
4848
procgrid = assignment
4949
end
5050

51-
return new{T,N}(eltype, f, want_index, d, domainchunks, p, procgrid)
51+
return new{T,N}(eltype, f, want_index, d, domainchunks, p, procgrid)
5252
end
5353

5454
end
@@ -80,13 +80,13 @@ function stage(ctx, a::AllocateArray)
8080
x = a.domainchunks[I]
8181
i = LinearIndices(a.domainchunks)[I]
8282
args = a.want_index ? (i, size(x)) : (size(x),)
83-
scope = isnothing(a.procgrid) ? nothing : ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)])
8483

85-
if isnothing(scope)
86-
Dagger.@spawn allocate_array(a.f, a.eltype, args...)
84+
if isnothing(a.procgrid)
85+
scope = nothing
8786
else
88-
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...)
87+
scope = ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)])
8988
end
89+
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...)
9090
end
9191
return DArray(a.eltype, a.domain, a.domainchunks, chunks, a.partitioning)
9292
end
@@ -98,64 +98,58 @@ function Base.rand(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentTy
9898
a = AllocateArray(eltype, rand, false, d, partition(p, d), p, assignment)
9999
return _to_darray(a)
100100
end
101-
Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment)
102-
Base.rand(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment)
103-
Base.rand(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment)
104-
Base.rand(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment)
101+
Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment)
102+
Base.rand(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment)
103+
Base.rand(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment)
105104
Base.rand(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
106-
rand(auto_blocks(dims), eltype, dims; assignment=assignment)
105+
rand(auto_blocks(dims), eltype, dims; assignment)
107106

108107
function Base.randn(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
109108
d = ArrayDomain(map(x->1:x, dims))
110109
a = AllocateArray(eltype, randn, false, d, partition(p, d), p, assignment)
111110
return _to_darray(a)
112111
end
113-
Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment)
114-
Base.randn(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment)
115-
Base.randn(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment)
116-
Base.randn(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment)
112+
Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment)
113+
Base.randn(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment)
114+
Base.randn(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment)
117115
Base.randn(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
118-
randn(auto_blocks(dims), eltype, dims; assignment=assignment)
116+
randn(auto_blocks(dims), eltype, dims; assignment)
119117

120118
function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary)
121119
d = ArrayDomain(map(x->1:x, dims))
122120
a = AllocateArray(eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p, assignment)
123121
return _to_darray(a)
124122
end
125123
sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) =
126-
sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment)
127-
# sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
128-
# sprand(p, T, dims, sparsity; assignment=assignment)
124+
sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment)
129125
sprand(p::BlocksOrAuto, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) =
130-
sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment)
126+
sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment)
131127
sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
132-
sprand(p, Float64, dims, sparsity; assignment=assignment)
128+
sprand(p, Float64, dims, sparsity; assignment)
133129
sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
134-
sprand(auto_blocks(dims), eltype, dims, sparsity; assignment=assignment)
130+
sprand(auto_blocks(dims), eltype, dims, sparsity; assignment)
135131

136132
function Base.ones(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
137133
d = ArrayDomain(map(x->1:x, dims))
138134
a = AllocateArray(eltype, ones, false, d, partition(p, d), p, assignment)
139135
return _to_darray(a)
140136
end
141-
Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment)
142-
Base.ones(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment)
143-
Base.ones(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment)
144-
Base.ones(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment)
137+
Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment)
138+
Base.ones(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment)
139+
Base.ones(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment)
145140
Base.ones(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
146-
ones(auto_blocks(dims), eltype, dims; assignment=assignment)
141+
ones(auto_blocks(dims), eltype, dims; assignment)
147142

148143
function Base.zeros(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
149144
d = ArrayDomain(map(x->1:x, dims))
150145
a = AllocateArray(eltype, zeros, false, d, partition(p, d), p, assignment)
151146
return _to_darray(a)
152147
end
153-
Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment)
154-
Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment)
155-
Base.zeros(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment)
156-
Base.zeros(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment)
148+
Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment)
149+
Base.zeros(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment)
150+
Base.zeros(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment)
157151
Base.zeros(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
158-
zeros(auto_blocks(dims), eltype, dims; assignment=assignment)
152+
zeros(auto_blocks(dims), eltype, dims; assignment)
159153

160154
function Base.zero(x::DArray{T,N}) where {T,N}
161155
dims = ntuple(i->x.domain.indexes[i].stop, N)

src/array/darray.jl

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,13 @@ function stage(ctx::Context, d::Distribute)
457457
shape = size(chunks)
458458
# TODO: fix hashing
459459
#hash = uhash(idx, Base.hash(Distribute, Base.hash(d.data)))
460-
Dagger.spawn(shape, chunks...) do shape, parts...
460+
if isnothing(d.procgrid)
461+
options = Options(compute_scope=nothing)
462+
else
463+
scope = ExactScope(d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)])
464+
options = Options(compute_scope=scope)
465+
end
466+
Dagger.spawn(options, shape, chunks...) do shape, parts...
461467
if prod(shape) == 0
462468
return Array{T}(undef, shape)
463469
end
@@ -472,11 +478,11 @@ function stage(ctx::Context, d::Distribute)
472478
#hash = uhash(c, Base.hash(Distribute, Base.hash(d.data)))
473479
c = d.domainchunks[I]
474480
if isnothing(d.procgrid)
475-
Dagger.@spawn identity(d.data[c])
481+
scope = nothing
476482
else
477483
scope = ExactScope(d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)])
478-
Dagger.@spawn compute_scope=scope identity(d.data[c])
479484
end
485+
Dagger.@spawn compute_scope=scope identity(d.data[c])
480486
end
481487
end
482488
return DArray(eltype(d.data),
@@ -504,9 +510,9 @@ auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A))
504510
const AssignmentType{N} = Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}}
505511

506512
distribute(A::AbstractArray, assignment::AssignmentType = :arbitrary) = distribute(A, AutoBlocks(), assignment)
507-
function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N}
513+
function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N}
508514
procgrid = nothing
509-
availprocs = collect(Dagger.all_processors())
515+
availprocs = collect(Dagger.compatible_processors())
510516
sort!(availprocs, by = x -> (x.owner, x.tid))
511517
if assignment isa Symbol
512518
if assignment == :arbitrary
@@ -532,11 +538,11 @@ function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::Assignme
532538
end
533539
elseif assignment isa AbstractArray{<:Int, N}
534540
missingprocs = filter(p -> p procs(), assignment)
535-
isempty(missingprocs) || error("Missing processors: $missingprocs")
541+
isempty(missingprocs) || error("Specified workers are not available: $missingprocs")
536542
procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment]
537543
elseif assignment isa AbstractArray{<:Processor, N}
538544
missingprocs = filter(p -> p availprocs, assignment)
539-
isempty(missingprocs) || error("Missing processors: $missingprocs")
545+
isempty(missingprocs) || error("Specified processors are not available: $missingprocs")
540546
procgrid = assignment
541547
end
542548

src/array/operators.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ function stage(ctx::Context, node::BCast{B,T,N}) where {B,T,N}
7777
end
7878
end |> Tuple
7979
dmn = DomainBlocks(ntuple(_->1, length(s)), splits)
80-
stage(ctx, Distribute(dmn, part, arg)).chunks
80+
stage(ctx, Distribute(dmn, part, arg, nothing)).chunks
8181
else
8282
arg
8383
end

0 commit comments

Comments
 (0)