Skip to content

Commit 018b9aa

Browse files
authored
Merge pull request #173 from JuliaParallel/vc/bcview
Introduce makelocal
2 parents d828092 + ec6eddc commit 018b9aa

File tree

5 files changed

+61
-19
lines changed

5 files changed

+61
-19
lines changed

src/broadcast.jl

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,14 @@ bcdistribute_args(args::Tuple{}) = ()
126126
# dropping axes here since recomputing is easier
127127
@inline bclocal(bc::Broadcasted{DArrayStyle{Style}}, idxs) where Style = Broadcasted{Style}(bc.f, bclocal_args(_bcview(axes(bc), idxs), bc.args))
128128

129-
# bclocal will do a view of the data and the copy it over, except
130-
# when the shard match precisly (TODO: make sure that the invariant holds more often)
129+
# bclocal will do a view of the data and the copy it over
130+
# except when the data already is local
131131
function bclocal(x::DArray{T, N, AT}, idxs) where {T, N, AT}
132132
bcidxs = _bcview(axes(x), idxs)
133-
lpidx = localpartindex(x)
134-
if lpidx != 0 && all(x.indices[lpidx] .== bcidxs)
135-
return localpart(x)
136-
end
137-
return convert(__type(AT), view(x, bcidxs...))
133+
makelocal(x, bcidxs...)
138134
end
139135
bclocal(x, idxs) = x
140136

141137
@inline bclocal_args(idxs, args::Tuple) = (bclocal(args[1], idxs), bclocal_args(idxs, tail(args))...)
142138
bclocal_args(idxs, args::Tuple{Any}) = (bclocal(args[1], idxs),)
143139
bclocal_args(idxs, args::Tuple{}) = ()
144-
145-
__type(T) = Base.typename(T).wrapper

src/darray.jl

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,36 @@ end
334334

335335
localpart(d::DArray, localidx...) = localpart(d)[localidx...]
336336

337+
_localindex(i::Integer, offset) = i - offset
338+
_localindex(i::AbstractRange, offset) = (first(i)-offset):step(i):(last(i)-offset)
339+
_localindex(i::AbstractUnitRange, offset) = (first(i)-offset):(last(i)-offset)
340+
341+
"""
342+
makelocal(A::DArray, I...)
343+
344+
Equivalent to `Array(view(A, I...))` but optimised for the case that the data is local.
345+
Can return a view into `localpart(A)`
346+
"""
347+
function makelocal(A::DArray{<:Any, <:Any, AT}, I::Vararg{Any, N}) where {N, AT}
348+
Base.@_inline_meta
349+
J = map(i->Base.unalias(A, i), to_indices(A, I))
350+
J = map(j-> isa(j, Base.Slice) ? j.indices : j, J)
351+
@boundscheck checkbounds(A, J...)
352+
353+
lidcs = localindices(A)
354+
if Base.checkbounds_indices(Bool, lidcs, J)
355+
# data we want is local
356+
viewidcs = ntuple(i -> _localindex(J[i], first(lidcs[i]) - 1), ndims(A))
357+
view(localpart(A), viewidcs...)
358+
else
359+
# Make more efficient (?maybe) by allocating new memory
360+
# only for the remote part
361+
viewidcs = ntuple(i -> _localindex(J[i], 0), ndims(A))
362+
arr = similar(AT, map(length, viewidcs)...)
363+
copyto!(arr, view(A, viewidcs...))
364+
end
365+
end
366+
337367
# shortcut to set/get localparts of a distributed object
338368
function Base.getindex(d::DArray, s::Symbol)
339369
@assert s in [:L, :l, :LP, :lp]
@@ -539,6 +569,11 @@ function Array{S,N}(s::SubDArray{T,N}) where {S,T,N}
539569
end
540570
end
541571
a = Array{S}(undef, size(s))
572+
copyto!(a, s)
573+
end
574+
575+
function Base.copyto!(a::Array, s::SubDArray)
576+
N = ndims(a)
542577
a[[1:size(a,i) for i=1:N]...] = s
543578
return a
544579
end
@@ -615,12 +650,11 @@ function Base.isassigned(D::DArray, i::Integer...)
615650
end
616651
end
617652

618-
619653
function Base.copyto!(dest::SubOrDArray, src::AbstractArray)
620654
asyncmap(procs(dest)) do p
621655
remotecall_fetch(p) do
622656
ldest = localpart(dest)
623-
ldest[:] = Array(view(src, localindices(dest)...))
657+
copyto!(ldest, view(src, localindices(dest)...))
624658
end
625659
end
626660
return dest

src/linalg.jl

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,10 @@ function dot(x::DVector, y::DVector)
3838
if length(x) != length(y)
3939
throw(DimensionMismatch(""))
4040
end
41-
if (procs(x) != procs(y)) || (x.cuts != y.cuts)
42-
throw(ArgumentError("vectors don't have the same distribution. Not handled for efficiency reasons."))
43-
end
4441

4542
results=Any[]
46-
@sync begin
47-
for i = eachindex(x.pids)
48-
@async push!(results, remotecall_fetch((x, y, i) -> dot(localpart(x), fetch(y, i)), x.pids[i], x, y, i))
49-
end
43+
asyncmap(procs(x)) do p
44+
push!(results, remotecall_fetch((x, y) -> dot(localpart(x), makelocal(y, localindices(x)...)), x, y))
5045
end
5146
return reduce(+, results)
5247
end

src/mapreduce.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...)
88
function Base.map!(f::F, dest::DArray, src::DArray{<:Any,<:Any,A}) where {F,A}
99
asyncmap(procs(dest)) do p
1010
remotecall_fetch(p) do
11-
map!(f, localpart(dest), A(view(src, localindices(dest)...)))
11+
map!(f, localpart(dest), makelocal(src, localindices(dest)...))
1212
return nothing
1313
end
1414
end

test/darray.jl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,25 @@ end
712712

713713
check_leaks()
714714

715+
@testset "makelocal" begin
716+
A = randn(5*nprocs(), 5*nprocs())
717+
dA = distribute(A, procs=procs())
718+
for i in 1:size(dA, 2)
719+
a = DistributedArrays.makelocal(dA, :, i)
720+
@test all(Array(view(dA, :, i)) .== a)
721+
@test all( view( A, :, i) .== a)
722+
end
723+
for i in 1:size(dA, 1)
724+
a = DistributedArrays.makelocal(dA, i, :)
725+
@test all(Array(view(dA, i:i, :)) .== a)
726+
@test all( view( A, i:i, :) .== a)
727+
end
728+
a = DistributedArrays.makelocal(dA, 1:5, 1:5)
729+
@test all(Array(view(dA, 1:5, 1:5)) .== a)
730+
@test all( view( A, 1:5, 1:5) .== a)
731+
close(dA)
732+
end
733+
715734
@testset "test convert from subdarray" begin
716735
a = drand(20, 20);
717736

0 commit comments

Comments
 (0)