
Commit c5b8afc

added ddata and spmd modes (#112)

1 parent 96b872d commit c5b8afc

File tree

8 files changed, +1384 -683 lines changed

README.md

Lines changed: 138 additions & 1 deletion

@@ -68,9 +68,14 @@ equal the number of processes.

* `distribute(a::Array)` converts a local array to a distributed array.

-* `localpart(a::DArray)` obtains the locally-stored portion
+* `localpart(d::DArray)` obtains the locally-stored portion
  of a `DArray`.

* Localparts can also be retrieved and set via indexing syntax. Indexing with
  symbols is used for this, specifically the symbols `:L`, `:LP`, `:l` and `:lp`, which
  are all equivalent. For example, `d[:L]` returns the localpart of `d`,
  while `d[:L] = v` sets `v` as the localpart of `d` (see the sketch after this list).

* `localindexes(a::DArray)` gives a tuple of the index ranges owned by the
  local process.
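
As an illustration of the localpart indexing syntax above, here is a minimal sketch (the worker count, array size, and fill values are assumptions for illustration, not part of this commit):

```
using DistributedArrays
addprocs(4)
@everywhere importall DistributedArrays

d = dfill(1.0, (8, 8), workers()[1:4], [4, 1])

# fetch the localpart held on the first worker; inside that worker,
# d[:L] is equivalent to localpart(d)
remotecall_fetch(()->d[:L], workers()[1])

# set a new localpart from the owning worker; the replacement is assumed
# to match the size of the existing localpart (2x8 here)
remotecall_fetch(()->(d[:L] = fill(2.0, (2, 8)); nothing), workers()[1])
```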

@@ -238,3 +243,135 @@ a reference to a DArray object on the creating process for as long as it is bein

`darray_closeall()` is another useful function to manage distributed memory. It releases all darrays created from
the calling process, including any temporaries created during computation.

Working with distributed non-array data
---------------------------------------

The function `ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Vector=[])` can be used
to create a distributed vector whose localparts need not be Arrays.

It returns a `DArray{T,1,T}`, i.e., the element type and localtype of the array are the same.

`ddata()` constructs a distributed vector of length `nworkers()` where each localpart can hold any value,
initially initialized to `nothing`.

Argument `data`, if supplied, is distributed over the `pids`. `length(data)` must be a multiple of `length(pids)`.
If the multiple is 1, it returns a `DArray{T,1,T}` where `T` is `eltype(data)`. If the multiple is greater than 1,
it returns a `DArray{T,1,Array{T,1}}`, i.e., it is equivalent to calling `distribute(data)`.

`gather{T}(d::DArray{T,1,T})` returns an `Array{T,1}` consisting of all the distributed elements of `d`.

Given a `DArray{T,1,T}` object `d`, `d[:L]` returns the localpart on a worker, while `d[i]` returns the localpart
on the ith worker that `d` is distributed over.
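
For example, a minimal sketch of `ddata` under the definitions above (the worker count and data values are assumptions):

```
using DistributedArrays
addprocs(4)
@everywhere importall DistributedArrays

# a distributed vector of length nworkers(); each localpart starts as nothing
d = ddata()

# length(data) == length(pids), so each worker holds a single Int
d2 = ddata(data=[10, 20, 30, 40])

gather(d2)    # collects all distributed elements: [10, 20, 30, 40]
d2[2]         # the localpart held on the 2nd worker d2 is distributed over
```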

SPMD Mode (an MPI-style SPMD mode with MPI-like primitives)
-----------------------------------------------------------

We can easily run the same block of code on all workers in SPMD mode using the `spmd` function.

```
# define foo() on all workers
@everywhere function foo(arg1, arg2)
    ....
end

# call foo() everywhere using the `spmd` function
d_in = DArray(.....)
d_out = ddata()
spmd(foo, d_in, d_out; pids=workers()) # executes on all workers
```

`spmd` is defined as `spmd(f, args...; pids=procs(), context=nothing)`.

`args` is one or more arguments to be passed to `f`. `pids` identifies the workers
that `f` needs to be run on. `context` identifies a run context, which is explained
later.

The following primitives can be used in SPMD mode:

* `sendto(pid, data; tag=nothing)` - sends `data` to `pid`
* `recvfrom(pid; tag=nothing)` - receives data from `pid`
* `recvfrom_any(; tag=nothing)` - receives data from any `pid`
* `barrier(;pids=procs(), tag=nothing)` - all tasks wait for each other and then proceed
* `bcast(data, pid; tag=nothing, pids=procs())` - broadcasts the same data over `pids` from `pid`
* `scatter(x, pid; tag=nothing, pids=procs())` - distributes `x` over `pids` from `pid`
* `gather(x, pid; tag=nothing, pids=procs())` - collects data from `pids` onto worker `pid`

The `tag` keyword should be used to differentiate between consecutive calls of the same type, for example,
consecutive `bcast` calls.
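
To make tags concrete, here is a minimal sketch of a ring exchange inside an SPMD function (the function name and tag values are illustrative, not part of this commit):

```
using DistributedArrays
addprocs(4)
@everywhere importall DistributedArrays
@everywhere importall DistributedArrays.SPMD

@everywhere function ring_pair(d_out)
    pids = sort(vec(procs(d_out)))
    me = findfirst(pids, myid())
    right = pids[me == length(pids) ? 1 : me+1]
    left  = pids[me == 1 ? length(pids) : me-1]

    # two consecutive sends of the same type; distinct tags keep the
    # matching receives from mixing them up
    sendto(right, myid(); tag=1)
    sendto(right, 2*myid(); tag=2)

    x = recvfrom(left; tag=1)
    y = recvfrom(left; tag=2)
    barrier(;pids=pids)

    d_out[:L] = (x, y)
end

d_out = ddata()
spmd(ring_pair, d_out; pids=workers())
```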

`spmd` and SPMD-related functions are defined in the submodule `DistributedArrays.SPMD`. You will need to
import it explicitly, or prefix functions that can only be used in SPMD mode with `SPMD.`, for example,
`SPMD.sendto`.

NOTE: It is not a good idea to instantiate `DArray` objects within an SPMD function/block, as this will
result in `N` copies of the object. Similarly, calling `@everywhere` or `spmd` from within an SPMD
function/block will result in `N*N` parallel runs. In SPMD mode the function/block is executed concurrently
on all workers.

Example
-------

This toy example exchanges data with each of its neighbors `n` times.

```
using DistributedArrays
addprocs(8)
@everywhere importall DistributedArrays
@everywhere importall DistributedArrays.SPMD

d_in = d = DArray(I->fill(myid(), (map(length,I)...)), (nworkers(), 2), workers(), [nworkers(),1])
d_out = ddata()

# define the function everywhere
@everywhere function foo_spmd(d_in, d_out, n)
    pids = sort(vec(procs(d_in)))
    pididx = findfirst(pids, myid())
    mylp = d_in[:L]
    localsum = 0

    # have each worker exchange data with its neighbors
    n_pididx = pididx+1 > length(pids) ? 1 : pididx+1
    p_pididx = pididx-1 < 1 ? length(pids) : pididx-1

    for i in 1:n
        sendto(pids[n_pididx], mylp[2])
        sendto(pids[p_pididx], mylp[1])

        mylp[2] = recvfrom(pids[p_pididx])
        mylp[1] = recvfrom(pids[n_pididx])

        barrier(;pids=pids)
        localsum = localsum + mylp[1] + mylp[2]
    end

    # finally store the sum in d_out
    d_out[:L] = localsum
end

# run foo_spmd on all workers
spmd(foo_spmd, d_in, d_out, 10)

# print values of d_in and d_out after the run
println(d_in)
println(d_out)
```

SPMD Context
------------

Each SPMD run is implicitly executed in a different context. This allows multiple `spmd` calls to
be active at the same time. An SPMD context can be explicitly specified via the keyword arg `context` to `spmd`.

`context(pids=procs())` returns a new SPMD context.

An SPMD context also provides context-local storage, a dict, which can be used to store
key-value pairs between spmd runs under the same context.

`context_local_storage()` returns the dictionary associated with the context.

NOTE: Implicitly defined contexts, i.e., `spmd` calls without specifying a `context`, create a context
which lives only for the duration of the call. Explicitly created context objects can be released
early by calling `close(stxt::SPMDContext)`. This will release the local storage dictionaries
on all participating `pids`. Otherwise they will be released when the context object is gc'ed
on the node that created it.
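
A minimal sketch of explicit context use, assuming `context_local_storage()` is called from within the SPMD function and that `context` accepts the pids positionally (the counter function is illustrative, not part of this commit):

```
using DistributedArrays
addprocs(4)
@everywhere importall DistributedArrays
@everywhere importall DistributedArrays.SPMD

# count how many times this context has been run on each worker, using
# the context-local storage dict to persist state across spmd runs
@everywhere function count_runs()
    store = context_local_storage()
    store[:runs] = get(store, :runs, 0) + 1
end

ctxt = context(workers())
spmd(count_runs; pids=workers(), context=ctxt)
spmd(count_runs; pids=workers(), context=ctxt)  # same dict, :runs is now 2 everywhere

close(ctxt)   # release the local storage dictionaries on all participating pids
```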

src/DistributedArrays.jl

Lines changed: 10 additions & 1 deletion

@@ -9,15 +9,24 @@ importall Base
import Base.Callable
import Base.BLAS: axpy!

# DArray exports
export (.+), (.-), (.*), (./), (.%), (.<<), (.>>), div, mod, rem, (&), (|), ($)
export DArray, SubDArray, SubOrDArray, @DArray
export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindexes, ppeval, samedist
-export close, darray_closeall

# non-array distributed data
export ddata, gather

# immediate release of localparts
+export close, d_closeall

include("darray.jl")
include("core.jl")
include("serialize.jl")
include("mapreduce.jl")
include("linalg.jl")
include("sort.jl")

include("spmd.jl")

end # module

0 commit comments