Skip to content

Commit d64c7a9

Browse files
authored
minor cleanup (#111)
1 parent 9018902 commit d64c7a9

File tree

3 files changed

+155
-149
lines changed

3 files changed

+155
-149
lines changed

src/core.jl

Lines changed: 49 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -38,30 +38,30 @@ dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
3838
```
3939
"""
4040
type DArray{T,N,A} <: AbstractArray{T,N}
41-
identity::Tuple
41+
id::Tuple
4242
dims::NTuple{N,Int}
4343
pids::Array{Int,N} # pids[i]==p ⇒ processor p has piece i
4444
indexes::Array{NTuple{N,UnitRange{Int}},N} # indexes held by piece i
4545
cuts::Vector{Vector{Int}} # cuts[d][i] = first index of chunk i in dimension d
46+
localpart::A
4647

4748
release::Bool
4849

49-
function DArray(identity, dims, pids, indexes, cuts)
50+
function DArray(id, dims, pids, indexes, cuts, lp)
5051
# check invariants
5152
if dims != map(last, last(indexes))
5253
throw(ArgumentError("dimension of DArray (dim) and indexes do not match"))
5354
end
54-
release = (myid() == identity[1])
55+
release = (myid() == id[1])
5556

56-
global registry
57-
haskey(registry, (identity, :DARRAY)) && return registry[(identity, :DARRAY)]
57+
haskey(registry, id) && return registry[id]
5858

59-
d = new(identity, dims, pids, indexes, cuts, release)
59+
d = new(id, dims, pids, indexes, cuts, lp, release)
6060
if release
61-
push!(refs, identity)
62-
registry[(identity, :DARRAY)] = d
61+
push!(refs, id)
62+
registry[id] = d
6363

64-
# println("Installing finalizer for : ", d.identity, ", : ", object_id(d), ", isbits: ", isbits(d))
64+
# println("Installing finalizer for : ", d.id, ", : ", object_id(d), ", isbits: ", isbits(d))
6565
finalizer(d, close)
6666
end
6767
d
@@ -70,6 +70,9 @@ type DArray{T,N,A} <: AbstractArray{T,N}
7070
DArray() = new()
7171
end
7272

73+
eltype{T}(::Type{DArray{T}}) = T
74+
empty_localpart(T,N,A) = convert(A, Array(T, ntuple(zero, N)))
75+
7376
typealias SubDArray{T,N,D<:DArray} SubArray{T,N,D}
7477
typealias SubOrDArray{T,N} Union{DArray{T,N}, SubDArray{T,N}}
7578

@@ -79,49 +82,51 @@ localtype(A::AbstractArray) = typeof(A)
7982

8083
## core constructors ##
8184

82-
function DArray(identity, init, dims, pids, idxs, cuts)
85+
function DArray(id, init, dims, pids, idxs, cuts)
8386
r=Channel(1)
8487
@sync begin
8588
for i = 1:length(pids)
8689
@async begin
8790
local typA
8891
if isa(init, Function)
89-
typA=remotecall_fetch(construct_localparts, pids[i], init, identity, dims, pids, idxs, cuts)
92+
typA=remotecall_fetch(construct_localparts, pids[i], init, id, dims, pids, idxs, cuts)
9093
else
9194
# constructing from an array of remote refs.
92-
typA=remotecall_fetch(construct_localparts, pids[i], init[i], identity, dims, pids, idxs, cuts)
95+
typA=remotecall_fetch(construct_localparts, pids[i], init[i], id, dims, pids, idxs, cuts)
9396
end
9497
!isready(r) && put!(r, typA)
9598
end
9699
end
97100
end
98101

99-
typA = take!(r)
102+
A = take!(r)
100103
if myid() in pids
101-
d = registry[(identity, :DARRAY)]
104+
d = registry[id]
102105
else
103-
d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts)
106+
T = eltype(A)
107+
N = length(dims)
108+
d = DArray{T,N,A}(id, dims, pids, idxs, cuts, empty_localpart(T,N,A))
104109
end
105110
d
106111
end
107112

108-
function construct_localparts(init, identity, dims, pids, idxs, cuts)
109-
A = isa(init, Function) ? init(idxs[localpartindex(pids)]) : fetch(init)
110-
global registry
111-
registry[(identity, :LOCALPART)] = A
112-
typA = typeof(A)
113-
d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts)
114-
registry[(identity, :DARRAY)] = d
115-
typA
113+
function construct_localparts(init, id, dims, pids, idxs, cuts)
114+
localpart = isa(init, Function) ? init(idxs[localpartindex(pids)]) : fetch(init)
115+
A = typeof(localpart)
116+
T = eltype(A)
117+
N = length(dims)
118+
d = DArray{T,N,A}(id, dims, pids, idxs, cuts, localpart)
119+
registry[id] = d
120+
A
116121
end
117122

118123
function DArray(init, dims, procs, dist)
119124
np = prod(dist)
120125
procs = reshape(procs[1:np], ntuple(i->dist[i], length(dist)))
121126
idxs, cuts = chunk_idxs([dims...], dist)
122-
identity = next_did()
127+
id = next_did()
123128

124-
return DArray(identity, init, dims, procs, idxs, cuts)
129+
return DArray(id, init, dims, procs, idxs, cuts)
125130
end
126131

127132
function DArray(init, dims, procs)
@@ -140,13 +145,13 @@ DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(), maximum(dims
140145
# FIXME : Empty parts are currently not supported.
141146
function DArray(refs)
142147
dimdist = size(refs)
143-
identity = next_did()
148+
id = next_did()
144149

145150
npids = [r.where for r in refs]
146151
nsizes = Array(Tuple, dimdist)
147152
@sync for i in 1:length(refs)
148153
let i=i
149-
@async nsizes[i] = remotecall_fetch(rr_localpart, npids[i], refs[i], identity)
154+
@async nsizes[i] = remotecall_fetch(sz_localpart_ref, npids[i], refs[i], id)
150155
end
151156
end
152157

@@ -170,7 +175,7 @@ function DArray(refs)
170175
ncuts = Array{Int,1}[unshift!(sort(unique(lastidxs[x,:])), 1) for x in 1:length(dimdist)]
171176
ndims = tuple([sort(unique(lastidxs[x,:]))[end]-1 for x in 1:length(dimdist)]...)
172177

173-
DArray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts)
178+
DArray(id, refs, ndims, reshape(npids, dimdist), nindexes, ncuts)
174179
end
175180

176181
macro DArray(ex0::Expr)
@@ -195,57 +200,44 @@ end
195200
# new DArray similar to an existing one
196201
DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indexes, d.cuts)
197202

198-
function release_localpart(identity)
199-
global registry
200-
delete!(registry, (identity, :DARRAY))
201-
delete!(registry, (identity, :LOCALPART))
202-
nothing
203-
end
204-
release_localpart(d::DArray) = release_localpart(d.identity)
203+
release_localpart(id) = (delete!(registry, id); nothing)
204+
release_localpart(d::DArray) = release_localpart(d.id)
205205

206-
function close_by_identity(identity, pids)
207-
# @schedule println("Finalizer for : ", identity)
206+
function close_by_id(id, pids)
207+
# @schedule println("Finalizer for : ", id)
208208
global refs
209209
@sync begin
210210
for p in pids
211-
@async remotecall_fetch(release_localpart, p, identity)
211+
@async remotecall_fetch(release_localpart, p, id)
212212
end
213213
if !(myid() in pids)
214-
release_localpart(identity)
214+
release_localpart(id)
215215
end
216216
end
217-
delete!(refs, identity)
217+
delete!(refs, id)
218218
nothing
219219
end
220220

221221
function close(d::DArray)
222-
# @schedule println("close : ", d.identity, ", object_id : ", object_id(d), ", myid : ", myid() )
223-
if (myid() == d.identity[1]) && d.release
224-
@schedule close_by_identity(d.identity, d.pids)
222+
# @schedule println("close : ", d.id, ", object_id : ", object_id(d), ", myid : ", myid() )
223+
if (myid() == d.id[1]) && d.release
224+
@schedule close_by_id(d.id, d.pids)
225225
d.release = false
226226
end
227227
nothing
228228
end
229229

230230
function darray_closeall()
231-
global registry
232-
global refs
233231
crefs = copy(refs)
234-
for identity in crefs
235-
if identity[1] == myid() # sanity check
236-
haskey(registry, (identity, :DARRAY)) && close(registry[(identity, :DARRAY)])
232+
for id in crefs
233+
if id[1] == myid() # sanity check
234+
haskey(registry, id) && close(registry[id])
237235
yield()
238236
end
239237
end
240238
end
241239

242-
function rr_localpart(ref, identity)
243-
global registry
244-
lp = fetch(ref)
245-
registry[(identity, :LOCALPART)] = lp
246-
return size(lp)
247-
end
248-
240+
sz_localpart_ref(ref, id) = size(fetch(ref))
249241

250242
Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array(T, map(length,I)), dims, procs(d))
251243
Base.similar(d::DArray, T::Type) = similar(d, T, size(d))
@@ -335,11 +327,10 @@ Returns an empty array if no local part exists on the calling process.
335327
function localpart{T,N,A}(d::DArray{T,N,A})
336328
lpidx = localpartindex(d)
337329
if lpidx == 0
338-
return convert(A, Array(T, ntuple(zero, N)))::A
330+
return empty_localpart(T,N,A)::A
339331
end
340332

341-
global registry
342-
return registry[(d.identity, :LOCALPART)]::A
333+
return registry[d.id].localpart::A
343334
end
344335

345336
localpart(d::DArray, localidx...) = localpart(d)[localidx...]

src/serialize.jl

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
1-
function Base.serialize(S::AbstractSerializer, d::DArray)
1+
function Base.serialize{T,N,A}(S::AbstractSerializer, d::DArray{T,N,A})
22
# Only send the ident for participating workers - we expect the DArray to exist in the
3-
# remote registry
3+
# remote registry. DO NOT send the localpart.
44
destpid = Base.worker_id_from_socket(S.io)
55
Serializer.serialize_type(S, typeof(d))
6-
if (destpid in d.pids) || (destpid == d.identity[1])
7-
serialize(S, (true, d.identity)) # (identity_only, identity)
6+
if (destpid in d.pids) || (destpid == d.id[1])
7+
serialize(S, (true, d.id)) # (id_only, id)
88
else
9-
serialize(S, (false, d.identity))
9+
serialize(S, (false, d.id))
1010
for n in [:dims, :pids, :indexes, :cuts]
1111
serialize(S, getfield(d, n))
1212
end
13+
serialize(S, A)
1314
end
1415
end
1516

16-
function Base.deserialize{T<:DArray}(S::AbstractSerializer, t::Type{T})
17+
function Base.deserialize{DT<:DArray}(S::AbstractSerializer, t::Type{DT})
1718
what = deserialize(S)
18-
identity_only = what[1]
19-
identity = what[2]
19+
id_only = what[1]
20+
id = what[2]
2021

21-
if identity_only
22-
global registry
23-
if haskey(registry, (identity, :DARRAY))
24-
return registry[(identity, :DARRAY)]
22+
if id_only
23+
if haskey(registry, id)
24+
return registry[id]
2525
else
2626
# access to fields will throw an error, at least the deserialization process will not
2727
# result in worker death
28-
d = T()
29-
d.identity = identity
28+
d = DT()
29+
d.id = id
3030
return d
3131
end
3232
else
@@ -35,7 +35,10 @@ function Base.deserialize{T<:DArray}(S::AbstractSerializer, t::Type{T})
3535
pids = deserialize(S)
3636
indexes = deserialize(S)
3737
cuts = deserialize(S)
38-
return T(identity, dims, pids, indexes, cuts)
38+
A = deserialize(S)
39+
T=eltype(DT)
40+
N=length(dims)
41+
return DT(id, dims, pids, indexes, cuts, empty_localpart(T,N,A))
3942
end
4043
end
4144

0 commit comments

Comments
 (0)