@@ -246,6 +246,46 @@ function rr_localpart(ref, identity)
246
246
return size (lp)
247
247
end
248
248
249
+ function Base. serialize (S:: AbstractSerializer , d:: DArray )
250
+ # Only send the ident for participating workers - we expect the DArray to exist in the
251
+ # remote registry
252
+ destpid = Base. worker_id_from_socket (S. io)
253
+ Serializer. serialize_type (S, typeof (d))
254
+ if (destpid in d. pids) || (destpid == d. identity[1 ])
255
+ serialize (S, (true , d. identity)) # (identity_only, identity)
256
+ else
257
+ serialize (S, (false , d. identity))
258
+ for n in [:dims , :pids , :indexes , :cuts ]
259
+ serialize (S, getfield (d, n))
260
+ end
261
+ end
262
+ end
263
+
264
+ function Base. deserialize {T<:DArray} (S:: AbstractSerializer , t:: Type{T} )
265
+ what = deserialize (S)
266
+ identity_only = what[1 ]
267
+ identity = what[2 ]
268
+
269
+ if identity_only
270
+ global registry
271
+ if haskey (registry, (identity, :DARRAY ))
272
+ return registry[(identity, :DARRAY )]
273
+ else
274
+ # access to fields will throw an error, at least the deserialization process will not
275
+ # result in worker death
276
+ d = T ()
277
+ d. identity = identity
278
+ return d
279
+ end
280
+ else
281
+ # We are not a participating worker, deser fields and instantiate locally.
282
+ dims = deserialize (S)
283
+ pids = deserialize (S)
284
+ indexes = deserialize (S)
285
+ cuts = deserialize (S)
286
+ return T (identity, dims, pids, indexes, cuts)
287
+ end
288
+ end
249
289
250
290
Base. similar (d:: DArray , T:: Type , dims:: Dims ) = DArray (I-> Array (T, map (length,I)), dims, procs (d))
251
291
Base. similar (d:: DArray , T:: Type ) = similar (d, T, size (d))
@@ -551,14 +591,10 @@ Base.getindex(d::DArray) = d[1]
551
591
Base. getindex (d:: DArray , I:: Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}} ...) = view (d, I... )
552
592
553
593
Base. copy! (dest:: SubOrDArray , src:: SubOrDArray ) = begin
554
- if ! (size (dest) == size (src) &&
555
- procs (dest) == procs (src) &&
556
- dest. indexes == src. indexes &&
557
- dest. cuts == src. cuts)
558
- throw (DimensionMismatch (" destination array doesn't fit to source array" ))
559
- end
560
- @sync for p in procs (dest)
561
- @async remotecall_fetch ((dest,src)-> (copy! (localpart (dest), localpart (src)); nothing ), p, dest, src)
594
+ asyncmap (procs (dest)) do p
595
+ remotecall_fetch (p) do
596
+ localpart (dest)[:] = src[localindexes (dest)... ]
597
+ end
562
598
end
563
599
return dest
564
600
end
0 commit comments