Skip to content

Commit f9e1ad3

Browse files
authored
Merge pull request #97 from JuliaParallel/anj/copy
Allow different chunk sizes in the two arrays in copy!
2 parents 586e691 + 9ef6a8c commit f9e1ad3

File tree

2 files changed

+53
-8
lines changed

2 files changed

+53
-8
lines changed

src/core.jl

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,46 @@ function rr_localpart(ref, identity)
246246
return size(lp)
247247
end
248248

249+
function Base.serialize(S::AbstractSerializer, d::DArray)
250+
# Only send the ident for participating workers - we expect the DArray to exist in the
251+
# remote registry
252+
destpid = Base.worker_id_from_socket(S.io)
253+
Serializer.serialize_type(S, typeof(d))
254+
if (destpid in d.pids) || (destpid == d.identity[1])
255+
serialize(S, (true, d.identity)) # (identity_only, identity)
256+
else
257+
serialize(S, (false, d.identity))
258+
for n in [:dims, :pids, :indexes, :cuts]
259+
serialize(S, getfield(d, n))
260+
end
261+
end
262+
end
263+
264+
function Base.deserialize{T<:DArray}(S::AbstractSerializer, t::Type{T})
265+
what = deserialize(S)
266+
identity_only = what[1]
267+
identity = what[2]
268+
269+
if identity_only
270+
global registry
271+
if haskey(registry, (identity, :DARRAY))
272+
return registry[(identity, :DARRAY)]
273+
else
274+
# access to fields will throw an error, at least the deserialization process will not
275+
# result in worker death
276+
d = T()
277+
d.identity = identity
278+
return d
279+
end
280+
else
281+
# We are not a participating worker, deser fields and instantiate locally.
282+
dims = deserialize(S)
283+
pids = deserialize(S)
284+
indexes = deserialize(S)
285+
cuts = deserialize(S)
286+
return T(identity, dims, pids, indexes, cuts)
287+
end
288+
end
249289

250290
Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array(T, map(length,I)), dims, procs(d))
251291
Base.similar(d::DArray, T::Type) = similar(d, T, size(d))
@@ -551,14 +591,10 @@ Base.getindex(d::DArray) = d[1]
551591
Base.getindex(d::DArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...)
552592

553593
Base.copy!(dest::SubOrDArray, src::SubOrDArray) = begin
554-
if !(size(dest) == size(src) &&
555-
procs(dest) == procs(src) &&
556-
dest.indexes == src.indexes &&
557-
dest.cuts == src.cuts)
558-
throw(DimensionMismatch("destination array doesn't fit to source array"))
559-
end
560-
@sync for p in procs(dest)
561-
@async remotecall_fetch((dest,src)->(copy!(localpart(dest), localpart(src)); nothing), p, dest, src)
594+
asyncmap(procs(dest)) do p
595+
remotecall_fetch(p) do
596+
localpart(dest)[:] = src[localindexes(dest)...]
597+
end
562598
end
563599
return dest
564600
end

src/linalg.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,15 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe
165165
return y
166166
end
167167

168+
function Base.LinAlg.scale!(x::AbstractVector, A::DMatrix)
169+
asyncmap(procs(A)) do p
170+
remotecall_fetch(p) do
171+
scale!(Array(x[localindexes(A)[1]]), localpart(A))
172+
nothing
173+
end
174+
end
175+
A
176+
end
168177

169178
# Level 3
170179
function _matmatmul!::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix, tA)

0 commit comments

Comments
 (0)