Merged
248 changes: 248 additions & 0 deletions docs/src/darray.md
@@ -211,6 +211,254 @@ across the workers in the Julia cluster in a relatively even distribution;
future operations on a `DArray` may produce a different distribution from the
one chosen by previous calls.

### Explicit Processor Mapping of DArray Blocks

This feature lets you control which processor in the cluster each `DArray` block (chunk) is assigned to. Controlling data locality in this way is important for the performance of distributed algorithms.

You specify the mapping with the optional `assignment` argument. The `DArray` constructor functions (`DArray`, `DVector`, and `DMatrix`) and the `distribute` function accept it as an optional argument, and constructor-like functions such as `rand`, `randn`, `sprand`, `ones`, and `zeros` accept it as the optional `assignment` keyword argument.

The `assignment` argument accepts the following values:

* `:arbitrary` **(Default)**:

* If `assignment` is omitted or set to the symbol `:arbitrary`, Dagger's scheduler assigns blocks to processors automatically.

* `:blockrow`:

* Divides the matrix blocks row-wise (vertically in the terminal). Each processor gets a contiguous chunk of row blocks.

* `:blockcol`:

* Divides the matrix blocks column-wise (horizontally in the terminal). Each processor gets a contiguous chunk of column blocks.

* `:cyclicrow`:

* Assigns row-blocks to processors in a round-robin fashion. Blocks are distributed one row-block at a time. Useful for parallel row-wise tasks.

* `:cycliccol`:

* Assigns column-blocks to processors in a round-robin fashion. Blocks are distributed one column-block at a time. Useful for parallel column-wise tasks.

* Any other symbol used for `assignment` results in an error.

* `AbstractArray{<:Int, N}`:

* Provide an **N**-dimensional integer array of worker IDs. **N** must match the number of dimensions of the `DArray`.
* Dagger maps blocks to worker IDs in a block-cyclic manner according to this processor-array. The block at index `(i,j,...)` is assigned to the first CPU thread of the worker with ID `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.

* `AbstractArray{<:Processor, N}`:

* Provide an **N**-dimensional array of `Processor` objects. **N** must match the number of dimensions of the `DArray`.
* Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the assignment array. The block at index `(i,j,...)` is assigned to the processor at `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.
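
The block-cyclic rule used for array-valued assignments can be sketched in plain Julia, without Dagger. This is a minimal illustration of the `mod1` indexing described above, not Dagger's implementation; the helper name `block_owner` and the `nblocks` grid size are hypothetical.

```julia
# Hypothetical helper (not part of Dagger's API): look up the entry of
# `assignment` that owns the block at `blockidx`, wrapping each index
# with mod1 so the pattern repeats block-cyclically in every dimension.
function block_owner(assignment::AbstractArray{T,N}, blockidx::NTuple{N,Int}) where {T,N}
    wrapped = mod1.(blockidx, size(assignment))
    return assignment[wrapped...]
end

assignment = [2 1; 4 3]   # 2×2 array of worker IDs
nblocks = (4, 6)          # assumed block grid, e.g. a 7×11 matrix with 2×2 blocks

# Owner (worker ID) of every block in the grid.
owners = [block_owner(assignment, (i, j)) for i in 1:nblocks[1], j in 1:nblocks[2]]
```

Odd block rows repeat the first row of `assignment` and even block rows repeat the second, so the small assignment array tiles the whole block grid.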

#### Examples and Usage

The `assignment` argument works the same way for `DArray`, `DVector`, `DMatrix`, and the `distribute` function; these differ only in the dimensionality of the resulting distributed array. For functions like `rand`, `randn`, `sprand`, `ones`, and `zeros`, `assignment` is a keyword argument.

* `DArray`: For N-dimensional distributed arrays.

* `DVector`: Specifically for 1-dimensional distributed arrays.

* `DMatrix`: Specifically for 2-dimensional distributed arrays.

* `distribute`: General function to distribute arrays of any dimensionality.

* `rand`, `randn`, `sprand`, `ones`, `zeros`: Functions to create DArrays with initial values, also supporting `assignment`.

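Here are some examples using a setup with one master process and three worker processes (process IDs 1 through 4). In each example, the comment under a call shows an equivalent way to construct the same array.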

First, let's create some sample arrays for `distribute` (and constructor functions):

```julia
using Distributed; addprocs(3)  # one master plus three workers
@everywhere using Dagger

A = rand(7, 11)     # 2D array
v = ones(15)        # 1D array
M = zeros(5, 5, 5)  # 3D array
```
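
The shape of the processor-assignment matrices shown in the examples that follow is determined by the block grid: the array size divided by the block size, rounded up in each dimension. A minimal sketch of that arithmetic in plain Julia (the helper name `block_grid` is hypothetical, and block sizes are passed as plain tuples rather than `Blocks` objects):

```julia
# Hypothetical helper: number of blocks along each dimension when an array
# of size `dims` is split into blocks of size `blocksize`. The last block
# in a dimension may be smaller than the others.
block_grid(dims::NTuple{N,Int}, blocksize::NTuple{N,Int}) where {N} = cld.(dims, blocksize)

block_grid((7, 11), (2, 2))        # (4, 6)
block_grid((7, 11), (1, 2))        # (7, 6)
block_grid((15,), (3,))            # (5,)
block_grid((5, 5, 5), (2, 2, 2))   # (3, 3, 3)
```

This is why a 7×11 matrix partitioned with `Blocks(2, 2)` yields a 4×6 grid of blocks, while `Blocks(1, 2)` yields a 7×6 grid.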

1. **Arbitrary Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :arbitrary)
# DMatrix(A, Blocks(2, 2), :arbitrary)

vd = distribute(v, Blocks(3), :arbitrary)
# DVector(v, Blocks(3), :arbitrary)

Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
# DArray(M, Blocks(2,2,2), :arbitrary)

Rd = rand(Blocks(2, 2), 7, 11; assignment=:arbitrary)
# distribute(rand(7, 11), Blocks(2, 2), :arbitrary)
```

This creates distributed arrays with the specified block sizes, and assigns the blocks to processors arbitrarily. For example, the assignment for `Ad` might look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(4, 1)
ThreadProc(2, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(2, 1) ThreadProc(3, 1)
```

2. **Structured Assignments:**

* **`:blockrow` Assignment:**

```julia
Ad = distribute(A, Blocks(1, 2), :blockrow)
# DMatrix(A, Blocks(1, 2), :blockrow)
vd = distribute(v, Blocks(3), :blockrow)
# DVector(v, Blocks(3), :blockrow)
Md = distribute(M, Blocks(2, 2, 2), :blockrow)
# DArray(M, Blocks(2,2,2), :blockrow)
Od = ones(Blocks(1, 2), 7, 11; assignment=:blockrow)
# distribute(ones(7, 11), Blocks(1, 2), :blockrow)
```

This creates distributed arrays with the specified block sizes, and assigns contiguous row-blocks to processors evenly. For example, the assignment for `Ad` (and `Od`) will look like this:

```julia
7×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1)
```

* **`:blockcol` Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :blockcol)
# DMatrix(A, Blocks(2, 2), :blockcol)
vd = distribute(v, Blocks(3), :blockcol)
# DVector(v, Blocks(3), :blockcol)
Md = distribute(M, Blocks(2, 2, 2), :blockcol)
# DArray(M, Blocks(2,2,2), :blockcol)
Rd = randn(Blocks(2, 2), 7, 11; assignment=:blockcol)
# distribute(randn(7, 11), Blocks(2, 2), :blockcol)
```

This creates distributed arrays with the specified block sizes, and assigns contiguous column-blocks to processors evenly. For example, the assignment for `Ad` (and `Rd`) will look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1)
```

* **`:cyclicrow` Assignment:**

```julia
Ad = distribute(A, Blocks(1, 2), :cyclicrow)
# DMatrix(A, Blocks(1, 2), :cyclicrow)
vd = distribute(v, Blocks(3), :cyclicrow)
# DVector(v, Blocks(3), :cyclicrow)
Md = distribute(M, Blocks(2, 2, 2), :cyclicrow)
# DArray(M, Blocks(2,2,2), :cyclicrow)
Zd = zeros(Blocks(1, 2), 7, 11; assignment=:cyclicrow)
# distribute(zeros(7, 11), Blocks(1, 2), :cyclicrow)
```

This creates distributed arrays with the specified block sizes, and assigns row-blocks to processors in round-robin fashion. For example, the assignment for `Ad` (and `Zd`) will look like this:

```julia
7×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1) ThreadProc(4, 1)
ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1) ThreadProc(1, 1)
ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1) ThreadProc(2, 1)
ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1) ThreadProc(3, 1)
```

* **`:cycliccol` Assignment:**

```julia
Ad = distribute(A, Blocks(2, 2), :cycliccol)
# DMatrix(A, Blocks(2, 2), :cycliccol)
vd = distribute(v, Blocks(3), :cycliccol)
# DVector(v, Blocks(3), :cycliccol)
Md = distribute(M, Blocks(2, 2, 2), :cycliccol)
# DArray(M, Blocks(2,2,2), :cycliccol)
Od = ones(Blocks(2, 2), 7, 11; assignment=:cycliccol)
# distribute(ones(7, 11), Blocks(2, 2), :cycliccol)
```

This creates distributed arrays with the specified block sizes, and assigns column-blocks to processors in round-robin fashion. For example, the assignment for `Ad` (and `Od`) will look like this:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(1, 1) ThreadProc(2, 1)
```

3. **Block-Cyclic Assignment with Integer Array:**

```julia
assignment_2d = [2 1; 4 3]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), [2 1; 4 3])

assignment_1d = [2,3,1,4]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), [2,3,1,4])

assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))
Rd = sprand(Blocks(2, 2), 7, 11, 0.2; assignment=assignment_2d)
# distribute(sprand(7,11, 0.2), Blocks(2, 2), assignment_2d)
```

Here the assignment is an integer matrix of worker IDs, and the blocks are assigned block-cyclically to the first CPU thread of each worker. The assignment for `Ad` (and `Rd`) would be:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)
ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1) ThreadProc(2, 1) ThreadProc(1, 1)
ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1) ThreadProc(4, 1) ThreadProc(3, 1)
```

4. **Block-Cyclic Assignment with Processor Array:**

```julia
assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1);
Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)]
Ad = distribute(A, Blocks(2, 2), assignment_2d)
# DMatrix(A, Blocks(2, 2), assignment_2d)

assignment_1d = [Dagger.ThreadProc(2,1), Dagger.ThreadProc(3,1), Dagger.ThreadProc(1,1), Dagger.ThreadProc(4,1)]
vd = distribute(v, Blocks(3), assignment_1d)
# DVector(v, Blocks(3), assignment_1d)

assignment_3d = cat([Dagger.ThreadProc(1,1) Dagger.ThreadProc(2,1); Dagger.ThreadProc(3,1) Dagger.ThreadProc(4,1)],
[Dagger.ThreadProc(4,1) Dagger.ThreadProc(3,1); Dagger.ThreadProc(2,1) Dagger.ThreadProc(1,1)], dims=3)
Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
# DArray(M, Blocks(2, 2, 2), assignment_3d)
Rd = rand(Blocks(2, 2), 7, 11; assignment=assignment_2d)
# distribute(rand(7,11), Blocks(2, 2), assignment_2d)
```

Here the assignment is a matrix of `Processor` objects, and the blocks are assigned block-cyclically to those processors. The assignment for `Ad` (and `Rd`) would be:

```julia
4×6 Matrix{Dagger.ThreadProc}:
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1) ThreadProc(3, 2) ThreadProc(1, 1)
ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2) ThreadProc(4, 3) ThreadProc(2, 2)
```

## Broadcasting

As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's