Skip to content

Commit 2be6ac1

Browse files
committed
fixup! fixup! fixup! Enhance DArray Distribution with Processor Assignment
1 parent 3f4a60c commit 2be6ac1

File tree

3 files changed

+57
-47
lines changed

3 files changed

+57
-47
lines changed

docs/src/darray.md

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,6 @@ across the workers in the Julia cluster in a relatively even distribution;
211211
future operations on a `DArray` may produce a different distribution from the
212212
one chosen by previous calls.
213213

214-
<!-- -->
215-
216214
### Explicit Processor Mapping of DArray Blocks
217215

218216
This feature allows you to control how `DArray` blocks (chunks) are assigned to specific processors within the cluster. Controlling data locality is crucial for optimizing the performance of distributed algorithms.
@@ -227,31 +225,33 @@ The `assignment` argument accepts the following values:
227225

228226
* `:blockrow`:
229227

230-
* Divides the matrix blocks row-wise (vertically in the terminal). Each processor gets a contiguous chunk of row blocks.
228+
* Divides the matrix blocks row-wise (vertically in the terminal). Each processor gets a contiguous chunk of row blocks.
231229

232230
* `:blockcol`:
233231

234232
* Divides the matrix blocks column-wise (horizontally in the terminal). Each processor gets a contiguous chunk of column blocks.
235233

236234
* `:cyclicrow`:
235+
237236
* Assigns row-blocks to processors in a round-robin fashion. Blocks are distributed one row-block at a time. Useful for parallel row-wise tasks.
238237

239238
* `:cycliccol`:
239+
240240
* Assigns column-blocks to processors in a round-robin fashion. Blocks are distributed one column-block at a time. Useful for parallel column-wise tasks.
241241

242242
* Any other symbol used for `assignment` results in an error.
243243

244244
* `AbstractArray{<:Int, N}`:
245245

246246
* Provide an integer **N**-dimensional array of worker IDs. The dimension **N** must match the number of dimensions of the `DArray`.
247-
* Dagger maps blocks to worker IDs in a block-cyclic manner according to this processor-array. The block at index `(i,j,...)` is assigned to the first thread of the processor with ID `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.
247+
* Dagger maps blocks to worker IDs in a block-cyclic manner according to this processor-array. The block at index `(i,j,...)` is assigned to the first CPU thread of the worker with ID `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.
248248

249249
* `AbstractArray{<:Processor, N}`:
250250

251251
* Provide an **N**-dimensional array of `Processor` objects. The dimension **N** must match the number of dimensions of the `DArray` blocks.
252252
* Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the assignment array. The block at index `(i,j,...)` is assigned to the processor at `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.
253253

254-
#### Examples and Usage
254+
#### Examples and Usage
255255

256256
The `assignment` argument works similarly for `DArray`, `DVector`, and `DMatrix`, as well as the `distribute` function. The key difference lies in the dimensionality of the resulting distributed array. For functions like `rand`, `randn`, `sprand`, `ones`, and `zeros`, `assignment` is an keyword argument.
257257

@@ -261,11 +261,11 @@ The `assignment` argument works similarly for `DArray`, `DVector`, and `DMatrix`
261261

262262
* `DMatrix`: Specifically for 2-dimensional distributed arrays.
263263

264-
* `distribute`: General function to distribute arrays.
264+
* `distribute`: General function to distribute arrays of any dimensionality.
265265

266266
* `rand`, `randn`, `sprand`, `ones`, `zeros`: Functions to create DArrays with initial values, also supporting `assignment`.
267267

268-
Here are some examples using a setup with one master processor and three worker processors.
268+
Here are some examples using a setup with one master process and three worker processes.
269269

270270
First, let's create some sample arrays for `distribute` (and constructor functions):
271271

@@ -281,10 +281,10 @@ M = zeros(5, 5, 5) # 3D array
281281
Ad = distribute(A, Blocks(2, 2), :arbitrary)
282282
# DMatrix(A, Blocks(2, 2), :arbitrary)
283283

284-
vd = distribute(v, Blocks(3), :arbitrary)
284+
vd = distribute(v, Blocks(3), :arbitrary)
285285
# DVector(v, Blocks(3), :arbitrary)
286286

287-
Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
287+
Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
288288
# DArray(M, Blocks(2,2,2), :arbitrary)
289289

290290
Rd = rand(Blocks(2, 2), 7, 11; assignment=:arbitrary)
@@ -303,7 +303,7 @@ M = zeros(5, 5, 5) # 3D array
303303

304304
2. **Structured Assignments:**
305305

306-
* **`:blockrow` Assignment:**
306+
* **`:blockrow` Assignment:**
307307

308308
```julia
309309
Ad = distribute(A, Blocks(1, 2), :blockrow)
@@ -329,7 +329,7 @@ M = zeros(5, 5, 5) # 3D array
329329
ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1)
330330
```
331331

332-
* **`:blockcol` Assignment:**
332+
* **`:blockcol` Assignment:**
333333

334334
```julia
335335
Ad = distribute(A, Blocks(2, 2), :blockcol)
@@ -350,9 +350,9 @@ M = zeros(5, 5, 5) # 3D array
350350
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
351351
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
352352
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
353-
```
353+
```
354354

355-
* **`:cyclicrow` Assignment:**
355+
* **`:cyclicrow` Assignment:**
356356

357357
```julia
358358
Ad = distribute(A, Blocks(1, 2), :cyclicrow)
@@ -378,7 +378,7 @@ M = zeros(5, 5, 5) # 3D array
378378
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
379379
```
380380

381-
* **`:cycliccol` Assignment:**
381+
* **`:cycliccol` Assignment:**
382382

383383
```julia
384384
Ad = distribute(A, Blocks(2, 2), :cycliccol)
@@ -405,21 +405,21 @@ M = zeros(5, 5, 5) # 3D array
405405

406406
```julia
407407
assignment_2d = [2 1; 4 3]
408-
Ad = distribute(A, Blocks(2, 2), assignment_2d)
409-
# DMatrix(A, Blocks(2, 2), [3 1; 4 2])
410-
408+
Ad = distribute(A, Blocks(2, 2), assignment_2d)
409+
# DMatrix(A, Blocks(2, 2), [2 1; 4 3])
410+
411411
assignment_1d = [2,3,1,4]
412-
vd = distribute(v, Blocks(3), assignment_1d)
412+
vd = distribute(v, Blocks(3), assignment_1d)
413413
# DVector(v, Blocks(3), [2,3,1,4])
414-
414+
415415
assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
416416
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
417417
# DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))
418418
Rd = sprand(Blocks(2, 2), 7, 11, 0.2; assignment=assignment_2d)
419419
# distribute(sprand(7,11, 0.2), Blocks(2, 2), assignment_2d)
420420
```
421421

422-
The assignment is a integer matrix of `Processor` ID’s, the blocks are assigned in block-cyclic manner to first thread `Processor` ID’s. The assignment for `Ad` (and `Rd`) would be
422+
The assignment is an integer matrix of worker IDs, the blocks are assigned in block-cyclic manner to the first CPU thread of each worker. The assignment for `Ad` (and `Rd`) would be:
423423

424424
```julia
425425
4×6 Matrix{Dagger.ThreadProc}:
@@ -434,22 +434,22 @@ M = zeros(5, 5, 5) # 3D array
434434
```julia
435435
assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1);
436436
Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)]
437-
Ad = distribute(A, Blocks(2, 2), assignment_2d)
437+
Ad = distribute(A, Blocks(2, 2), assignment_2d)
438438
# DMatrix(A, Blocks(2, 2), assignment_2d)
439-
439+
440440
assignment_1d = [Dagger.ThreadProc(2,1), Dagger.ThreadProc(3,1), Dagger.ThreadProc(1,1), Dagger.ThreadProc(4,1)]
441-
vd = distribute(v, Blocks(3), assignment_1d)
441+
vd = distribute(v, Blocks(3), assignment_1d)
442442
# DVector(v, Blocks(3), assignment_1d)
443-
443+
444444
assignment_3d = cat([Dagger.ThreadProc(1,1) Dagger.ThreadProc(2,1); Dagger.ThreadProc(3,1) Dagger.ThreadProc(4,1)],
445445
[Dagger.ThreadProc(4,1) Dagger.ThreadProc(3,1); Dagger.ThreadProc(2,1) Dagger.ThreadProc(1,1)], dims=3)
446-
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
446+
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
447447
# DArray(M, Blocks(2, 2, 2), assignment_3d)
448448
Rd = rand(Blocks(2, 2), 7, 11; assignment=assignment_2d))
449449
# distribute(rand(7,11), Blocks(2, 2), assignment_2d)
450450
```
451451

452-
The assignment is a matrix of `Processor` objects, the blocks are assigned in block-cyclic manner to `Processor` objects. The assignment for `Ad` (and `Rd`) would be:
452+
The assignment is a matrix of `Processor` objects, the blocks are assigned in block-cyclic manner to each processor. The assignment for `Ad` (and `Rd`) would be:
453453

454454
```julia
455455
4×6 Matrix{Dagger.ThreadProc}:
@@ -459,8 +459,6 @@ M = zeros(5, 5, 5) # 3D array
459459
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
460460
```
461461

462-
<!-- -->
463-
464462
## Broadcasting
465463

466464
As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's

src/array/alloc.jl

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,37 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N}
1515
sizeA = map(length, d.indexes)
1616
procgrid = nothing
1717
availprocs = collect(Dagger.compatible_processors())
18-
sort!(availprocs, by = x -> (x.owner, x.tid))
18+
if !(assignment isa AbstractArray{<:Processor, N})
19+
filter!(p -> p isa ThreadProc, availprocs)
20+
sort!(availprocs, by = x -> (x.owner, x.tid))
21+
end
22+
np = length(availprocs)
1923
if assignment isa Symbol
2024
if assignment == :arbitrary
2125
procgrid = nothing
2226
elseif assignment == :blockrow
2327
q = ntuple(i -> i == 1 ? Int(ceil(sizeA[1] / p.blocksize[1])) : 1, N)
24-
rows_per_proc, extra = divrem(Int(ceil(sizeA[1] / p.blocksize[1])), num_processors())
25-
counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
28+
rows_per_proc, extra = divrem(Int(ceil(sizeA[1] / p.blocksize[1])), np)
29+
counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:np]
2630
procgrid = reshape(vcat(fill.(availprocs, counts)...), q)
2731
elseif assignment == :blockcol
2832
q = ntuple(i -> i == N ? Int(ceil(sizeA[N] / p.blocksize[N])) : 1, N)
29-
cols_per_proc, extra = divrem(Int(ceil(sizeA[N] / p.blocksize[N])), num_processors())
30-
counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
33+
cols_per_proc, extra = divrem(Int(ceil(sizeA[N] / p.blocksize[N])), np)
34+
counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:np]
3135
procgrid = reshape(vcat(fill.(availprocs, counts)...), q)
3236
elseif assignment == :cyclicrow
33-
q = ntuple(i -> i == 1 ? num_processors() : 1, N)
37+
q = ntuple(i -> i == 1 ? np : 1, N)
3438
procgrid = reshape(availprocs, q)
3539
elseif assignment == :cycliccol
36-
q = ntuple(i -> i == N ? num_processors() : 1, N)
40+
q = ntuple(i -> i == N ? np : 1, N)
3741
procgrid = reshape(availprocs, q)
3842
else
3943
error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol")
4044
end
4145
elseif assignment isa AbstractArray{<:Int, N}
4246
missingprocs = filter(q -> q procs(), assignment)
4347
isempty(missingprocs) || error("Specified workers are not available: $missingprocs")
44-
procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment]
48+
procgrid = [ThreadProc(proc, 1) for proc in assignment]
4549
elseif assignment isa AbstractArray{<:Processor, N}
4650
missingprocs = filter(q -> q availprocs, assignment)
4751
isempty(missingprocs) || error("Specified processors are not available: $missingprocs")
@@ -86,7 +90,11 @@ function stage(ctx, a::AllocateArray)
8690
else
8791
scope = ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)])
8892
end
89-
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...)
93+
if a.want_index
94+
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, i, args...)
95+
else
96+
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...)
97+
end
9098
end
9199
return DArray(a.eltype, a.domain, a.domainchunks, chunks, a.partitioning)
92100
end

src/array/darray.jl

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -513,33 +513,37 @@ distribute(A::AbstractArray, assignment::AssignmentType = :arbitrary) = distribu
513513
function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N}
514514
procgrid = nothing
515515
availprocs = collect(Dagger.compatible_processors())
516-
sort!(availprocs, by = x -> (x.owner, x.tid))
516+
if !(assignment isa AbstractArray{<:Processor, N})
517+
filter!(p -> p isa ThreadProc, availprocs)
518+
sort!(availprocs, by = x -> (x.owner, x.tid))
519+
end
520+
np = length(availprocs)
517521
if assignment isa Symbol
518522
if assignment == :arbitrary
519523
procgrid = nothing
520524
elseif assignment == :blockrow
521525
p = ntuple(i -> i == 1 ? Int(ceil(size(A,1) / dist.blocksize[1])) : 1, N)
522-
rows_per_proc, extra = divrem(Int(ceil(size(A,1) / dist.blocksize[1])), num_processors())
523-
counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
524-
procgrid = reshape(vcat(fill.(availprocs, counts)...), p)
526+
rows_per_proc, extra = divrem(Int(ceil(size(A,1) / dist.blocksize[1])), np)
527+
counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:np]
528+
procgrid = reshape(vcat(fill.(availprocs, counts)...), p)
525529
elseif assignment == :blockcol
526530
p = ntuple(i -> i == N ? Int(ceil(size(A,N) / dist.blocksize[N])) : 1, N)
527-
cols_per_proc, extra = divrem(Int(ceil(size(A,N) / dist.blocksize[N])), num_processors())
528-
counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()]
531+
cols_per_proc, extra = divrem(Int(ceil(size(A,N) / dist.blocksize[N])), np)
532+
counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:np]
529533
procgrid = reshape(vcat(fill.(availprocs, counts)...), p)
530534
elseif assignment == :cyclicrow
531-
p = ntuple(i -> i == 1 ? num_processors() : 1, N)
535+
p = ntuple(i -> i == 1 ? np : 1, N)
532536
procgrid = reshape(availprocs, p)
533537
elseif assignment == :cycliccol
534-
p = ntuple(i -> i == N ? num_processors() : 1, N)
538+
p = ntuple(i -> i == N ? np : 1, N)
535539
procgrid = reshape(availprocs, p)
536540
else
537541
error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol")
538542
end
539543
elseif assignment isa AbstractArray{<:Int, N}
540544
missingprocs = filter(p -> p procs(), assignment)
541545
isempty(missingprocs) || error("Specified workers are not available: $missingprocs")
542-
procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment]
546+
procgrid = [ThreadProc(proc, 1) for proc in assignment]
543547
elseif assignment isa AbstractArray{<:Processor, N}
544548
missingprocs = filter(p -> p availprocs, assignment)
545549
isempty(missingprocs) || error("Specified processors are not available: $missingprocs")
@@ -563,7 +567,7 @@ DArray(A::AbstractArray{T,N}, part::Blocks{N}, assignment::AssignmentType{N} = :
563567

564568
DVector(A::AbstractVector{T}, assignment::AssignmentType{1} = :arbitrary) where T = DVector(A, AutoBlocks(), assignment)
565569
DMatrix(A::AbstractMatrix{T}, assignment::AssignmentType{2} = :arbitrary) where T = DMatrix(A, AutoBlocks(), assignment)
566-
DArray(A::AbstractArray, assignment::AssignmentType = :arbitrary) = DArray(A, AutoBlocks(), assignment)
570+
DArray(A::AbstractArray, assignment::AssignmentType = :arbitrary) = DArray(A, AutoBlocks(), assignment)
567571

568572
DVector(A::AbstractVector{T}, ::AutoBlocks, assignment::AssignmentType{1} = :arbitrary) where T = DVector(A, auto_blocks(A), assignment)
569573
DMatrix(A::AbstractMatrix{T}, ::AutoBlocks, assignment::AssignmentType{2} = :arbitrary) where T = DMatrix(A, auto_blocks(A), assignment)

0 commit comments

Comments
 (0)