Skip to content

Commit 43fa380

Browse files
authored
Merge pull request #91 from JuliaParallel/amitm/optdist
Implement a partitioned distributor of indexables. Optimize distribute.
2 parents 1bf03c6 + 180fc3c commit 43fa380

File tree

1 file changed

+42
-7
lines changed

1 file changed

+42
-7
lines changed

src/DistributedArrays.jl

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -517,14 +517,10 @@ Convert a local array to distributed.
517517
function distribute(A::AbstractArray;
518518
procs = workers()[1:min(nworkers(), maximum(size(A)))],
519519
dist = defaultdist(size(A), procs))
520+
idxs, _ = chunk_idxs([size(A)...], dist)
520521

521-
owner = myid()
522-
rr = RemoteChannel()
523-
put!(rr, A)
524-
d = DArray(size(A), procs, dist) do I
525-
remotecall_fetch(() -> fetch(rr)[I...], owner)
526-
end
527-
return d
522+
pas = PartitionedSerializer(A, procs, idxs)
523+
return DArray(I->verify_and_get(pas, I), size(A), procs, dist)
528524
end
529525

530526
Base.convert{T,N,S<:AbstractArray}(::Type{DArray{T,N,S}}, A::S) = distribute(convert(AbstractArray{T,N}, A))
@@ -1491,5 +1487,44 @@ function Base.sort{T}(d::DVector{T}; sample=true, kwargs...)
14911487
return DArray(local_sorted_refs)
14921488
end
14931489

1490+
# Serialize only those parts of the object as required by the destination worker.
1491+
type PartitionedSerializer
1492+
indexable_obj # An indexable object, Array, SparseMatrix, etc.
1493+
# Complete object on the serializing side.
1494+
# Part object on the deserialized side.
1495+
pids::Nullable{Array}
1496+
idxs::Nullable{Array}
1497+
local_idxs::Nullable{Tuple}
1498+
1499+
PartitionedSerializer(obj, local_idxs::Tuple) = new(obj, Nullable{Array}(), Nullable{Array}(), local_idxs)
1500+
function PartitionedSerializer(obj, pids::Array, idxs::Array)
1501+
pas = new(obj,pids,idxs,Nullable{Array}())
1502+
1503+
if myid() in pids
1504+
pas.local_idxs = idxs[findfirst(pids, myid())]
1505+
end
1506+
return pas
1507+
end
1508+
end
1509+
1510+
function Base.serialize(S::AbstractSerializer, pas::PartitionedSerializer)
1511+
pid = Base.worker_id_from_socket(S.io)
1512+
I = get(pas.idxs)[findfirst(get(pas.pids), pid)]
1513+
Serializer.serialize_type(S, typeof(pas))
1514+
serialize(S, pas.indexable_obj[I...])
1515+
serialize(S, I)
1516+
end
1517+
1518+
1519+
function Base.deserialize{T<:PartitionedSerializer}(S::AbstractSerializer, t::Type{T})
1520+
obj_part = deserialize(S)
1521+
I = deserialize(S)
1522+
return PartitionedSerializer(obj_part, I)
1523+
end
1524+
1525+
function verify_and_get(pas::PartitionedSerializer, I)
1526+
@assert I == get(pas.local_idxs, [])
1527+
return pas.indexable_obj
1528+
end
14941529

14951530
end # module

0 commit comments

Comments
 (0)