Skip to content

Commit 1201e25

Browse files
authored
updated distribute to use PartitionedSerializer. (#93)
Added variant of map_localparts with local arrays
1 parent 5eedeb1 commit 1201e25

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

src/DistributedArrays.jl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -538,14 +538,8 @@ Distribute a local array `A` like the distributed array `DA`.
538538
function distribute(A::AbstractArray, DA::DArray)
539539
size(DA) == size(A) || throw(DimensionMismatch("Distributed array has size $(size(DA)) but array has $(size(A))"))
540540

541-
owner = myid()
542-
rr = RemoteChannel()
543-
put!(rr, A)
544-
545-
d = DArray(DA) do I
546-
remotecall_fetch(() -> fetch(rr)[I...], owner)
547-
end
548-
return d
541+
pas = PartitionedSerializer(A, procs(DA), DA.indexes)
542+
return DArray(I->verify_and_get(pas, I), DA)
549543
end
550544

551545
Base.convert{T,N,S<:AbstractArray}(::Type{DArray{T,N,S}}, A::S) = distribute(convert(AbstractArray{T,N}, A))
@@ -870,6 +864,21 @@ map_localparts(f::Callable, d::DArray) = DArray(i->f(localpart(d)), d)
870864
map_localparts(f::Callable, d1::DArray, d2::DArray) = DArray(d1) do I
871865
f(localpart(d1), localpart(d2))
872866
end
867+
868+
function map_localparts(f::Callable, DA::DArray, A::Array)
869+
pas = PartitionedSerializer(A, procs(DA), DA.indexes)
870+
DArray(DA) do I
871+
f(localpart(DA), verify_and_get(pas, I))
872+
end
873+
end
874+
875+
function map_localparts(f::Callable, A::Array, DA::DArray)
876+
pas = PartitionedSerializer(A, procs(DA), DA.indexes)
877+
DArray(DA) do I
878+
f(verify_and_get(pas, I), localpart(DA))
879+
end
880+
end
881+
873882
function map_localparts!(f::Callable, d::DArray)
874883
@sync for p in procs(d)
875884
@async remotecall_fetch((f,d)->(f(localpart(d)); nothing), p, f, d)

0 commit comments

Comments
 (0)