Skip to content

Commit 2d9397b

Browse files
authored
fixups in PartitionedSerializer. Handle case where ser/deser is not (#92)
called on the PartitionedSerializer object.
1 parent d823023 commit 2d9397b

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

src/DistributedArrays.jl

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,7 +1523,7 @@ type PartitionedSerializer
15231523

15241524
PartitionedSerializer(obj, local_idxs::Tuple) = new(obj, Nullable{Array}(), Nullable{Array}(), local_idxs)
15251525
function PartitionedSerializer(obj, pids::Array, idxs::Array)
1526-
pas = new(obj,pids,idxs,Nullable{Array}())
1526+
pas = new(obj,pids,idxs,Nullable{Tuple}())
15271527

15281528
if myid() in pids
15291529
pas.local_idxs = idxs[findfirst(pids, myid())]
@@ -1540,16 +1540,22 @@ function Base.serialize(S::AbstractSerializer, pas::PartitionedSerializer)
15401540
serialize(S, I)
15411541
end
15421542

1543-
15441543
function Base.deserialize{T<:PartitionedSerializer}(S::AbstractSerializer, t::Type{T})
15451544
obj_part = deserialize(S)
15461545
I = deserialize(S)
15471546
return PartitionedSerializer(obj_part, I)
15481547
end
15491548

15501549
function verify_and_get(pas::PartitionedSerializer, I)
1551-
@assert I == get(pas.local_idxs, [])
1552-
return pas.indexable_obj
1550+
# Handle the special case where myid() is part of pas.pids.
1551+
# For this case serialize/deserialize is not called as the remotecall is executed locally
1552+
if myid() in get(pas.pids, [])
1553+
@assert I == get(pas.idxs)[findfirst(get(pas.pids),myid())]
1554+
return pas.indexable_obj[I...]
1555+
else
1556+
@assert I == get(pas.local_idxs, ())
1557+
return pas.indexable_obj
1558+
end
15531559
end
15541560

15551561
end # module

test/darray.jl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,26 @@ function check_leaks()
99
end
1010

1111
facts("test distribute") do
12-
A = randn(100,100)
12+
A = rand(1:100, (100,100))
1313

1414
context("test default distribute") do
1515
DA = distribute(A)
1616
@fact length(procs(DA)) --> nworkers()
17+
@fact sum(DA) --> sum(A)
1718
close(DA)
1819
end
1920

2021
context("test distribute with procs arguments") do
21-
DA = distribute(A, procs = [1, 2])
22-
@fact length(procs(DA)) --> 2
22+
DA = distribute(A, procs = procs())
23+
@fact length(procs(DA)) --> nprocs()
24+
@fact sum(DA) --> sum(A)
2325
close(DA)
2426
end
2527

2628
context("test distribute with procs and dist arguments") do
2729
DA = distribute(A, procs = [1, 2], dist = [1,2])
2830
@fact size(procs(DA)) --> (1,2)
31+
@fact sum(DA) --> sum(A)
2932
close(DA)
3033
end
3134

0 commit comments

Comments
 (0)