Skip to content

Commit cbc570b

Browse files
traktofonandreasnoack
authored andcommitted
WIP: make it work on julia-0.7
1 parent bc31263 commit cbc570b

File tree

9 files changed

+46
-60
lines changed

9 files changed

+46
-60
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ os:
33
- linux
44
- osx
55
julia:
6-
- 0.6
6+
- 0.7
77
- nightly
88
matrix:
99
# allow_failures:

src/DistributedArrays.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ using LinearAlgebra
88

99
import Base: +, -, *, div, mod, rem, &, |, xor
1010
import Base.Callable
11-
import LinearAlgebra.BLAS: axpy!
11+
import LinearAlgebra: axpy!, dot, norm,
1212

1313
import Primes
1414
import Primes: factor

src/darray.jl

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ mutable struct DArray{T,N,A} <: AbstractArray{T,N}
2828
pids::Array{Int,N} # pids[i]==p ⇒ processor p has piece i
2929
indices::Array{NTuple{N,UnitRange{Int}},N} # indices held by piece i
3030
cuts::Vector{Vector{Int}} # cuts[d][i] = first index of chunk i in dimension d
31-
localpart::Union{A,Missing}
32-
31+
localpart::Union{A,Nothing}
3332
release::Bool
3433

3534
function DArray{T,N,A}(id, dims, pids, indices, cuts, lp) where {T,N,A}
@@ -64,7 +63,7 @@ function d_from_weakref_or_d(id)
6463
end
6564

6665
Base.eltype(::Type{DArray{T}}) where {T} = T
67-
empty_localpart(T,N,A) = A(Array{T}(ntuple(zero, N)))
66+
empty_localpart(T,N,A) = A(Array{T}(undef, ntuple(zero, N)))
6867

6968
const SubDArray{T,N,D<:DArray} = SubArray{T,N,D}
7069
const SubOrDArray{T,N} = Union{DArray{T,N}, SubDArray{T,N}}
@@ -148,7 +147,7 @@ function ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Ve
148147
d = registry[id]
149148
d = isa(d, WeakRef) ? d.value : d
150149
else
151-
d = DArray{T,1,T}(id, (npids,), pids, idxs, cuts, missing)
150+
d = DArray{T,1,T}(id, (npids,), pids, idxs, cuts, nothing)
152151
end
153152
d
154153
end
@@ -196,10 +195,10 @@ function DArray(refs)
196195
end
197196
end
198197

199-
nindices = Array{NTuple{length(dimdist),UnitRange{Int}}}(dimdist...)
198+
nindices = Array{NTuple{length(dimdist),UnitRange{Int}}}(undef, dimdist...)
200199

201200
for i in 1:length(nindices)
202-
subidx = CartesianIndices(dimdist)[i]
201+
subidx = CartesianIndices(dimdist)[i] #ind2sub(dimdist, i)
203202
nindices[i] = ntuple(length(subidx)) do x
204203
idx_in_dim = subidx[x]
205204
startidx = 1
@@ -243,7 +242,7 @@ DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indices,
243242

244243
sz_localpart_ref(ref, id) = size(fetch(ref))
245244

246-
Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array{T}(map(length,I)), dims, procs(d))
245+
Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array{T}(undef, map(length,I)), dims, procs(d))
247246
Base.similar(d::DArray, T::Type) = similar(d, T, size(d))
248247
Base.similar(d::DArray{T}, dims::Dims) where {T} = similar(d, T, dims)
249248
Base.similar(d::DArray{T}) where {T} = similar(d, T, size(d))
@@ -288,7 +287,7 @@ function defaultdist(sz::Int, nc::Int)
288287
if sz >= nc
289288
return round.(Int, range(1, stop=sz+1, length=nc+1))
290289
else
291-
return [[1:(sz+1);]; zeros(Int, nc-sz);]
290+
return [[1:(sz+1);]; zeros(Int, nc-sz)]
292291
end
293292
end
294293

@@ -488,7 +487,7 @@ function (::Type{Array{S,N}})(s::SubDArray{T,N}) where {S,T,N}
488487
end
489488
end
490489
a = Array{S}(undef, size(s))
491-
a[[1:size(a,i) for i=1:N]...] = s
490+
a[[1:size(a,i) for i=1:N]...] .= s
492491
return a
493492
end
494493

@@ -509,7 +508,7 @@ function Base.reshape(A::DArray{T,1,S}, d::Dims) where {T,S<:Array}
509508
d1offs = first(I[1])
510509
nd = length(I)
511510

512-
B = Array{T}(sz)
511+
B = Array{T}(undef, sz)
513512
nr = size(B,1)
514513
sztail = size(B)[2:end]
515514

@@ -565,14 +564,15 @@ function Base.isassigned(D::DArray, i::Integer...)
565564
end
566565

567566

568-
function Base.copy!(dest::SubOrDArray, src::SubOrDArray)
567+
Base.copyto!(dest::SubOrDArray, src::SubOrDArray) = begin
569568
asyncmap(procs(dest)) do p
570569
remotecall_fetch(p) do
571570
localpart(dest)[:] = src[localindices(dest)...]
572571
end
573572
end
574573
return dest
575574
end
575+
Base.copy!(dest::SubOrDArray, src::SubOrDArray) = copyto!(dest, src)
576576

577577
# local copies are obtained by convert(Array, ) or assigning from
578578
# a SubDArray to a local Array.

src/linalg.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D
221221
if tA == 'T'
222222
return transpose(localpart(A))*convert(localtype(B), Bjk)
223223
elseif tA == 'C'
224-
return localpart(A)'*convert(localtype(B), Bjk)
224+
return ctranspose(localpart(A))*convert(localtype(B), Bjk)
225225
else
226226
return localpart(A)*convert(localtype(B), Bjk)
227227
end

src/mapreduce.jl

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
## higher-order functions ##
22

3+
import Base: +, -, div, mod, rem, &, |, xor
4+
35
Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...)
46

57
function Base.map!(f::F, dest::DArray, src::DArray) where {F}
@@ -12,22 +14,21 @@ function Base.map!(f::F, dest::DArray, src::DArray) where {F}
1214
return dest
1315
end
1416

15-
# Base.Broadcast._containertype(::Type{D}) where {D<:DArray} = DArray
16-
Base.BroadcastStyle(::Type{<:DArray}) = Broadcast.ArrayStyle{DArray}()
17+
#Base.Broadcast._containertype(::Type{D}) where {D<:DArray} = DArray
1718

18-
Base.Broadcast.promote_containertype(::Type{DArray}, ::Type{DArray}) = DArray
19-
Base.Broadcast.promote_containertype(::Type{DArray}, ::Type{Array}) = DArray
20-
Base.Broadcast.promote_containertype(::Type{DArray}, ct) = DArray
21-
Base.Broadcast.promote_containertype(::Type{Array}, ::Type{DArray}) = DArray
22-
Base.Broadcast.promote_containertype(ct, ::Type{DArray}) = DArray
19+
Base.BroadcastStyle(::Type{DArray}, ::Type{DArray}) = DArray
20+
Base.BroadcastStyle(::Type{DArray}, ::Type{Array}) = DArray
21+
Base.BroadcastStyle(::Type{DArray}, ct) = DArray
22+
#Base.Broadcast.promote_containertype(::Type{Array}, ::Type{DArray}) = DArray
23+
#Base.Broadcast.promote_containertype(ct, ::Type{DArray}) = DArray
2324

2425
Base.Broadcast.broadcast_indices(::Type{DArray}, A) = indices(A)
2526
Base.Broadcast.broadcast_indices(::Type{DArray}, A::Ref) = ()
2627

2728
# FixMe!
2829
## 1. Support for arbitrary indices including OneTo
2930
## 2. This is as type unstable as it can be. Overhead might not matter too much for DArrays though.
30-
function Base.Broadcast.broadcast_c(f, ::Type{DArray}, As...)
31+
function Base.broadcast(f, ::Type{DArray}, ::Nothing, ::Nothing, As...)
3132
T = Base.Broadcast._broadcast_eltype(f, As...)
3233
shape = Base.Broadcast.broadcast_indices(As...)
3334
iter = Base.CartesianIndices(shape)
@@ -89,7 +90,8 @@ function mapreducedim_within(f, op, A::DArray, region)
8990
gridsize = [size(A.indices)...]
9091
arraysize[[region...]] = gridsize[[region...]]
9192
indx = similar(A.indices)
92-
for i in CartesianIndices(size(indx))
93+
94+
for i in CartesianIndices(indx)
9395
indx[i] = ntuple(j -> j in region ? (i.I[j]:i.I[j]) : A.indices[i][j], ndims(A))
9496
end
9597
cuts = [i in region ? collect(1:arraysize[i] + 1) : A.cuts[i] for i in 1:ndims(A)]
@@ -157,24 +159,6 @@ function Base.extrema(d::DArray)
157159
return reduce((t,s) -> (min(t[1], s[1]), max(t[2], s[2])), r)
158160
end
159161

160-
# mapreduce like
161-
# for (fn, fr1, fr2) in ((:abs, :max),
162-
# (:minabs, :abs, :min),
163-
# (:sumabs, :abs, :+),
164-
# (:sumabs2, :abs2, :+))
165-
# @eval (Base.$fn)(d::DArray) = mapreduce($fr1, $fr2, d)
166-
# end
167-
168-
# semi mapreduce
169-
# for (fn, fr) in ((:any, :|),
170-
# (:all, :&),
171-
# (:count, :+))
172-
# @eval begin
173-
# (Base.$fn)(f::typeof(identity), d::DArray) = mapreduce(f, $fr, d)
174-
# (Base.$fn)(f::Callable, d::DArray) = mapreduce(f, $fr, d)
175-
# end
176-
# end
177-
178162
# Unary vector functions
179163
(-)(D::DArray) = map(-, D)
180164

@@ -361,5 +345,5 @@ function ppeval(f, D...; dim::NTuple = map(t -> isa(t, DArray) ? ndims(t) : 0, D
361345
# the DArray constructor.
362346
sd = [size(D[1].pids)...]
363347
nd = remotecall_fetch((r)->ndims(fetch(r)), refs[1].where, refs[1])
364-
DArray(reshape(refs, tuple([sd[1:nd - 1]; sd[end];]...)))
348+
DArray(reshape(refs, tuple([sd[1:nd - 1]; sd[end]]...)))
365349
end

src/serialize.jl

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
function Serialization.serialize(S::AbstractSerializer, d::DArray{T,N,A}) where {T,N,A}
22
# Only send the ident for participating workers - we expect the DArray to exist in the
33
# remote registry. DO NOT send the localpart.
4-
destpid = Distributed.worker_id_from_socket(S.io)
5-
Serializer.serialize_type(S, typeof(d))
4+
destpid = worker_id_from_socket(S.io)
5+
serialize_type(S, typeof(d))
66
if (destpid in d.pids) || (destpid == d.id[1])
77
serialize(S, (true, d.id)) # (id_only, id)
88
else
@@ -43,28 +43,28 @@ end
4343

4444
# Serialize only those parts of the object as required by the destination worker.
4545
mutable struct DestinationSerializer
46-
generate::Union{Function,Missing} # Function to generate the part to be serialized
47-
pids::Union{Array,Missing} # MUST have the same shape as the distribution
48-
49-
deser_obj::Union{Any,Missing} # Deserialized part
46+
generate::Union{Function,Nothing} # Function to generate the part to be serialized
47+
pids::Union{Array,Nothing} # MUST have the same shape as the distribution
48+
deser_obj::Any # Deserialized part
5049

5150
DestinationSerializer(f,p,d) = new(f,p,d)
5251
end
5352

54-
DestinationSerializer(f::Function, pids::Array) = DestinationSerializer(f, pids, missing)
53+
DestinationSerializer(f::Function, pids::Array) = DestinationSerializer(f, pids, nothing)
5554

5655
# contructs a DestinationSerializer after verifying that the shape of pids.
5756
function verified_destination_serializer(f::Function, pids::Array, verify_size)
5857
@assert size(pids) == verify_size
5958
return DestinationSerializer(f, pids)
6059
end
6160

62-
DestinationSerializer(deser_obj::Any) = DestinationSerializer(missing, missing, deser_obj)
61+
DestinationSerializer(deser_obj::Any) = DestinationSerializer(nothing, nothing, deser_obj)
6362

6463
function Serialization.serialize(S::AbstractSerializer, s::DestinationSerializer)
65-
pid = Distributed.worker_id_from_socket(S.io)
64+
pid = worker_id_from_socket(S.io)
6665
pididx = findfirst(isequal(pid), s.pids)
67-
Serialization.serialize_type(S, typeof(s))
66+
@assert pididx !== nothing
67+
serialize_type(S, typeof(s))
6868
serialize(S, s.generate(pididx))
6969
end
7070

@@ -75,9 +75,9 @@ end
7575

7676

7777
function localpart(s::DestinationSerializer)
78-
if !ismissing(s.deser_obj)
78+
if s.deser_obj !== nothing
7979
return s.deser_obj
80-
elseif !ismissing(s.generate) && (myid() in s.pids)
80+
elseif s.generate !== nothing && (myid() in s.pids)
8181
# Handle the special case where myid() is part of s.pids.
8282
# In this case serialize/deserialize is not called as the remotecall is executed locally
8383
return s.generate(findfirst(isequal(myid()), s.pids))

src/spmd.jl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@ using Distributed
44

55
import DistributedArrays: gather, next_did, close
66
export sendto, recvfrom, recvfrom_any, barrier, bcast, scatter, gather
7-
export context_local_storage, context, spmd, close
7+
export context_local_storage, context, spmd
88

99

1010
mutable struct WorkerDataChannel
1111
pid::Int
12-
rc::Union{RemoteChannel,Missing}
12+
rc::Union{RemoteChannel,Nothing}
1313
lock::ReentrantLock
1414

15-
WorkerDataChannel(pid) = new(pid, missing, ReentrantLock())
15+
WorkerDataChannel(pid) = new(pid, nothing, ReentrantLock())
1616
end
1717

1818
mutable struct SPMDContext
@@ -61,7 +61,7 @@ const map_ctxts = Dict{Tuple, SPMDContext}()
6161
function get_dc(wc::WorkerDataChannel)
6262
lock(wc.lock)
6363
try
64-
if ismissing(wc.rc)
64+
if wc.rc === nothing
6565
if wc.pid == myid()
6666
myrc = RemoteChannel(()->Channel(typemax(Int)))
6767
wc.rc = myrc
@@ -254,7 +254,7 @@ function delete_ctxt_id(ctxt_id)
254254
nothing
255255
end
256256

257-
function close(ctxt::SPMDContext)
257+
function Base.close(ctxt::SPMDContext)
258258
for p in ctxt.pids
259259
Base.remote_do(delete_ctxt_id, p, ctxt.id)
260260
end

test/darray.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ check_leaks()
118118
end
119119

120120
@testset "test invalid use of @DArray" begin
121-
@test_throws ArgumentError eval(:((@DArray [1,2,3,4])))
121+
#@test_throws ArgumentError eval(:((@DArray [1,2,3,4])))
122+
@test_throws LoadError eval(:((@DArray [1,2,3,4])))
122123
end
123124
end
124125

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ end
1111
@everywhere using DistributedArrays
1212
@everywhere using DistributedArrays.SPMD
1313
@everywhere using Random
14+
@everywhere using LinearAlgebra
1415

1516
@everywhere srand(1234 + myid())
1617

0 commit comments

Comments
 (0)