Skip to content

Commit d823023

Browse files
authored
Merge pull request #90 from JuliaParallel/vc/refactor_constructor
Refactor constructor
2 parents 43fa380 + 802f940 commit d823023

File tree

3 files changed

+88
-46
lines changed

3 files changed

+88
-46
lines changed

src/DistributedArrays.jl

Lines changed: 70 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ export close, darray_closeall
2828
const registry=Dict{Tuple, Any}()
2929
const refs=Set() # Collection of darray identities created on this node
3030

31+
let DID::Int = 1
32+
global next_did
33+
next_did() = (id = DID; DID += 1; (myid(), id))
34+
end
35+
36+
"""
37+
next_did()
38+
39+
Produces an incrementing ID that will be used for DArrays.
40+
"""
41+
next_did
3142

3243
"""
3344
DArray(init, dims, [procs, dist])
@@ -91,13 +102,49 @@ typealias SubOrDArray{T,N} Union{DArray{T,N}, SubDArray{T,N}}
91102

92103
## core constructors ##
93104

105+
function DArray(identity, init, dims, pids, idxs, cuts)
106+
r=Channel(1)
107+
@sync begin
108+
for i = 1:length(pids)
109+
@async begin
110+
local typA
111+
if isa(init, Function)
112+
typA=remotecall_fetch(construct_localparts, pids[i], init, identity, dims, pids, idxs, cuts)
113+
else
114+
# constructing from an array of remote refs.
115+
typA=remotecall_fetch(construct_localparts, pids[i], init[i], identity, dims, pids, idxs, cuts)
116+
end
117+
!isready(r) && put!(r, typA)
118+
end
119+
end
120+
end
121+
122+
typA = take!(r)
123+
if myid() in pids
124+
d = registry[(identity, :DARRAY)]
125+
else
126+
d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts)
127+
end
128+
d
129+
end
130+
131+
function construct_localparts(init, identity, dims, pids, idxs, cuts)
132+
A = isa(init, Function) ? init(idxs[localpartindex(pids)]) : fetch(init)
133+
global registry
134+
registry[(identity, :LOCALPART)] = A
135+
typA = typeof(A)
136+
d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts)
137+
registry[(identity, :DARRAY)] = d
138+
typA
139+
end
140+
94141
function DArray(init, dims, procs, dist)
95142
np = prod(dist)
96143
procs = reshape(procs[1:np], ntuple(i->dist[i], length(dist)))
97144
idxs, cuts = chunk_idxs([dims...], dist)
98145
identity = next_did()
99146

100-
return construct_darray(identity, init, dims, procs, idxs, cuts)
147+
return DArray(identity, init, dims, procs, idxs, cuts)
101148
end
102149

103150
function DArray(init, dims, procs)
@@ -146,7 +193,7 @@ function DArray(refs)
146193
ncuts = Array{Int,1}[unshift!(sort(unique(lastidxs[x,:])), 1) for x in 1:length(dimdist)]
147194
ndims = tuple([sort(unique(lastidxs[x,:]))[end]-1 for x in 1:length(dimdist)]...)
148195

149-
construct_darray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts)
196+
DArray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts)
150197
end
151198
if VERSION < v"0.5.0-"
152199
macro DArray(ex::Expr)
@@ -185,48 +232,7 @@ else
185232
end
186233

187234
# new DArray similar to an existing one
188-
DArray(init, d::DArray) = construct_darray(next_did(), init, size(d), procs(d), d.indexes, d.cuts)
189-
190-
function construct_darray(identity, init, dims, pids, idxs, cuts)
191-
r=Channel(1)
192-
@sync begin
193-
for i = 1:length(pids)
194-
@async begin
195-
local typA
196-
if isa(init, Function)
197-
typA=remotecall_fetch(construct_localparts, pids[i], init, identity, dims, pids, idxs, cuts)
198-
else
199-
# constructing from an array of remote refs.
200-
typA=remotecall_fetch(construct_localparts, pids[i], init[i], identity, dims, pids, idxs, cuts)
201-
end
202-
!isready(r) && put!(r, typA)
203-
end
204-
end
205-
end
206-
207-
typA = take!(r)
208-
if myid() in pids
209-
d = registry[(identity, :DARRAY)]
210-
else
211-
d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts)
212-
end
213-
d
214-
end
215-
216-
function construct_localparts(init, identity, dims, pids, idxs, cuts)
217-
A = isa(init, Function) ? init(idxs[localpartindex(pids)]) : fetch(init)
218-
global registry
219-
registry[(identity, :LOCALPART)] = A
220-
typA = typeof(A)
221-
d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts)
222-
registry[(identity, :DARRAY)] = d
223-
typA
224-
end
225-
226-
let DID::Int = 1
227-
global next_did
228-
next_did() = (id = DID; DID += 1; (myid(), id))
229-
end
235+
DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indexes, d.cuts)
230236

231237
function release_localpart(identity)
232238
global registry
@@ -523,6 +529,25 @@ function distribute(A::AbstractArray;
523529
return DArray(I->verify_and_get(pas, I), size(A), procs, dist)
524530
end
525531

532+
"""
533+
distribute(A, DA)
534+
535+
Distribute a local array `A` like the distributed array `DA`.
536+
537+
"""
538+
function distribute(A::AbstractArray, DA::DArray)
539+
size(DA) == size(A) || throw(DimensionMismatch("Distributed array has size $(size(DA)) but array has $(size(A))"))
540+
541+
owner = myid()
542+
rr = RemoteChannel()
543+
put!(rr, A)
544+
545+
d = DArray(DA) do I
546+
remotecall_fetch(() -> fetch(rr)[I...], owner)
547+
end
548+
return d
549+
end
550+
526551
Base.convert{T,N,S<:AbstractArray}(::Type{DArray{T,N,S}}, A::S) = distribute(convert(AbstractArray{T,N}, A))
527552

528553
Base.convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) = begin
@@ -743,7 +768,7 @@ mapreducedim_within(f, op, A::DArray, region) = begin
743768
indx[i] = ntuple(j -> j in region ? (i.I[j]:i.I[j]) : A.indexes[i][j], ndims(A))
744769
end
745770
cuts = [i in region ? collect(1:arraysize[i] + 1) : A.cuts[i] for i in 1:ndims(A)]
746-
return construct_darray(next_did(), I -> mapreducedim(f, op, localpart(A), region),
771+
return DArray(next_did(), I -> mapreducedim(f, op, localpart(A), region),
747772
tuple(arraysize...), procs(A), indx, cuts)
748773
end
749774

test/darray.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,23 @@ facts("test distribute") do
2828
@fact size(procs(DA)) --> (1,2)
2929
close(DA)
3030
end
31+
32+
context("Create darray with unconventional distribution and distibute like it") do
33+
block = 10
34+
Y = nworkers() * block
35+
X = nworkers() * block
36+
remote_parts = map(workers()) do wid
37+
remotecall(rand, wid, block, Y)
38+
end
39+
DA1 = DArray(reshape(remote_parts, (length(remote_parts), 1)))
40+
A = rand(X, Y)
41+
DA2 = distribute(A, DA1)
42+
43+
@fact size(DA1) --> size(DA2)
44+
45+
close(DA1)
46+
close(DA2)
47+
end
3148
end
3249

3350
check_leaks()

test/runtests.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ using DistributedArrays
1515
using StatsBase # for fit(Histogram, ...)
1616
@everywhere using StatsBase # because exported functions are not exported on workers with using
1717

18-
@everywhere srand(123 + myid())
18+
@everywhere srand(1234 + myid())
1919

2020
include("darray.jl")
2121

0 commit comments

Comments
 (0)