Commit 86c6fc3

Merge pull request #630 from AkhilAkkapelli/darray-assign
Enhance DArray Distribution with Processor Assignment
2 parents cd8ca95 + 03ee835 commit 86c6fc3

File tree

8 files changed: +898 -90 lines changed

docs/src/darray.md

Lines changed: 248 additions & 0 deletions
@@ -211,6 +211,254 @@ across the workers in the Julia cluster in a relatively even distribution;
future operations on a `DArray` may produce a different distribution from the
one chosen by previous calls.

### Explicit Processor Mapping of DArray Blocks

This feature lets you control how `DArray` blocks (chunks) are assigned to specific processors within the cluster. Controlling data locality in this way is often crucial for the performance of distributed algorithms.

You can specify the mapping with the optional `assignment` argument to the `DArray` constructors (`DArray`, `DVector`, and `DMatrix`) and to the `distribute` function, or with the `assignment` keyword argument to constructor-like functions such as `rand`, `randn`, `sprand`, `ones`, and `zeros`.

The `assignment` argument accepts the following values:

* `:arbitrary` **(Default)**:

    * If `assignment` is not provided or is set to the symbol `:arbitrary`, Dagger's scheduler assigns blocks to processors automatically. This is the default behavior.

* `:blockrow`:

    * Divides the blocks row-wise (top to bottom as displayed). Each processor gets a contiguous range of row-blocks.

* `:blockcol`:

    * Divides the blocks column-wise (left to right as displayed). Each processor gets a contiguous range of column-blocks.

* `:cyclicrow`:

    * Assigns row-blocks to processors in a round-robin fashion, one row-block at a time. Useful for parallel row-wise tasks.

* `:cycliccol`:

    * Assigns column-blocks to processors in a round-robin fashion, one column-block at a time. Useful for parallel column-wise tasks.

* Any other symbol passed as `assignment` results in an error.

* `AbstractArray{<:Int, N}`:

    * Provide an integer **N**-dimensional array of worker IDs. The dimension **N** must match the number of dimensions of the `DArray`.
    * Dagger maps blocks to workers in a block-cyclic manner according to this worker-ID array: the block at index `(i, j, ...)` is assigned to the first CPU thread of the worker with ID `assignment[mod1(i, size(assignment, 1)), mod1(j, size(assignment, 2)), ...]`, so the pattern repeats block-cyclically across all dimensions (see the sketch after this list).

* `AbstractArray{<:Processor, N}`:

    * Provide an **N**-dimensional array of `Processor` objects. The dimension **N** must match the number of dimensions of the `DArray`.
    * Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the assignment array: the block at index `(i, j, ...)` is assigned to the processor at `assignment[mod1(i, size(assignment, 1)), mod1(j, size(assignment, 2)), ...]`, and the pattern repeats block-cyclically across all dimensions.

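To make the block-cyclic rule concrete, here is a minimal, self-contained sketch of the mapping arithmetic in plain Julia (no Dagger calls; the 4×6 block grid and the 2×2 worker-ID array mirror the integer-array example further below):

```julia
# Worker-ID array, as in the integer-array example below
assignment = [2 1; 4 3]

# A 7×11 array cut into Blocks(2, 2) yields a 4×6 grid of blocks
nblocks = (4, 6)

# Worker that owns each block, following assignment[mod1(i, 2), mod1(j, 2)]
owners = [assignment[mod1(i, size(assignment, 1)), mod1(j, size(assignment, 2))]
          for i in 1:nblocks[1], j in 1:nblocks[2]]
```

`owners` reproduces the worker IDs of the `ThreadProc` matrix shown in that example (rows alternating `2 1 2 1 2 1` and `4 3 4 3 4 3`).
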
#### Examples and Usage

The `assignment` argument works the same way for `DArray`, `DVector`, and `DMatrix`, as well as for the `distribute` function; the key difference lies in the dimensionality of the resulting distributed array. For functions like `rand`, `randn`, `sprand`, `ones`, and `zeros`, `assignment` is a keyword argument.

* `DArray`: For N-dimensional distributed arrays.

* `DVector`: Specifically for 1-dimensional distributed arrays.

* `DMatrix`: Specifically for 2-dimensional distributed arrays.

* `distribute`: General function to distribute arrays of any dimensionality.

* `rand`, `randn`, `sprand`, `ones`, `zeros`: Functions to create `DArray`s with initial values, also supporting `assignment`.

Here are some examples using a setup with one master process and three worker processes.

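For reference, such a setup might be created with Distributed.jl before loading Dagger; the worker count and `@everywhere` usage below are illustrative assumptions rather than part of the original documentation:

```julia
using Distributed
addprocs(3)               # three workers; together with the master this gives process IDs 1-4

@everywhere using Dagger  # make Dagger available on the master and on every worker
```
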
First, let's create some sample arrays for `distribute` (and constructor functions):

```julia
A = rand(7, 11)     # 2D array
v = ones(15)        # 1D array
M = zeros(5, 5, 5)  # 3D array
```

1. **Arbitrary Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :arbitrary)
# DMatrix(A, Blocks(2, 2), :arbitrary)

vd = distribute(v, Blocks(3), :arbitrary)
# DVector(v, Blocks(3), :arbitrary)

Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
# DArray(M, Blocks(2, 2, 2), :arbitrary)

Rd = rand(Blocks(2, 2), 7, 11; assignment=:arbitrary)
# distribute(rand(7, 11), Blocks(2, 2), :arbitrary)
```

This creates distributed arrays with the specified block sizes, and assigns the blocks to processors arbitrarily. For example, the assignment for `Ad` might look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(4, 1)
ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(3, 1)
```

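Since `:arbitrary` is the default, the `assignment` argument can simply be omitted; the following calls are equivalent to the ones above:

```julia
Ad = distribute(A, Blocks(2, 2))   # same as passing :arbitrary explicitly
Rd = rand(Blocks(2, 2), 7, 11)
```
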
2. **Structured Assignments:**

* **`:blockrow` Assignment:**

```julia
Ad = distribute(A, Blocks(1, 2), :blockrow)
# DMatrix(A, Blocks(1, 2), :blockrow)
vd = distribute(v, Blocks(3), :blockrow)
# DVector(v, Blocks(3), :blockrow)
Md = distribute(M, Blocks(2, 2, 2), :blockrow)
# DArray(M, Blocks(2, 2, 2), :blockrow)
Od = ones(Blocks(1, 2), 7, 11; assignment=:blockrow)
# distribute(ones(7, 11), Blocks(1, 2), :blockrow)
```

This creates distributed arrays with the specified block sizes, and assigns contiguous row-blocks to processors evenly. For example, the assignment for `Ad` (and `Od`) will look like this:

```julia
7×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1)
```

* **`:blockcol` Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :blockcol)
# DMatrix(A, Blocks(2, 2), :blockcol)
vd = distribute(v, Blocks(3), :blockcol)
# DVector(v, Blocks(3), :blockcol)
Md = distribute(M, Blocks(2, 2, 2), :blockcol)
# DArray(M, Blocks(2, 2, 2), :blockcol)
Rd = randn(Blocks(2, 2), 7, 11; assignment=:blockcol)
# distribute(randn(7, 11), Blocks(2, 2), :blockcol)
```

This creates distributed arrays with the specified block sizes, and assigns contiguous column-blocks to processors evenly. For example, the assignment for `Ad` (and `Rd`) will look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
```

* **`:cyclicrow` Assignment:**

```julia
Ad = distribute(A, Blocks(1, 2), :cyclicrow)
# DMatrix(A, Blocks(1, 2), :cyclicrow)
vd = distribute(v, Blocks(3), :cyclicrow)
# DVector(v, Blocks(3), :cyclicrow)
Md = distribute(M, Blocks(2, 2, 2), :cyclicrow)
# DArray(M, Blocks(2, 2, 2), :cyclicrow)
Zd = zeros(Blocks(1, 2), 7, 11; assignment=:cyclicrow)
# distribute(zeros(7, 11), Blocks(1, 2), :cyclicrow)
```

This creates distributed arrays with the specified block sizes, and assigns row-blocks to processors in round-robin fashion. For example, the assignment for `Ad` (and `Zd`) will look like this:

```julia
7×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
```

* **`:cycliccol` Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :cycliccol)
# DMatrix(A, Blocks(2, 2), :cycliccol)
vd = distribute(v, Blocks(3), :cycliccol)
# DVector(v, Blocks(3), :cycliccol)
Md = distribute(M, Blocks(2, 2, 2), :cycliccol)
# DArray(M, Blocks(2, 2, 2), :cycliccol)
Od = ones(Blocks(2, 2), 7, 11; assignment=:cycliccol)
# distribute(ones(7, 11), Blocks(2, 2), :cycliccol)
```

This creates distributed arrays with the specified block sizes, and assigns column-blocks to processors in round-robin fashion. For example, the assignment for `Ad` (and `Od`) will look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
```

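The difference between the block and cyclic modes comes down to the arithmetic used to pick an owner for each row (or column) block. The following plain-Julia sketch reproduces the two row-wise assignments shown above for 7 row-blocks over the 4 processes; it is an illustration of the pattern, not Dagger's internal code:

```julia
nblocks = 7   # row blocks of A with Blocks(1, 2)
nprocs  = 4   # process IDs 1-4

# :blockrow — contiguous chunks of ceil(nblocks / nprocs) row-blocks per process
blockrow_owner = [cld(i, cld(nblocks, nprocs)) for i in 1:nblocks]
# -> [1, 1, 2, 2, 3, 3, 4]

# :cyclicrow — round-robin over the processes
cyclicrow_owner = [mod1(i, nprocs) for i in 1:nblocks]
# -> [1, 2, 3, 4, 1, 2, 3]
```

The column-wise modes `:blockcol` and `:cycliccol` follow the same two rules along the column-block dimension.
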
3. **Block-Cyclic Assignment with Integer Array:**

```julia
assignment_2d = [2 1; 4 3]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), [2 1; 4 3])

assignment_1d = [2, 3, 1, 4]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), [2, 3, 1, 4])

assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))
Rd = sprand(Blocks(2, 2), 7, 11, 0.2; assignment=assignment_2d)
# distribute(sprand(7, 11, 0.2), Blocks(2, 2), assignment_2d)
```

The assignment is an integer matrix of worker IDs; the blocks are assigned in a block-cyclic manner to the first CPU thread of each worker. The assignment for `Ad` (and `Rd`) would be:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)
```

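Rather than hard-coding worker IDs, the assignment array can also be built from the running cluster. A small illustrative sketch (it assumes the four-process setup used throughout these examples; `procs()` comes from Distributed.jl):

```julia
using Distributed

ids = procs()                                    # e.g. [1, 2, 3, 4]
assignment_2d = permutedims(reshape(ids, 2, 2))  # [1 2; 3 4]

Ad = distribute(A, Blocks(2, 2), assignment_2d)
```
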
4. **Block-Cyclic Assignment with Processor Array:**

```julia
assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1);
                 Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), assignment_2d)

assignment_1d = [Dagger.ThreadProc(2, 1), Dagger.ThreadProc(3, 1), Dagger.ThreadProc(1, 1), Dagger.ThreadProc(4, 1)]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), assignment_1d)

assignment_3d = cat([Dagger.ThreadProc(1, 1) Dagger.ThreadProc(2, 1); Dagger.ThreadProc(3, 1) Dagger.ThreadProc(4, 1)],
                    [Dagger.ThreadProc(4, 1) Dagger.ThreadProc(3, 1); Dagger.ThreadProc(2, 1) Dagger.ThreadProc(1, 1)], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), assignment_3d)
Rd = rand(Blocks(2, 2), 7, 11; assignment=assignment_2d)
# distribute(rand(7, 11), Blocks(2, 2), assignment_2d)
```

The assignment is a matrix of `Processor` objects; the blocks are assigned in a block-cyclic manner to each processor. The assignment for `Ad` (and `Rd`) would be:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
```

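As in the integer case, the `Processor` array need not be written out by hand. A hedged sketch of deriving it from the running processes, assuming each process exposes its first CPU thread as `Dagger.ThreadProc(pid, 1)` (as in the examples above) and that exactly four processes are running:

```julia
using Distributed

pids = permutedims(reshape(procs(), 2, 2))                   # [1 2; 3 4]
assignment_2d = map(pid -> Dagger.ThreadProc(pid, 1), pids)  # 2×2 matrix of processors

Ad = distribute(A, Blocks(2, 2), assignment_2d)
```
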
## Broadcasting

As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's
