diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..e69de29bb diff --git a/docs/make.jl b/docs/make.jl index fffad5017..1e05647d8 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -20,6 +20,7 @@ makedocs(; "Parallel Nested Loops" => "use-cases/parallel-nested-loops.md", ], "Task Spawning" => "task-spawning.md", + "Task Affinity" => "task-affinity.md", "Data Management" => "data-management.md", "Distributed Arrays" => "darray.md", "Streaming Tasks" => "streaming.md", diff --git a/docs/src/darray.md b/docs/src/darray.md index 715a6cbe8..53cd15cff 100644 --- a/docs/src/darray.md +++ b/docs/src/darray.md @@ -211,6 +211,342 @@ across the workers in the Julia cluster in a relatively even distribution; future operations on a `DArray` may produce a different distribution from the one chosen by previous calls. +### DArray Chunk Slicing with `view` + +Dagger's `view` function allows you to efficiently create a "view" of a `DArray`'s `Chunk` or a `DTask` that produces a `DArray` `Chunk`. This enables operations on specific parts of your distributed data using standard Julia array slicing, without needing to materialize the entire chunk. + +```view(c::Chunk, slices...) & view(c::DTask, slices...)``` + +These methods create a `view` for a `DArray` `Chunk` object or for a `DTask` that will produce a `DArray` `Chunk`. You specify the desired sub-region using standard Julia array slicing syntax, identical to how you would slice a regular Array. + +#### Examples + +```julia +julia> A = rand(64, 64) +64×64 Matrix{Float64}: +[...] + +julia> DA = DArray(A, Blocks(8,8)) +64x64 DMatrix{Float64} with 8x8 partitions of size 8x8: +[...] + +julia> chunk = DA.chunks[1,1] +DTask (finished) + +julia> view(chunk, :, :) # View the entire 8x8 chunk +ChunkSlice{2}(Dagger.Chunk(...), (Colon(), Colon())) + +julia> view(chunk, 1:4, 1:4) # View the top-left 4x4 sub-region of the chunk +ChunkSlice{2}(Dagger.Chunk(...), (1:4, 1:4)) + +julia> view(chunk, 1, :) # View the first row of the chunk +ChunkSlice{2}(Dagger.Chunk(...), (1, Colon())) + +julia> view(chunk, :, 5) # View the fifth column of the chunk +ChunkSlice{2}(Dagger.Chunk(...), (Colon(), 5)) + +julia> view(chunk, 1:2:7, 2:2:8) # View with stepped ranges +ChunkSlice{2}(Dagger.Chunk(...), (1:2:7, 2:2:8)) +``` + +#### Example Usage: Parallel Row Summation of a DArray using `view` +This example demonstrates how to sum multiple rows of a `DArray` by using `view` to process individual rows within chunks to get Row Sum Vector. + +```julia +julia> A = DArray(rand(10,1000), Blocks(2,1000)) +10x1000 DMatrix{Float64} with 5x1 partitions of size 2x1000: +[...] 
+
+# Helper function to sum a single row and store it in a provided array view
+julia> @everywhere function sum_array_row!(row_sum::AbstractArray{Float64}, x::AbstractArray{Float64})
+           row_sum[1] = sum(x)
+       end
+
+# Number of rows
+julia> nrows = size(A,1)
+
+# Initialize a zero array to hold the final row sums
+julia> row_sums = zeros(nrows)
+
+# Spawn tasks to sum each row in parallel using views
+julia> Dagger.spawn_datadeps() do
+           sz = size(A.chunks,1)
+           nrows_per_chunk = nrows ÷ sz
+           for i in 1:sz
+               for j in 1:nrows_per_chunk
+                   Dagger.@spawn sum_array_row!(Out(view(row_sums, (nrows_per_chunk*(i-1)+j):(nrows_per_chunk*(i-1)+j))), In(Dagger.view(A.chunks[i,1], j:j, :)))
+               end
+           end
+       end
+
+# Print the result
+julia> println("Row sum Vector: ", row_sums)
+Row sum Vector: [499.8765, 500.1234, ..., 499.9876]
+```
+
+### Explicit Processor Mapping of DArray Blocks
+
+This feature allows you to control how `DArray` blocks (chunks) are assigned to specific processors within the cluster. Controlling data locality is crucial for optimizing the performance of distributed algorithms.
+
+You can specify the mapping using the optional `assignment` argument of the `DArray` constructors (`DArray`, `DVector`, and `DMatrix`) and the `distribute` function, or via the `assignment` keyword argument of constructor-like functions such as `rand`, `randn`, `sprand`, `ones`, and `zeros`.
+
+The `assignment` argument accepts the following values:
+
+* `:arbitrary` **(Default)**:
+
+    * If `assignment` is not provided or is set to the symbol `:arbitrary`, Dagger's scheduler assigns blocks to processors automatically. This is the default behavior.
+
+* `:blockrow`:
+
+    * Divides the blocks row-wise (along the first dimension). Each processor gets a contiguous run of row-blocks.
+
+* `:blockcol`:
+
+    * Divides the blocks column-wise (along the last dimension). Each processor gets a contiguous run of column-blocks.
+
+* `:cyclicrow`:
+
+    * Assigns row-blocks to processors in a round-robin fashion, one row-block at a time. Useful for parallel row-wise tasks.
+
+* `:cycliccol`:
+
+    * Assigns column-blocks to processors in a round-robin fashion, one column-block at a time. Useful for parallel column-wise tasks.
+
+* Any other symbol used for `assignment` results in an error.
+
+* `AbstractArray{<:Int, N}`:
+
+    * Provide an **N**-dimensional array of worker IDs. The dimension **N** must match the number of dimensions of the `DArray`.
+    * Dagger maps blocks to worker IDs in a block-cyclic manner according to this array. The block at index `(i,j,...)` is assigned to the first thread of the worker with ID `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.
+
+* `AbstractArray{<:Processor, N}`:
+
+    * Provide an **N**-dimensional array of `Processor` objects. The dimension **N** must match the number of dimensions of the `DArray`.
+    * Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the assignment array. The block at index `(i,j,...)` is assigned to the processor at `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions.
+
+#### Examples and Usage
+
+The `assignment` argument works similarly for `DArray`, `DVector`, and `DMatrix`, as well as the `distribute` function. The key difference lies in the dimensionality of the resulting distributed array. For functions like `rand`, `randn`, `sprand`, `ones`, and `zeros`, `assignment` is a keyword argument.
+
+* `DArray`: For N-dimensional distributed arrays.
+
+* `DVector`: Specifically for 1-dimensional distributed arrays.
+
+* `DMatrix`: Specifically for 2-dimensional distributed arrays.
+
+* `distribute`: General function to distribute arrays.
+
+* `rand`, `randn`, `sprand`, `ones`, `zeros`: Functions to create DArrays with initial values, also supporting `assignment`.
+
+Here are some examples using a setup with one master process and three worker processes.
+
+First, let's create some sample arrays for `distribute` (and the constructor functions):
+
+```julia
+A = rand(7, 11)    # 2D array
+v = ones(15)       # 1D array
+M = zeros(5, 5, 5) # 3D array
+```
+
+1. **Arbitrary Assignment:**
+
+    ```julia
+    Ad = distribute(A, Blocks(2, 2), :arbitrary)
+    # DMatrix(A, Blocks(2, 2), :arbitrary)
+
+    vd = distribute(v, Blocks(3), :arbitrary)
+    # DVector(v, Blocks(3), :arbitrary)
+
+    Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
+    # DArray(M, Blocks(2,2,2), :arbitrary)
+
+    Rd = rand(Blocks(2, 2), 7, 11; assignment=:arbitrary)
+    # distribute(rand(7, 11), Blocks(2, 2), :arbitrary)
+    ```
+
+    This creates distributed arrays with the specified block sizes, and assigns the blocks to processors arbitrarily. For example, the assignment for `Ad` might look like this:
+
+    ```julia
+    4×6 Matrix{Dagger.ThreadProc}:
+     ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(2, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+     ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+     ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)
+     ThreadProc(2, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)
+    ```
+
+2. **Structured Assignments:**
+
+    * **`:blockrow` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(1, 2), :blockrow)
+      # DMatrix(A, Blocks(1, 2), :blockrow)
+
+      vd = distribute(v, Blocks(3), :blockrow)
+      # DVector(v, Blocks(3), :blockrow)
+
+      Md = distribute(M, Blocks(2, 2, 2), :blockrow)
+      # DArray(M, Blocks(2,2,2), :blockrow)
+
+      Od = ones(Blocks(1, 2), 7, 11; assignment=:blockrow)
+      # distribute(ones(7, 11), Blocks(1, 2), :blockrow)
+      ```
+
+      This creates distributed arrays with the specified block sizes, and assigns contiguous row-blocks to processors evenly. For example, the assignment for `Ad` (and `Od`) will look like this:
+
+      ```julia
+      7×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+       ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)
+      ```
+
+    * **`:blockcol` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(2, 2), :blockcol)
+      # DMatrix(A, Blocks(2, 2), :blockcol)
+
+      vd = distribute(v, Blocks(3), :blockcol)
+      # DVector(v, Blocks(3), :blockcol)
+
+      Md = distribute(M, Blocks(2, 2, 2), :blockcol)
+      # DArray(M, Blocks(2,2,2), :blockcol)
+
+      Rd = randn(Blocks(2, 2), 7, 11; assignment=:blockcol)
+      # distribute(randn(7, 11), Blocks(2, 2), :blockcol)
+      ```
+
+      This creates distributed arrays with the specified block sizes, and assigns contiguous column-blocks to processors evenly. For example, the assignment for `Ad` (and `Rd`) will look like this:
+
+      ```julia
+      4×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+      ```
+
+    * **`:cyclicrow` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(1, 2), :cyclicrow)
+      # DMatrix(A, Blocks(1, 2), :cyclicrow)
+
+      vd = distribute(v, Blocks(3), :cyclicrow)
+      # DVector(v, Blocks(3), :cyclicrow)
+
+      Md = distribute(M, Blocks(2, 2, 2), :cyclicrow)
+      # DArray(M, Blocks(2,2,2), :cyclicrow)
+
+      Zd = zeros(Blocks(1, 2), 7, 11; assignment=:cyclicrow)
+      # distribute(zeros(7, 11), Blocks(1, 2), :cyclicrow)
+      ```
+
+      This creates distributed arrays with the specified block sizes, and assigns row-blocks to processors in round-robin fashion. For example, the assignment for `Ad` (and `Zd`) will look like this:
+
+      ```julia
+      7×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+       ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+      ```
+
+    * **`:cycliccol` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(2, 2), :cycliccol)
+      # DMatrix(A, Blocks(2, 2), :cycliccol)
+
+      vd = distribute(v, Blocks(3), :cycliccol)
+      # DVector(v, Blocks(3), :cycliccol)
+
+      Md = distribute(M, Blocks(2, 2, 2), :cycliccol)
+      # DArray(M, Blocks(2,2,2), :cycliccol)
+
+      Od = ones(Blocks(2, 2), 7, 11; assignment=:cycliccol)
+      # distribute(ones(7, 11), Blocks(2, 2), :cycliccol)
+      ```
+
+      This creates distributed arrays with the specified block sizes, and assigns column-blocks to processors in round-robin fashion. For example, the assignment for `Ad` (and `Od`) will look like this:
+
+      ```julia
+      4×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+      ```
+
+3. **Block-Cyclic Assignment with Integer Array:**
+
+    ```julia
+    assignment_2d = [2 1; 4 3]
+    Ad = distribute(A, Blocks(2, 2), assignment_2d)
+    # DMatrix(A, Blocks(2, 2), [2 1; 4 3])
+
+    assignment_1d = [2,3,1,4]
+    vd = distribute(v, Blocks(3), assignment_1d)
+    # DVector(v, Blocks(3), [2,3,1,4])
+
+    assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
+    Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
+    # DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))
+
+    Rd = sprand(Blocks(2, 2), 7, 11, 0.2; assignment=assignment_2d)
+    # distribute(sprand(7, 11, 0.2), Blocks(2, 2), assignment_2d)
+    ```
+
+    The assignment is an integer array of worker IDs; blocks are assigned in a block-cyclic manner to the first thread of each listed worker. The assignment for `Ad` (and `Rd`) would be:
+
+    ```julia
+    4×6 Matrix{Dagger.ThreadProc}:
+     ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)
+     ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+     ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)
+     ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+    ```
+
+4. **Block-Cyclic Assignment with Processor Array:**
+
+    ```julia
+    assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1);
+                     Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)]
+    Ad = distribute(A, Blocks(2, 2), assignment_2d)
+    # DMatrix(A, Blocks(2, 2), assignment_2d)
+
+    assignment_1d = [Dagger.ThreadProc(2,1), Dagger.ThreadProc(3,1), Dagger.ThreadProc(1,1), Dagger.ThreadProc(4,1)]
+    vd = distribute(v, Blocks(3), assignment_1d)
+    # DVector(v, Blocks(3), assignment_1d)
+
+    assignment_3d = cat([Dagger.ThreadProc(1,1) Dagger.ThreadProc(2,1); Dagger.ThreadProc(3,1) Dagger.ThreadProc(4,1)],
+                        [Dagger.ThreadProc(4,1) Dagger.ThreadProc(3,1); Dagger.ThreadProc(2,1) Dagger.ThreadProc(1,1)], dims=3)
+    Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
+    # DArray(M, Blocks(2, 2, 2), assignment_3d)
+
+    Rd = rand(Blocks(2, 2), 7, 11; assignment=assignment_2d)
+    # distribute(rand(7, 11), Blocks(2, 2), assignment_2d)
+    ```
+
+    The assignment is an array of `Processor` objects; blocks are assigned to these processors in a block-cyclic manner. The assignment for `Ad` (and `Rd`) would be:
+
+    ```julia
+    4×6 Matrix{Dagger.ThreadProc}:
+     ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)
+     ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)
+     ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)
+     ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)
+    ```
+
+
 ## Broadcasting
 
 As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's
@@ -446,4 +782,4 @@ From `LinearAlgebra`:
 - `*` (Out-of-place Matrix-(Matrix/Vector) multiply)
 - `mul!` (In-place Matrix-Matrix multiply)
 - `cholesky`/`cholesky!` (In-place/Out-of-place Cholesky factorization)
-- `lu`/`lu!` (In-place/Out-of-place LU factorization (`NoPivot` only))
+- `lu`/`lu!` (In-place/Out-of-place LU factorization (`NoPivot` only))
\ No newline at end of file
diff --git a/docs/src/task-affinity.md b/docs/src/task-affinity.md
new file mode 100644
index 000000000..9ee69c154
--- /dev/null
+++ b/docs/src/task-affinity.md
@@ -0,0 +1,127 @@
+# Task Affinity
+
+Dagger.jl's `@spawn` macro allows precise control over task execution and result accessibility using `scope`, `compute_scope`, and `result_scope`, which control where a task executes and where its result can be accessed.
+
+For more information on how these scopes work, see [Scopes](scopes.md#Scopes).
+
+---
+
+## Key Terms
+
+### Scope
+`scope` defines the general set of locations where a Dagger task can execute. If `scope` is not explicitly set, the task runs within the `compute_scope`. If both `scope` and `compute_scope` are unspecified, the task falls back to `DefaultScope()`, allowing it to run wherever execution is possible. Execution occurs on any worker within the defined scope.
+
+**Example:**
+```julia
+g = Dagger.@spawn scope=Dagger.scope(worker=3) f(x,y)
+```
+Task `g` executes only on worker 3. Its result can be accessed by any worker.
+
+---
+
+### Compute Scope
+Like `scope`, `compute_scope` also specifies where a Dagger task can execute. The key difference is that if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If neither is specified, they default to `DefaultScope()`.
+
+**Example:**
+```julia
+g1 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
+g2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
+```
+Tasks `g1` and `g2` execute on either thread 2 of worker 1 or thread 1 of worker 3. The `scope` argument to `g1` is ignored. Their results can be accessed by any worker.
+
+---
+
+### Result Scope
+
+The `result_scope` limits the workers from which a task's result can be accessed. This is crucial for managing data locality and minimizing transfers. If `result_scope` is not specified, it defaults to `AnyScope()`, meaning the result can be accessed by any worker.
+
+**Example:**
+```julia
+g = Dagger.@spawn result_scope=Dagger.scope(worker=3, threads=[1, 3, 4]) f(x,y)
+```
+The result of `g` is accessible only from threads 1, 3, and 4 of worker 3. Since no compute scope is given, the task's execution is also restricted to those threads.
+
+---
+
+## Interaction of `compute_scope` and `result_scope`
+
+When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of the effective compute scope (which is `compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty, the scheduler throws a `Dagger.Sch.SchedulingException` error.
+
+**Example:**
+```julia
+g = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) f(x,y)
+```
+The task `g` computes on thread 2 of worker 2 (the intersection of the compute and result scopes), and its result access is also restricted to thread 2 of worker 2.
+
+---
+
+## Chunk Inputs to Tasks
+
+This section explains how `scope`, `compute_scope`, and `result_scope` affect tasks when a `Chunk` is the primary input to `@spawn` (e.g. created via `Dagger.tochunk(...)` or by calling `fetch(task; raw=true)` on a task).
+
+Assume `g` is some function, e.g. `g(x, y) = x * 2 + y * 3`, `chunk_proc` is the chunk's processor, and `chunk_scope` is its defined accessibility.
+
+When `Dagger.tochunk(...)` is directly spawned:
+- The task executes on `chunk_proc`.
+- The result is accessible only within `chunk_scope`.
+- This behavior occurs irrespective of the `scope`, `compute_scope`, and `result_scope` values provided to the `@spawn` macro.
+- Dagger validates that there is an intersection between the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`) and the `result_scope`. If no intersection exists, the scheduler throws an exception.
+
+!!! info
+    While `chunk_proc` is currently required when constructing a chunk, it is largely unused in actual scheduling logic. It exists primarily for backward compatibility and may be deprecated in the future.
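+
+For the usage examples below, assume `chunk_proc` and `chunk_scope` were created roughly as follows (an illustrative sketch; the particular processor and scope chosen here are assumptions, not requirements):
+
+```julia
+g(x, y) = x * 2 + y * 3                  # the example function from above
+chunk_proc = Dagger.ThreadProc(2, 1)     # assumed processor: thread 1 of worker 2
+chunk_scope = Dagger.scope(worker=2)     # assumed accessibility: worker 2 only
+```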
+ +**Usage:** +```julia +h1 = Dagger.@spawn scope=Dagger.scope(worker=3) Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope) +h2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope) +h3 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope) +h4 = Dagger.@spawn result_scope=Dagger.scope(worker=3) Dagger.tochunk(g(40, 41), chunk_proc, chunk_scope) +h5 = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.scope(worker=2,threads=[2,3]) Dagger.tochunk(g(50, 51), chunk_proc, chunk_scope) +``` +In all these cases (`h1` through `h5`), the tasks get executed on processor `chunk_proc` of chunk, and its result is accessible only within `chunk_scope`. + +--- + +## Function with Chunk Arguments as Tasks + +This section details behavior when `scope`, `compute_scope`, and `result_scope` are used with tasks where a function is the input, and its arguments include `Chunk`s. + +Assume `g(x, y) = x * 2 + y * 3` is a function, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)` is a chunk argument, where `arg_proc` is the chunk's processor and `arg_scope` is its defined scope. + +### Scope +If `arg_scope` and `scope` do not intersect, the scheduler throws an exception. Execution occurs on the intersection of `scope` and `arg_scope`. + +```julia +h = Dagger.@spawn scope=Dagger.scope(worker=3) g(arg, 11) +``` +Task `h` executes on any worker within the intersection of `scope` and `arg_scope`. The result is accessible from any worker. + +--- + +### Compute scope and Chunk argument scopes interaction +If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`. `result_scope` defaults to `AnyScope()`. + +```julia +h1 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g(arg, 11) +h2 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g(arg, 21) +``` +Tasks `h1` and `h2` execute on any worker within the intersection of the `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is stored and accessible from anywhere. + +--- + +### Result scope and Chunk argument scopes interaction +If only `result_scope` is specified, computation happens on any worker within `arg_scope`, and the result is only accessible from `result_scope`. + +```julia +h = Dagger.@spawn result_scope=Dagger.scope(worker=3) g(arg, 11) +``` +Task `h` executes on any worker within `arg_scope`. The result is accessible from `result_scope`. + +--- + +### Compute, result, and chunk argument scopes interaction +When `scope`, `compute_scope`, and `result_scope` are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception. 
+ +```julia +h = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.ProcessScope(2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) g(arg, 31) +``` +Task `h` computes on thread 2 of worker 2 (as it's the intersection of `arg`, `compute`, and `result` scopes), and its result access is also restricted to thread 2 of worker 2. diff --git a/src/Dagger.jl b/src/Dagger.jl index 0c3761c44..68fd0d89c 100644 --- a/src/Dagger.jl +++ b/src/Dagger.jl @@ -7,7 +7,7 @@ import SparseArrays: sprand, SparseMatrixCSC import MemPool import MemPool: DRef, FileRef, poolget, poolset -import Base: collect, reduce +import Base: collect, reduce, view import LinearAlgebra import LinearAlgebra: Adjoint, BLAS, Diagonal, Bidiagonal, Tridiagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, UnitLowerTriangular, UnitUpperTriangular, diagind, ishermitian, issymmetric diff --git a/src/array/alloc.jl b/src/array/alloc.jl index f67c927de..3f058df33 100644 --- a/src/array/alloc.jl +++ b/src/array/alloc.jl @@ -8,10 +8,51 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N} want_index::Bool domain::ArrayDomain{N} domainchunks - partitioning::AbstractBlocks + partitioning::AbstractBlocks{N} + procgrid::Union{AbstractArray{<:Processor, N}, Nothing} end size(a::AllocateArray) = size(a.domain) +function AllocateArray(assignment::AssignmentType{N}, eltype::Type{T}, f, want_index::Bool, d::ArrayDomain{N}, domainchunks, p::AbstractBlocks{N}) where {T,N} + sizeA = map(length, d.indexes) + procgrid = nothing + availprocs = collect(Dagger.all_processors()) + sort!(availprocs, by = x -> (x.owner, x.tid)) + if assignment isa Symbol + if assignment == :arbitrary + procgrid = nothing + elseif assignment == :blockrow + q = ntuple(i -> i == 1 ? Int(ceil(sizeA[1] / p.blocksize[1])) : 1, N) + rows_per_proc, extra = divrem(Int(ceil(sizeA[1] / p.blocksize[1])), num_processors()) + counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), q) + elseif assignment == :blockcol + q = ntuple(i -> i == N ? Int(ceil(sizeA[N] / p.blocksize[N])) : 1, N) + cols_per_proc, extra = divrem(Int(ceil(sizeA[N] / p.blocksize[N])), num_processors()) + counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), q) + elseif assignment == :cyclicrow + q = ntuple(i -> i == 1 ? num_processors() : 1, N) + procgrid = reshape(availprocs, q) + elseif assignment == :cycliccol + q = ntuple(i -> i == N ? 
num_processors() : 1, N) + procgrid = reshape(availprocs, q) + else + error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol") + end + elseif assignment isa AbstractArray{<:Int, N} + missingprocs = filter(q -> q ∉ procs(), assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment] + elseif assignment isa AbstractArray{<:Processor, N} + missingprocs = filter(q -> q ∉ availprocs, assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = assignment + end + + return AllocateArray{T,N}(eltype, f, want_index, d, domainchunks, p, procgrid) +end + function _cumlength(len, step) nice_pieces = div(len, step) extra = rem(len, step) @@ -34,79 +75,86 @@ function allocate_array(f, T, sz) end allocate_array_func(::Processor, f) = f function stage(ctx, a::AllocateArray) - if a.want_index - thunks = [Dagger.@spawn allocate_array(a.f, a.eltype, i, size(x)) for (i, x) in enumerate(a.domainchunks)] - else - thunks = [Dagger.@spawn allocate_array(a.f, a.eltype, size(x)) for (i, x) in enumerate(a.domainchunks)] + chunks = map(CartesianIndices(a.domainchunks)) do I + x = a.domainchunks[I] + i = LinearIndices(a.domainchunks)[I] + args = a.want_index ? (i, size(x)) : (size(x),) + scope = isnothing(a.procgrid) ? nothing : ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)]) + + if isnothing(scope) + Dagger.@spawn allocate_array(a.f, a.eltype, args...) + else + Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...) + end end - return DArray(a.eltype, a.domain, a.domainchunks, thunks, a.partitioning) + return DArray(a.eltype, a.domain, a.domainchunks, chunks, a.partitioning) end const BlocksOrAuto = Union{Blocks{N} where N, AutoBlocks} -function Base.rand(p::Blocks, eltype::Type, dims::Dims) +function Base.rand(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, rand, false, d, partition(p, d), p) + a = AllocateArray(assignment, eltype, rand, false, d, partition(p, d), p) return _to_darray(a) end -Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...) = rand(p, T, dims) -Base.rand(p::BlocksOrAuto, T::Type, dims::Dims) = rand(p, T, dims) -Base.rand(p::BlocksOrAuto, dims::Integer...) 
= rand(p, Float64, dims) -Base.rand(p::BlocksOrAuto, dims::Dims) = rand(p, Float64, dims) -Base.rand(::AutoBlocks, eltype::Type, dims::Dims) = - rand(auto_blocks(dims), eltype, dims) +Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment) +Base.rand(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment) +Base.rand(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment) +Base.rand(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment) +Base.rand(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + rand(auto_blocks(dims), eltype, dims; assignment=assignment) -function Base.randn(p::Blocks, eltype::Type, dims::Dims) +function Base.randn(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, randn, false, d, partition(p, d), p) + a = AllocateArray(assignment, eltype, randn, false, d, partition(p, d), p) return _to_darray(a) end -Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...) = randn(p, T, dims) -Base.randn(p::BlocksOrAuto, T::Type, dims::Dims) = randn(p, T, dims) -Base.randn(p::BlocksOrAuto, dims::Integer...) = randn(p, Float64, dims) -Base.randn(p::BlocksOrAuto, dims::Dims) = randn(p, Float64, dims) -Base.randn(::AutoBlocks, eltype::Type, dims::Dims) = - randn(auto_blocks(dims), eltype, dims) +Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment) +Base.randn(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment) +Base.randn(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment) +Base.randn(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment) +Base.randn(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + randn(auto_blocks(dims), eltype, dims; assignment=assignment) -function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat) +function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p) + a = AllocateArray(assignment, eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p) return _to_darray(a) end -sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...) = - sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end]) -sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat) = - sprand(p, T, dims, sparsity) -sprand(p::BlocksOrAuto, dims_and_sparsity::Real...) 
= - sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]) -sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat) = - sprand(p, Float64, dims, sparsity) -sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat) = - sprand(auto_blocks(dims), eltype, dims, sparsity) +sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) = + sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment) +sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) = + sprand(p, T, dims, sparsity; assignment=assignment) +sprand(p::BlocksOrAuto, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) = + sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment) +sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) = + sprand(p, Float64, dims, sparsity; assignment=assignment) +sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) = + sprand(auto_blocks(dims), eltype, dims, sparsity; assignment=assignment) -function Base.ones(p::Blocks, eltype::Type, dims::Dims) +function Base.ones(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, ones, false, d, partition(p, d), p) + a = AllocateArray(assignment, eltype, ones, false, d, partition(p, d), p) return _to_darray(a) end -Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...) = ones(p, T, dims) -Base.ones(p::BlocksOrAuto, T::Type, dims::Dims) = ones(p, T, dims) -Base.ones(p::BlocksOrAuto, dims::Integer...) = ones(p, Float64, dims) -Base.ones(p::BlocksOrAuto, dims::Dims) = ones(p, Float64, dims) -Base.ones(::AutoBlocks, eltype::Type, dims::Dims) = - ones(auto_blocks(dims), eltype, dims) +Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment) +Base.ones(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment) +Base.ones(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment) +Base.ones(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment) +Base.ones(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + ones(auto_blocks(dims), eltype, dims; assignment=assignment) -function Base.zeros(p::Blocks, eltype::Type, dims::Dims) +function Base.zeros(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, zeros, false, d, partition(p, d), p) + a = AllocateArray(assignment, eltype, zeros, false, d, partition(p, d), p) return _to_darray(a) end -Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...) = zeros(p, T, dims) -Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims) = zeros(p, T, dims) -Base.zeros(p::BlocksOrAuto, dims::Integer...) 
= zeros(p, Float64, dims) -Base.zeros(p::BlocksOrAuto, dims::Dims) = zeros(p, Float64, dims) -Base.zeros(::AutoBlocks, eltype::Type, dims::Dims) = - zeros(auto_blocks(dims), eltype, dims) +Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment) +Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment) +Base.zeros(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment) +Base.zeros(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment) +Base.zeros(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + zeros(auto_blocks(dims), eltype, dims; assignment=assignment) function Base.zero(x::DArray{T,N}) where {T,N} dims = ntuple(i->x.domain.indexes[i].stop, N) diff --git a/src/array/darray.jl b/src/array/darray.jl index 37c61a936..7147c428a 100644 --- a/src/array/darray.jl +++ b/src/array/darray.jl @@ -419,6 +419,7 @@ struct Distribute{T,N,B<:AbstractBlocks} <: ArrayOp{T, N} domainchunks partitioning::B data::AbstractArray{T,N} + procgrid::Union{AbstractArray{<:Processor, N}, Nothing} end size(x::Distribute) = size(domain(x.data)) @@ -426,19 +427,18 @@ size(x::Distribute) = size(domain(x.data)) Base.@deprecate BlockPartition Blocks -Distribute(p::Blocks, data::AbstractArray) = - Distribute(partition(p, domain(data)), p, data) +Distribute(p::Blocks, data::AbstractArray, procgrid::Union{AbstractArray{<:Processor},Nothing} = nothing) = + Distribute(partition(p, domain(data)), p, data, procgrid) -function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}) where {T,N} +function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N} p = Blocks(ntuple(i->first(domainchunks.cumlength[i]), N)) - Distribute(domainchunks, p, data) + Distribute(domainchunks, p, data, procgrid) end -function Distribute(data::AbstractArray{T,N}) where {T,N} - nprocs = sum(w->length(Dagger.get_processors(OSProc(w))), - procs()) +function Distribute(data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N} + nprocs = sum(w->length(get_processors(OSProc(w))),procs()) p = Blocks(ntuple(i->max(cld(size(data, i), nprocs), 1), N)) - return Distribute(partition(p, domain(data)), p, data) + return Distribute(partition(p, domain(data)), p, data, procgrid) end function stage(ctx::Context, d::Distribute) @@ -451,7 +451,8 @@ function stage(ctx::Context, d::Distribute) Nd = ndims(x) T = eltype(d.data) concat = x.concat - cs = map(d.domainchunks) do idx + cs = map(CartesianIndices(d.domainchunks)) do I + idx = d.domainchunks[I] chunks = stage(ctx, x[idx]).chunks shape = size(chunks) # TODO: fix hashing @@ -466,12 +467,19 @@ function stage(ctx::Context, d::Distribute) end end else - cs = map(d.domainchunks) do c + cs = map(CartesianIndices(d.domainchunks)) do I # TODO: fix hashing #hash = uhash(c, Base.hash(Distribute, Base.hash(d.data))) - Dagger.@spawn identity(d.data[c]) + c = d.domainchunks[I] + if isnothing(d.procgrid) + Dagger.@spawn identity(d.data[c]) + else + scope = ExactScope(d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)]) + Dagger.@spawn compute_scope=scope identity(d.data[c]) + end end end + return DArray(eltype(d.data), domain(d.data), 
d.domainchunks, @@ -494,29 +502,67 @@ function auto_blocks(dims::Dims{N}) where N end auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A)) -distribute(A::AbstractArray) = distribute(A, AutoBlocks()) -distribute(A::AbstractArray{T,N}, dist::Blocks{N}) where {T,N} = - _to_darray(Distribute(dist, A)) -distribute(A::AbstractArray, ::AutoBlocks) = distribute(A, auto_blocks(A)) -function distribute(x::AbstractArray{T,N}, n::NTuple{N}) where {T,N} +const AssignmentType{N} = Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} + +distribute(A::AbstractArray, assignment::AssignmentType = :arbitrary) = distribute(A, AutoBlocks(), assignment) +function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N} + procgrid = nothing + availprocs = collect(Dagger.all_processors()) + sort!(availprocs, by = x -> (x.owner, x.tid)) + if assignment isa Symbol + if assignment == :arbitrary + procgrid = nothing + elseif assignment == :blockrow + p = ntuple(i -> i == 1 ? Int(ceil(size(A,1) / dist.blocksize[1])) : 1, N) + rows_per_proc, extra = divrem(Int(ceil(size(A,1) / dist.blocksize[1])), num_processors()) + counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + elseif assignment == :blockcol + p = ntuple(i -> i == N ? Int(ceil(size(A,N) / dist.blocksize[N])) : 1, N) + cols_per_proc, extra = divrem(Int(ceil(size(A,N) / dist.blocksize[N])), num_processors()) + counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + elseif assignment == :cyclicrow + p = ntuple(i -> i == 1 ? num_processors() : 1, N) + procgrid = reshape(availprocs, p) + elseif assignment == :cycliccol + p = ntuple(i -> i == N ? 
num_processors() : 1, N) + procgrid = reshape(availprocs, p) + else + error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol") + end + elseif assignment isa AbstractArray{<:Int, N} + missingprocs = filter(p -> p ∉ procs(), assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment] + elseif assignment isa AbstractArray{<:Processor, N} + missingprocs = filter(p -> p ∉ availprocs, assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = assignment + end + + return _to_darray(Distribute(dist, A, procgrid)) +end + +distribute(A::AbstractArray, ::AutoBlocks, assignment::AssignmentType = :arbitrary) = distribute(A, auto_blocks(A), assignment) +function distribute(x::AbstractArray{T,N}, n::NTuple{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N} p = map((d, dn)->ceil(Int, d / dn), size(x), n) - distribute(x, Blocks(p)) + distribute(x, Blocks(p), assignment) end -distribute(x::AbstractVector, n::Int) = distribute(x, (n,)) -distribute(x::AbstractVector, n::Vector{<:Integer}) = - distribute(x, DomainBlocks((1,), (cumsum(n),))) +distribute(x::AbstractVector, n::Int, assignment::AssignmentType{1} = :arbitrary) = distribute(x, (n,), assignment) +distribute(x::AbstractVector, n::Vector{<:Integer}, assignment::AssignmentType{1} = :arbitrary) = distribute(x, n[1], assignment) -DVector(A::AbstractVector{T}, part::Blocks{1}) where T = distribute(A, part) -DMatrix(A::AbstractMatrix{T}, part::Blocks{2}) where T = distribute(A, part) -DArray(A::AbstractArray{T,N}, part::Blocks{N}) where {T,N} = distribute(A, part) +DVector(A::AbstractVector{T}, part::Blocks{1}, assignment::AssignmentType{1} = :arbitrary) where T = distribute(A, part, assignment) +DMatrix(A::AbstractMatrix{T}, part::Blocks{2}, assignment::AssignmentType{2} = :arbitrary) where T = distribute(A, part, assignment) +DArray(A::AbstractArray{T,N}, part::Blocks{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N} = distribute(A, part, assignment) -DVector(A::AbstractVector{T}) where T = DVector(A, AutoBlocks()) -DMatrix(A::AbstractMatrix{T}) where T = DMatrix(A, AutoBlocks()) -DArray(A::AbstractArray) = DArray(A, AutoBlocks()) +DVector(A::AbstractVector{T}, assignment::AssignmentType{1} = :arbitrary) where T = DVector(A, AutoBlocks(), assignment) +DMatrix(A::AbstractMatrix{T}, assignment::AssignmentType{2} = :arbitrary) where T = DMatrix(A, AutoBlocks(), assignment) +DArray(A::AbstractArray, assignment::AssignmentType = :arbitrary) = DArray(A, AutoBlocks(), assignment) -DVector(A::AbstractVector{T}, ::AutoBlocks) where T = DVector(A, auto_blocks(A)) -DMatrix(A::AbstractMatrix{T}, ::AutoBlocks) where T = DMatrix(A, auto_blocks(A)) -DArray(A::AbstractArray, ::AutoBlocks) = DArray(A, auto_blocks(A)) +DVector(A::AbstractVector{T}, ::AutoBlocks, assignment::AssignmentType{1} = :arbitrary) where T = DVector(A, auto_blocks(A), assignment) +DMatrix(A::AbstractMatrix{T}, ::AutoBlocks, assignment::AssignmentType{2} = :arbitrary) where T = DMatrix(A, auto_blocks(A), assignment) +DArray(A::AbstractArray, ::AutoBlocks, assignment::AssignmentType = :arbitrary) = DArray(A, auto_blocks(A), assignment) function Base.:(==)(x::ArrayOp{T,N}, y::AbstractArray{S,N}) where {T,S,N} collect(x) == y diff --git a/src/array/lu.jl b/src/array/lu.jl index 6a0b14680..cf1980b09 100644 --- a/src/array/lu.jl +++ b/src/array/lu.jl @@ -1,3 +1,5 @@ +### LU NoPivot + function 
LinearAlgebra.lu(A::DMatrix{T}, ::LinearAlgebra.NoPivot; check::Bool=true) where T A_copy = LinearAlgebra._lucopy(A, LinearAlgebra.lutype(T)) return LinearAlgebra.lu!(A_copy, LinearAlgebra.NoPivot(); check=check) @@ -7,8 +9,6 @@ function LinearAlgebra.lu!(A::DMatrix{T}, ::LinearAlgebra.NoPivot; check::Bool=t mzone = -one(T) Ac = A.chunks mt, nt = size(Ac) - iscomplex = T <: Complex - trans = iscomplex ? 'C' : 'T' Dagger.spawn_datadeps() do for k in range(1, min(mt, nt)) @@ -29,3 +29,95 @@ function LinearAlgebra.lu!(A::DMatrix{T}, ::LinearAlgebra.NoPivot; check::Bool=t return LinearAlgebra.LU{T,DMatrix{T},DVector{Int}}(A, ipiv, 0) end + +### LU RowMaximum + +function searchmax_pivot!(piv_idx::AbstractArray{Int}, piv_val::AbstractArray{Float64}, A::AbstractArray{T}, offset::Int=0) where T + max_idx = LinearAlgebra.BLAS.iamax(A[:]) + piv_idx[1] = offset+max_idx + piv_val[1] = A[max_idx] +end + +function update_ipiv!(ipivl, piv_idx::AbstractArray{Int}, piv_val::AbstractArray{Float64}, k::Int, nb::Int) + max_piv_idx = LinearAlgebra.BLAS.iamax(piv_val) + ipivl[1] = (max_piv_idx+k-2)*nb + piv_idx[max_piv_idx] +end + +function swaprows_panel!(A::AbstractArray{T}, M::AbstractArray{T}, ipivl::AbstractVector{Int}, m::Int, p::Int, nb::Int) where T + q = div(ipivl[1]-1,nb) + 1 + r = (ipivl[1]-1)%nb+1 + if m == q + A[p,:], M[r,:] = M[r,:], A[p,:] + end +end + +function update_panel!(M::AbstractArray{T}, A::AbstractArray{T}, p::Int) where T + Acinv = 1 / A[p,p] + LinearAlgebra.BLAS.scal!(Acinv, view(M, :, p)) + LinearAlgebra.BLAS.ger!(-1.0, view(M, :, p), view(A, p, p+1:size(A,2)), view(M, :, p+1:size(M,2))) +end + +function swaprows_trail!(A::AbstractArray{T}, M::AbstractArray{T}, ipiv::AbstractVector{Int}, m::Int, nb::Int) where T + for p in eachindex(ipiv) + q = div(ipiv[p]-1,nb) + 1 + r = (ipiv[p]-1)%nb+1 + if m == q + A[p,:], M[r,:] = M[r,:], A[p,:] + end + end +end + +function LinearAlgebra.lu(A::DMatrix{T}, ::LinearAlgebra.RowMaximum; check::Bool=true) where T + A_copy = LinearAlgebra._lucopy(A, LinearAlgebra.lutype(T)) + return LinearAlgebra.lu!(A_copy, LinearAlgebra.RowMaximum(); check=check) +end +function LinearAlgebra.lu!(A::DMatrix{T}, ::LinearAlgebra.RowMaximum; check::Bool=true) where T + zone = one(T) + mzone = -one(T) + + Ac = A.chunks + mt, nt = size(Ac) + m, n = size(A) + mb, nb = A.partitioning.blocksize + + mb != nb && error("Unequal block sizes are not supported: mb = $mb, nb = $nb") + + ipiv = DVector(collect(1:min(m, n)), Blocks(mb)) + ipivc = ipiv.chunks + + max_piv_idx = zeros(Int,mt) + max_piv_val = zeros(mt) + + Dagger.spawn_datadeps() do + for k in 1:min(mt, nt) + for p in 1:min(nb, m-(k-1)*nb, n-(k-1)*nb) + Dagger.@spawn searchmax_pivot!(Out(view(max_piv_idx, k:k)), Out(view(max_piv_val, k:k)), In(view(Ac[k,k],p:min(nb,m-(k-1)*nb),p:p)), p-1) + for i in k+1:mt + Dagger.@spawn searchmax_pivot!(Out(view(max_piv_idx, i:i)), Out(view(max_piv_val, i:i)), In(view(Ac[i,k],:,p:p))) + end + Dagger.@spawn update_ipiv!(InOut(view(ipivc[k],p:p)), In(view(max_piv_idx, k:mt)), In(view(max_piv_val, k:mt)), k, nb) + for i in k:mt + Dagger.@spawn swaprows_panel!(InOut(Ac[k, k]), InOut(Ac[i, k]), InOut(view(ipivc[k],p:p)), i, p, nb) + end + Dagger.@spawn update_panel!(InOut(view(Ac[k,k],p+1:min(nb,m-(k-1)*nb),:)), In(Ac[k,k]), p) + for i in k+1:mt + Dagger.@spawn update_panel!(InOut(Ac[i, k]), In(Ac[k,k]), p) + end + + end + for j in Iterators.flatten((1:k-1, k+1:nt)) + for i in k:mt + Dagger.@spawn swaprows_trail!(InOut(Ac[k, j]), InOut(Ac[i, j]), In(ipivc[k]), i, mb) + end + end + for j in 
k+1:nt + Dagger.@spawn BLAS.trsm!('L', 'L', 'N', 'U', zone, In(Ac[k, k]), InOut(Ac[k, j])) + for i in k+1:mt + Dagger.@spawn BLAS.gemm!('N', 'N', mzone, In(Ac[i, k]), In(Ac[k, j]), zone, InOut(Ac[i, j])) + end + end + end + end + + return LinearAlgebra.LU{T,DMatrix{T},DVector{Int}}(A, ipiv, 0) +end \ No newline at end of file diff --git a/src/memory-spaces.jl b/src/memory-spaces.jl index 0ef0d1200..3623ff303 100644 --- a/src/memory-spaces.jl +++ b/src/memory-spaces.jl @@ -122,6 +122,7 @@ memory_spans(::T) where T<:AbstractAliasing = throw(ArgumentError("Must define ` memory_spans(x) = memory_spans(aliasing(x)) memory_spans(x, T) = memory_spans(aliasing(x, T)) + struct AliasingWrapper <: AbstractAliasing inner::AbstractAliasing hash::UInt64 @@ -387,3 +388,55 @@ function will_alias(x_span::MemorySpan, y_span::MemorySpan) y_end = y_span.ptr + y_span.len - 1 return x_span.ptr <= y_end && y_span.ptr <= x_end end + +struct ChunkSlice{N} #<: AbstractAliasing + chunk::Chunk + slices::NTuple{N, Union{Int, AbstractRange{Int}, Colon}} +end + +Base.copyto!(dest::ChunkSlice, src::ChunkSlice) = copyto!(view(unwrap(dest.chunk), dest.slices...), view(unwrap(src.chunk), src.slices...)) + +@inline function view(c::Chunk, slices...) + isa(c.domain, ArrayDomain) || throw(ArgumentError("Chunk must of a DArray (ArrayDomain), got $(typeof(c.domain))")) + nd, sz = ndims(c.domain), size(c.domain) + nd == length(slices) || throw(DimensionMismatch("Expected $nd slices, got $(length(slices))")) + + for (i, s) in enumerate(slices) + if s isa Int + 1 ≤ s ≤ sz[i] || throw(ArgumentError("Index $s out of bounds for dimension $i (size $(sz[i]))")) + elseif s isa AbstractRange + isempty(s) && continue + 1 ≤ first(s) ≤ last(s) ≤ sz[i] || throw(ArgumentError("Range $s out of bounds for dimension $i (size $(sz[i]))")) + elseif s === Colon() + continue + else + throw(ArgumentError("Invalid slice type $(typeof(s)) at dimension $i, Expected Type of Int, AbstractRange, or Colon")) + end + end + + return ChunkSlice(c, slices) +end + +view(c::DTask, slices...) = view(fetch(c; raw=true), slices...) + +function aliasing(x::ChunkSlice{N}) where N + remotecall_fetch(root_worker_id(x.chunk.processor), x.chunk, x.slices) do x, slices + x = unwrap(x) + v = view(x, slices...) + return aliasing(v) + end +end + +function move!(dep_mod, to_space::MemorySpace, from_space::MemorySpace, to::ChunkSlice, from::ChunkSlice) + to_w = root_worker_id(to_space) + remotecall_wait(to_w, dep_mod, to_space, from_space, to, from) do dep_mod, to_space, from_space, to, from + to_raw = unwrap(to.chunk) + from_w = root_worker_id(from_space) + from_raw = to_w == from_w ? unwrap(from.chunk) : remotecall_fetch(unwrap, from_w, from.chunk) + from_view = view(from_raw, from.slices...) + to_view = view(to_raw, to.slices...) + move!(dep_mod, to_space, from_space, to_view, from_view) + end +end + +move(from_proc::Processor, to_proc::Processor, slice::ChunkSlice) = view(move(from_proc, to_proc, slice.chunk), slice.slices...) 
\ No newline at end of file diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index b894f4526..0335fe3e0 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -14,7 +14,7 @@ import Random: randperm import Base: @invokelatest import ..Dagger -import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject +import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, InvalidScope, LockedObject import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime import ..Dagger: @dagdebug, @safe_lock_spin1 import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek @@ -726,16 +726,25 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) sig = signature(state, task) # Calculate scope - scope = if task.f isa Chunk - task.f.scope - else - if task.options.proclist !== nothing - # proclist overrides scope selection - AnyScope() - else - DefaultScope() + scope = constrain(task.compute_scope, task.result_scope) + if scope isa InvalidScope + ex = SchedulingException("compute_scope and result_scope are not compatible: $(scope.x), $(scope.y)") + state.cache[task] = ex + state.errored[task] = true + set_failed!(state, task) + @goto pop_task + end + if task.f isa Chunk + scope = constrain(scope, task.f.scope) + if scope isa InvalidScope + ex = SchedulingException("Current scope and function Chunk Scope are not compatible: $(scope.x), $(scope.y)") + state.cache[task] = ex + state.errored[task] = true + set_failed!(state, task) + @goto pop_task end end + for (_,input) in task.inputs input = unwrap_weak_checked(input) chunk = if istask(input) @@ -747,8 +756,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) end chunk isa Chunk || continue scope = constrain(scope, chunk.scope) - if scope isa Dagger.InvalidScope - ex = SchedulingException("Scopes are not compatible: $(scope.x), $(scope.y)") + if scope isa InvalidScope + ex = SchedulingException("Current scope and argument Chunk scope are not compatible: $(scope.x), $(scope.y)") state.cache[task] = ex state.errored[task] = true set_failed!(state, task) @@ -1086,7 +1095,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options, propagated, ids, positions, (log_sink=ctx.log_sink, profile=ctx.profile), - sch_handle, state.uid]) + sch_handle, state.uid, thunk.result_scope]) end # N.B. 
We don't batch these because we might get a deserialization # error due to something not being defined on the worker, and then we don't @@ -1305,7 +1314,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re task = task_spec[] scope = task[5] if !isa(constrain(scope, Dagger.ExactScope(to_proc)), - Dagger.InvalidScope) && + InvalidScope) && typemax(UInt32) - proc_occupancy_cached >= occupancy # Compatible, steal this task return dequeue_pair!(queue) @@ -1488,7 +1497,7 @@ function do_task(to_proc, task_desc) scope, Tf, data, send_result, persist, cache, meta, options, propagated, ids, positions, - ctx_vars, sch_handle, sch_uid = task_desc + ctx_vars, sch_handle, sch_uid, result_scope = task_desc ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile) from_proc = OSProc() @@ -1696,7 +1705,7 @@ function do_task(to_proc, task_desc) # Construct result # TODO: We should cache this locally - send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache, + send_result || meta ? res : tochunk(res, to_proc, result_scope; device, persist, cache=persist ? true : cache, tag=options.storage_root_tag, leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()), retain=options.storage_retain) diff --git a/src/thunk.jl b/src/thunk.jl index b24806f85..659ee9a2c 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -73,6 +73,8 @@ mutable struct Thunk eager_ref::Union{DRef,Nothing} options::Any # stores scheduler-specific options propagates::Tuple # which options we'll propagate + compute_scope::AbstractScope + result_scope::AbstractScope function Thunk(f, xs...; syncdeps=nothing, id::Int=next_id(), @@ -84,16 +86,14 @@ mutable struct Thunk affinity=nothing, eager_ref=nothing, processor=nothing, - scope=nothing, + scope=DefaultScope(), + compute_scope=scope, + result_scope=AnyScope(), options=nothing, propagates=(), kwargs... ) - if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope)) - f = tochunk(f, - something(processor, OSProc()), - something(scope, DefaultScope())) - end + xs = Base.mapany(identity, xs) syncdeps_set = Set{Any}(filterany(is_task_or_chunk, Base.mapany(last, xs))) if syncdeps !== nothing @@ -105,11 +105,11 @@ mutable struct Thunk if options !== nothing @assert isempty(kwargs) new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, - cache_ref, affinity, eager_ref, options, propagates) + cache_ref, affinity, eager_ref, options, propagates, compute_scope, result_scope) else new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, cache_ref, affinity, eager_ref, Sch.ThunkOptions(;kwargs...), - propagates) + propagates, compute_scope, result_scope) end end end @@ -476,15 +476,6 @@ function spawn(f, args...; kwargs...) args = args[2:end] end - # Wrap f in a Chunk if necessary - processor = haskey(options, :processor) ? options.processor : nothing - scope = haskey(options, :scope) ? 
options.scope : nothing - if !isnothing(processor) || !isnothing(scope) - f = tochunk(f, - something(processor, get_options(:processor, OSProc())), - something(scope, get_options(:scope, DefaultScope()))) - end - # Process the args and kwargs into Pair form args_kwargs = args_kwargs_to_pairs(args, kwargs) diff --git a/test/array/allocation.jl b/test/array/allocation.jl index a95ef2efc..86f491ec1 100644 --- a/test/array/allocation.jl +++ b/test/array/allocation.jl @@ -201,6 +201,516 @@ end end end +@testset "Constructors and Functions with assignment" begin + + availprocs = collect(Dagger.all_processors()) + sort!(availprocs, by = x -> (x.owner, x.tid)) + numprocs = length(availprocs) + + + function chunk_processors(Ad::DArray) + [Dagger.processor(Ad.chunks[idx].future.future.v.value[2]) for idx in CartesianIndices(size(Dagger.domainchunks(Ad)))] + end + + function tile_processors(proc_grid::AbstractArray{<:Dagger.Processor,N}, block_grid::Tuple{Vararg{Int,N}}) where N + reps = Int.(ceil.(block_grid ./ size(proc_grid))) + tiled = repeat(proc_grid, reps...) + idx_slices = [1:block_grid[d] for d in 1:length(block_grid)] + return tiled[idx_slices...] + end + + + A = rand(41, 35, 12) + v = rand(23) + M = rand(76,118) + + t_blocks_a = (4,3,2) + d_blocks_a = Dagger.Blocks(t_blocks_a) + blocks_a = cld.(size(A), t_blocks_a) + + n_blocks_v = 3 + t_blocks_v = (n_blocks_v,) + d_blocks_v = Dagger.Blocks(t_blocks_v) + blocks_v = cld.(size(v), t_blocks_v) + blocks_vv = [blocks_v...] + blocks_nv = blocks_v[1] + + t_blocks_m = (2,3) + d_blocks_m = Dagger.Blocks(t_blocks_m) + blocks_m = cld.(size(M), t_blocks_m) + + function get_default_blockgrid(data, numprocs) + ndims_data = ndims(data) + size_data = size(data) + ntuple(i->i == ndims_data ? cld( size_data[ndims_data], cld(size_data[ndims_data], numprocs) ) : 1, ndims_data) + end + + + @testset "Arbitrary Assignment (:arbitrary)" begin + assignment = :arbitrary + + @testset "Auto Blocks" begin + + @test distribute(A, assignment) isa DArray && distribute(A, AutoBlocks(), assignment) isa DArray + @test distribute(v, assignment) isa DVector && distribute(v, AutoBlocks(), assignment) isa DVector + @test distribute(M, assignment) isa DMatrix && distribute(M, AutoBlocks(), assignment) isa DMatrix + + @test DArray( A, assignment) isa DArray && DArray( A, AutoBlocks(), assignment) isa DArray + @test DVector(v, assignment) isa DVector && DVector( v, AutoBlocks(), assignment) isa DVector + @test DMatrix(M, assignment) isa DMatrix && DMatrix( M, AutoBlocks(), assignment) isa DMatrix + + @test rand( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && rand( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test rand( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && rand( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test rand( AutoBlocks(), size(M)... ; assignment=assignment) isa DMatrix && rand( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + @test randn( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && randn( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test randn( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && randn( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test randn( AutoBlocks(), size(M)... 
; assignment=assignment) isa DMatrix && randn( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + @test Dagger.sprand(AutoBlocks(), size(A)..., 0.5; assignment=assignment) isa DArray && Dagger.sprand(AutoBlocks(), size(A), 0.5; assignment=assignment) isa DArray + @test Dagger.sprand(AutoBlocks(), size(v)..., 0.5; assignment=assignment) isa DVector && Dagger.sprand(AutoBlocks(), size(v), 0.5; assignment=assignment) isa DVector + @test Dagger.sprand(AutoBlocks(), size(M)..., 0.5; assignment=assignment) isa DMatrix && Dagger.sprand(AutoBlocks(), size(M), 0.5; assignment=assignment) isa DMatrix + + @test ones( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && ones( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test ones( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && ones( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test ones( AutoBlocks(), size(M)... ; assignment=assignment) isa DMatrix && ones( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + @test zeros( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && zeros( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test zeros( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && zeros( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test zeros( AutoBlocks(), size(M)... ; assignment=assignment) isa DMatrix && zeros( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + end + + @testset "Explicit Blocks" begin + + @test distribute(A, d_blocks_a, assignment) isa DArray && distribute(A, blocks_a, assignment) isa DArray + @test distribute(v, d_blocks_v, assignment) isa DVector && distribute(v, blocks_v, assignment) isa DVector + @test distribute(v, n_blocks_v, assignment) isa DVector && distribute(v, blocks_vv, assignment) isa DVector + @test distribute(M, d_blocks_m, assignment) isa DMatrix && distribute(M, blocks_m, assignment) isa DMatrix + + @test DArray( A, d_blocks_a, assignment) isa DArray + @test DVector(v, d_blocks_v, assignment) isa DVector + @test DMatrix(M, d_blocks_m, assignment) isa DMatrix + + @test rand( d_blocks_a, size(A)... ; assignment=assignment) isa DArray && rand( d_blocks_a, size(A); assignment=assignment) isa DArray + @test rand( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && rand( d_blocks_v, size(v); assignment=assignment) isa DVector + @test rand( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && rand( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + @test randn( d_blocks_a, size(A)... ; assignment=assignment) isa DArray && randn( d_blocks_a, size(A); assignment=assignment) isa DArray + @test randn( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && randn( d_blocks_v, size(v); assignment=assignment) isa DVector + @test randn( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && randn( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + @test Dagger.sprand(d_blocks_a, size(A)..., 0.5; assignment=assignment) isa DArray && Dagger.sprand(d_blocks_a, size(A), 0.5; assignment=assignment) isa DArray + @test Dagger.sprand(d_blocks_v, size(v)..., 0.5; assignment=assignment) isa DVector && Dagger.sprand(d_blocks_v, size(v), 0.5; assignment=assignment) isa DVector + @test Dagger.sprand(d_blocks_m, size(M)..., 0.5; assignment=assignment) isa DMatrix && Dagger.sprand(d_blocks_m, size(M), 0.5; assignment=assignment) isa DMatrix + + @test ones( d_blocks_a, size(A)... 
; assignment=assignment) isa DArray && ones( d_blocks_a, size(A); assignment=assignment) isa DArray + @test ones( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && ones( d_blocks_v, size(v); assignment=assignment) isa DVector + @test ones( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && ones( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + @test zeros( d_blocks_a, size(A)... ; assignment=assignment) isa DArray && zeros( d_blocks_a, size(A); assignment=assignment) isa DArray + @test zeros( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && zeros( d_blocks_v, size(v); assignment=assignment) isa DVector + @test zeros( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && zeros( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + end + + end + + + @testset "Structured Assignment (:blockrow, :blockcol, :cyclicrow, :cycliccol)" begin + + function get_default_blockgrid(data, numprocs) + ndims_data = ndims(data) + size_data = size(data) + ntuple(i->i == ndims_data ? cld( size_data[ndims_data], cld(size_data[ndims_data], numprocs) ) : 1, ndims_data) + end + + function get_blockrow_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == 1 ? blocksize[1] : 1, ndims_data) + rows_per_proc, extra = divrem(blocksize[1], numprocs) + counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:numprocs] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + return procgrid + end + + function get_blockcol_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == ndims_data ? blocksize[end] : 1, ndims_data) + cols_per_proc, extra = divrem(blocksize[end], numprocs) + counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:numprocs] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + return procgrid + end + + function get_cyclicrow_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == 1 ? numprocs : 1, ndims_data) + procgrid = reshape(availprocs, p) + return procgrid + end + + function get_cycliccol_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == ndims_data ? 
numprocs : 1, ndims_data) + procgrid = reshape(availprocs, p) + return procgrid + end + + function test_assignment_strategy(assignment::Symbol, get_assignment_procgrid) + + @testset "Block Row Assignment (:$assignment)" begin + + @testset "Auto Blocks" begin + + dist_A_def_auto = distribute(A, assignment); fetch(dist_A_def_auto) + dist_A_auto_def = distribute(A, AutoBlocks(), assignment); fetch(dist_A_auto_def) + dist_v_def_auto = distribute(v, assignment); fetch(dist_v_def_auto) + dist_v_auto_def = distribute(v, AutoBlocks(), assignment); fetch(dist_v_auto_def) + dist_M_def_auto = distribute(M, assignment); fetch(dist_M_def_auto) + dist_M_auto_def = distribute(M, AutoBlocks(), assignment); fetch(dist_M_auto_def) + + darr_A_def_auto = DArray( A, assignment); fetch(darr_A_def_auto) + darr_A_auto_def = DArray( A, AutoBlocks(), assignment); fetch(darr_A_auto_def) + dvec_v_def_auto = DVector( v, assignment); fetch(dvec_v_def_auto) + dvec_v_auto_def = DVector( v, AutoBlocks(), assignment); fetch(dvec_v_auto_def) + dmat_M_def_auto = DMatrix( M, assignment); fetch(dmat_M_def_auto) + dmat_M_auto_def = DMatrix( M, AutoBlocks(), assignment); fetch(dmat_M_auto_def) + + @test chunk_processors(dist_A_def_auto) == chunk_processors(dist_A_auto_def) == chunk_processors(darr_A_def_auto) == chunk_processors(darr_A_auto_def) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_def_auto) == chunk_processors(dist_v_auto_def) == chunk_processors(dvec_v_def_auto) == chunk_processors(dvec_v_auto_def) == tile_processors(get_assignment_procgrid(v, numprocs, get_default_blockgrid(v, numprocs)), get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_def_auto) == chunk_processors(dist_M_auto_def) == chunk_processors(dmat_M_def_auto) == chunk_processors(dmat_M_auto_def) == tile_processors(get_assignment_procgrid(M, numprocs, get_default_blockgrid(M, numprocs)), get_default_blockgrid(M, numprocs)) + + end + + @testset "Functions with AutoBlocks" begin + + rand_A_auto = rand( AutoBlocks(), size(A)...; assignment=assignment); fetch(rand_A_auto) + rand_v_auto = rand( AutoBlocks(), size(v)...; assignment=assignment); fetch(rand_v_auto) + rand_M_auto = rand( AutoBlocks(), size(M)...; assignment=assignment); fetch(rand_M_auto) + + randn_A_auto = randn( AutoBlocks(), size(A)...; assignment=assignment); fetch(randn_A_auto) + randn_v_auto = randn( AutoBlocks(), size(v)...; assignment=assignment); fetch(randn_v_auto) + randn_M_auto = randn( AutoBlocks(), size(M)...; assignment=assignment); fetch(randn_M_auto) + + # sprand_A_auto = Dagger.sprand(AutoBlocks(), size(A)..., 0.5; assignment=assignment); fetch(sprand_A_auto) + sprand_v_auto = Dagger.sprand(AutoBlocks(), size(v)..., 0.5; assignment=assignment); fetch(sprand_v_auto) + sprand_M_auto = Dagger.sprand(AutoBlocks(), size(M)..., 0.5; assignment=assignment); fetch(sprand_M_auto) + + ones_A_auto = ones( AutoBlocks(), size(A)...; assignment=assignment); fetch(ones_A_auto) + ones_v_auto = ones( AutoBlocks(), size(v)...; assignment=assignment); fetch(ones_v_auto) + ones_M_auto = ones( AutoBlocks(), size(M)...; assignment=assignment); fetch(ones_M_auto) + + zeros_A_auto = zeros( AutoBlocks(), size(A)...; assignment=assignment); fetch(zeros_A_auto) + zeros_v_auto = zeros( AutoBlocks(), size(v)...; assignment=assignment); fetch(zeros_v_auto) + zeros_M_auto = zeros( AutoBlocks(), size(M)...; assignment=assignment); fetch(zeros_M_auto) + + @test 
chunk_processors(rand_A_auto) == chunk_processors(randn_A_auto) == chunk_processors(ones_A_auto) == chunk_processors(zeros_A_auto) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + # @test chunk_processors(sprand_A_auto) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + @test chunk_processors(rand_v_auto) == chunk_processors(randn_v_auto) == chunk_processors(sprand_v_auto) == chunk_processors(ones_v_auto) == chunk_processors(zeros_v_auto) == tile_processors(get_assignment_procgrid(v, numprocs, get_default_blockgrid(v, numprocs)), get_default_blockgrid(v, numprocs)) + @test chunk_processors(rand_M_auto) == chunk_processors(randn_M_auto) == chunk_processors(sprand_M_auto) == chunk_processors(ones_M_auto) == chunk_processors(zeros_M_auto) == tile_processors(get_assignment_procgrid(M, numprocs, get_default_blockgrid(M, numprocs)), get_default_blockgrid(M, numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_def = distribute(A, d_blocks_a, assignment); fetch(dist_A_exp_def) + dist_A_blocks_exp = distribute(A, blocks_a, assignment); fetch(dist_A_blocks_exp) + dist_v_exp_def = distribute(v, d_blocks_v, assignment); fetch(dist_v_exp_def) + dist_v_blocks_exp = distribute(v, blocks_v, assignment); fetch(dist_v_blocks_exp) + dist_v_nblocks_exp = distribute(v, blocks_nv, assignment); fetch(dist_v_nblocks_exp) + dist_v_vblocks_exp = distribute(v, blocks_vv, assignment); fetch(dist_v_vblocks_exp) + dist_M_exp_def = distribute(M, d_blocks_m, assignment); fetch(dist_M_exp_def) + dist_M_blocks_exp = distribute(M, blocks_m, assignment); fetch(dist_M_blocks_exp) + + darr_A_exp_def = DArray( A, d_blocks_a, assignment); fetch(darr_A_exp_def) + dvec_v_exp_def = DVector( v, d_blocks_v, assignment); fetch(dvec_v_exp_def) + dmat_M_exp_def = DMatrix( M, d_blocks_m, assignment); fetch(dmat_M_exp_def) + + + @test chunk_processors(dist_A_exp_def) == chunk_processors(dist_A_blocks_exp) == chunk_processors(darr_A_exp_def) == tile_processors(get_assignment_procgrid(A, numprocs, blocks_a), blocks_a) + @test chunk_processors(dist_v_exp_def) == chunk_processors(dist_v_blocks_exp) == chunk_processors(dvec_v_exp_def) == tile_processors(get_assignment_procgrid(v, numprocs, blocks_v), blocks_v) + @test chunk_processors(dist_v_nblocks_exp) == chunk_processors(dist_v_vblocks_exp) == tile_processors(get_assignment_procgrid(v, numprocs, blocks_v), blocks_v) + @test chunk_processors(dist_M_exp_def) == chunk_processors(dist_M_blocks_exp) == chunk_processors(dmat_M_exp_def) == tile_processors(get_assignment_procgrid(M, numprocs, blocks_m), blocks_m) + + end + + @testset "Functions with Explicit Blocks" begin + + rand_A_exp = rand( d_blocks_a, size(A)...; assignment=assignment); fetch(rand_A_exp) + rand_v_exp = rand( d_blocks_v, size(v)...; assignment=assignment); fetch(rand_v_exp) + rand_M_exp = rand( d_blocks_m, size(M)...; assignment=assignment); fetch(rand_M_exp) + + rand_A_exp = rand( d_blocks_a, size(A)...; assignment=assignment); fetch(rand_A_exp) + rand_v_exp = rand( d_blocks_v, size(v)...; assignment=assignment); fetch(rand_v_exp) + rand_M_exp = rand( d_blocks_m, size(M)...; assignment=assignment); fetch(rand_M_exp) + + randn_A_exp = randn( d_blocks_a, size(A)...; assignment=assignment); fetch(randn_A_exp) + randn_v_exp = randn( d_blocks_v, size(v)...; assignment=assignment); fetch(randn_v_exp) + randn_M_exp = randn( d_blocks_m, size(M)...; 
assignment=assignment); fetch(randn_M_exp) + + # sprand_A_exp = Dagger.sprand(d_blocks_a, size(A)..., 0.5; assignment=assignment); fetch(sprand_A_exp) + sprand_v_exp = Dagger.sprand(d_blocks_v, size(v)..., 0.5; assignment=assignment); fetch(sprand_v_exp) + sprand_M_exp = Dagger.sprand(d_blocks_m, size(M)..., 0.5; assignment=assignment); fetch(sprand_M_exp) + + ones_A_exp = ones( d_blocks_a, size(A)...; assignment=assignment); fetch(ones_A_exp) + ones_v_exp = ones( d_blocks_v, size(v)...; assignment=assignment); fetch(ones_v_exp) + ones_M_exp = ones( d_blocks_m, size(M)...; assignment=assignment); fetch(ones_M_exp) + + zeros_A_exp = zeros( d_blocks_a, size(A)...; assignment=assignment); fetch(zeros_A_exp) + zeros_v_exp = zeros( d_blocks_v, size(v)...; assignment=assignment); fetch(zeros_v_exp) + zeros_M_exp = zeros( d_blocks_m, size(M)...; assignment=assignment); fetch(zeros_M_exp) + + # @test chunk_processors(rand_A_exp) == chunk_processors(randn_A_exp) == chunk_processors(ones_A_exp) == chunk_processors(zeros_A_exp) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + # @test chunk_processors(sprand_A_exp) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + # @test chunk_processors(rand_v_exp) == chunk_processors(randn_v_exp) == chunk_processors(sprand_v_exp) == chunk_processors(ones_v_exp) == chunk_processors(zeros_v_exp) == tile_processors(get_assignment_procgrid(v, numprocs, get_default_blockgrid(v, numprocs)), get_default_blockgrid(v, numprocs)) + # @test chunk_processors(rand_M_exp) == chunk_processors(randn_M_exp) == chunk_processors(sprand_M_exp) == chunk_processors(ones_M_exp) == chunk_processors(zeros_M_exp) == tile_processors(get_assignment_procgrid(M, numprocs, get_default_blockgrid(M, numprocs)), get_default_blockgrid(M, numprocs)) + + end + + end + + end + + test_assignment_strategy(:blockrow, get_blockrow_procgrid) + test_assignment_strategy(:blockcol, get_blockcol_procgrid) + test_assignment_strategy(:cyclicrow, get_cyclicrow_procgrid) + test_assignment_strategy(:cycliccol, get_cycliccol_procgrid) + + end + + @testset "OSProc ID Array Assignment (AbstractArray{<:Int, N})" begin + + function get_random_threadprocs(proc_ids) + [Dagger.ThreadProc(proc, 1) for proc in proc_ids] + end + + rand_osproc_ids_A = rand(Dagger.procs(), 3, 2, 2) + rand_osproc_ids_v = rand(Dagger.procs(), 11) + rand_osproc_ids_M = rand(Dagger.procs(), 2, 5) + + @testset "Auto Blocks" begin + + dist_A_rand_osproc_auto = distribute(A, rand_osproc_ids_A); fetch(dist_A_rand_osproc_auto) + dist_A_auto_rand_osproc = distribute(A, AutoBlocks(), rand_osproc_ids_A); fetch(dist_A_auto_rand_osproc) + # dist_v_rand_osproc_auto = distribute(v, rand_osproc_ids_v); fetch(dist_v_rand_osproc_auto) + dist_v_auto_rand_osproc = distribute(v, AutoBlocks(), rand_osproc_ids_v); fetch(dist_v_auto_rand_osproc) + dist_M_rand_osproc_auto = distribute(M, rand_osproc_ids_M); fetch(dist_M_rand_osproc_auto) + dist_M_auto_rand_osproc = distribute(M, AutoBlocks(), rand_osproc_ids_M); fetch(dist_M_auto_rand_osproc) + + darr_A_rand_osproc_auto = DArray( A, rand_osproc_ids_A); fetch(darr_A_rand_osproc_auto) + darr_A_auto_rand_osproc = DArray( A, AutoBlocks(), rand_osproc_ids_A); fetch(darr_A_auto_rand_osproc) + dvec_v_rand_osproc_auto = DVector( v, rand_osproc_ids_v); fetch(dvec_v_rand_osproc_auto) + dvec_v_auto_rand_osproc = DVector( v, AutoBlocks(), rand_osproc_ids_v); 
fetch(dvec_v_auto_rand_osproc) + # dmat_M_rand_osproc_auto = DMatrix( M, rand_osproc_ids_M); fetch(dmat_M_rand_osproc_auto) ### rand_osproc_ids_M assigned as Blocks + dmat_M_auto_rand_osproc = DMatrix( M, AutoBlocks(), rand_osproc_ids_M); fetch(dmat_M_auto_rand_osproc) + + @test chunk_processors(dist_A_rand_osproc_auto) == chunk_processors(dist_A_auto_rand_osproc) == chunk_processors(darr_A_rand_osproc_auto) == chunk_processors(darr_A_auto_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_auto_rand_osproc) == chunk_processors(dvec_v_rand_osproc_auto) == chunk_processors(dvec_v_auto_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), get_default_blockgrid(v, numprocs)) + # @test chunk_processors(dist_v_rand_osproc_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_rand_osproc_auto) == chunk_processors(dist_M_auto_rand_osproc) == chunk_processors(dmat_M_auto_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), get_default_blockgrid(M, numprocs)) + # @test chunk_processors(dmat_M_rand_osproc_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), get_default_blockgrid(M, numprocs)) + end + + @testset "Functions with AutoBlocks" begin + + rand_A_auto = rand( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(rand_A_auto) + rand_v_auto = rand( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(rand_v_auto) + rand_M_auto = rand( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(rand_M_auto) + + randn_A_auto = randn( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(randn_A_auto) + randn_v_auto = randn( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(randn_v_auto) + randn_M_auto = randn( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(randn_M_auto) + + # sprand_A_auto = Dagger.sprand(AutoBlocks(), size(A)..., 0.5; assignment=rand_osproc_ids_A); fetch(sprand_A_auto) + sprand_v_auto = Dagger.sprand(AutoBlocks(), size(v)..., 0.5; assignment=rand_osproc_ids_v); fetch(sprand_v_auto) + sprand_M_auto = Dagger.sprand(AutoBlocks(), size(M)..., 0.5; assignment=rand_osproc_ids_M); fetch(sprand_M_auto) + + ones_A_auto = ones( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(ones_A_auto) + ones_v_auto = ones( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(ones_v_auto) + ones_M_auto = ones( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(ones_M_auto) + + zeros_A_auto = zeros( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(zeros_A_auto) + zeros_v_auto = zeros( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(zeros_v_auto) + zeros_M_auto = zeros( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(zeros_M_auto) + + + @test chunk_processors(rand_A_auto) == chunk_processors(randn_A_auto) == chunk_processors(ones_A_auto) == chunk_processors(zeros_A_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), get_default_blockgrid(rand_A_auto, numprocs)) + # @test chunk_processors(sprand_A_auto) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + @test chunk_processors(rand_v_auto) == chunk_processors(randn_v_auto) == chunk_processors(sprand_v_auto) == chunk_processors(ones_v_auto) == chunk_processors(zeros_v_auto) == 
tile_processors(get_random_threadprocs(rand_osproc_ids_v), get_default_blockgrid(rand_v_auto, numprocs)) + @test chunk_processors(rand_M_auto) == chunk_processors(randn_M_auto) == chunk_processors(sprand_M_auto) == chunk_processors(ones_M_auto) == chunk_processors(zeros_M_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), get_default_blockgrid(rand_M_auto, numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_rand_osproc = distribute(A, d_blocks_a, rand_osproc_ids_A); fetch(dist_A_exp_rand_osproc) + dist_A_blocks_rand_osproc = distribute(A, blocks_a, rand_osproc_ids_A); fetch(dist_A_blocks_rand_osproc) + dist_v_exp_rand_osproc = distribute(v, d_blocks_v, rand_osproc_ids_v); fetch(dist_v_exp_rand_osproc) + dist_v_blocks_rand_osproc = distribute(v, blocks_v, rand_osproc_ids_v); fetch(dist_v_blocks_rand_osproc) + dist_v_nblocks_rand_osproc = distribute(v, blocks_nv, rand_osproc_ids_v); fetch(dist_v_nblocks_rand_osproc) + dist_v_vblocks_rand_osproc = distribute(v, blocks_vv, rand_osproc_ids_v); fetch(dist_v_vblocks_rand_osproc) + dist_M_exp_rand_osproc = distribute(M, d_blocks_m, rand_osproc_ids_M); fetch(dist_M_exp_rand_osproc) + dist_M_blocks_rand_osproc = distribute(M, blocks_m, rand_osproc_ids_M); fetch(dist_M_blocks_rand_osproc) + + darr_A_exp_rand_osproc = DArray( A, d_blocks_a, rand_osproc_ids_A); fetch(darr_A_exp_rand_osproc) + dvec_v_exp_rand_osproc = DVector( v, d_blocks_v, rand_osproc_ids_v); fetch(dvec_v_exp_rand_osproc) + dmat_M_exp_rand_osproc = DMatrix( M, d_blocks_m, rand_osproc_ids_M); fetch(dmat_M_exp_rand_osproc) + + @test chunk_processors(dist_A_exp_rand_osproc) == chunk_processors(dist_A_blocks_rand_osproc) == chunk_processors(darr_A_exp_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), blocks_a) + @test chunk_processors(dist_v_exp_rand_osproc) == chunk_processors(dist_v_blocks_rand_osproc) == chunk_processors(dvec_v_exp_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), blocks_v) + @test chunk_processors(dist_v_nblocks_rand_osproc) == chunk_processors(dist_v_vblocks_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), blocks_v) + @test chunk_processors(dist_M_exp_rand_osproc) == chunk_processors(dist_M_blocks_rand_osproc) == chunk_processors(dmat_M_exp_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), blocks_m) + + end + + @testset "Functions with Explicit Blocks" begin + + rand_A_exp = rand( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(rand_A_exp) + rand_v_exp = rand( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); fetch(rand_v_exp) + rand_M_exp = rand( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(rand_M_exp) + + randn_A_exp = randn( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(randn_A_exp) + randn_v_exp = randn( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); fetch(randn_v_exp) + randn_M_exp = randn( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(randn_M_exp) + + # sprand_A_exp = Dagger.sprand(d_blocks_a, size(A)..., 0.5; assignment=rand_osproc_ids_A); fetch(sprand_A_exp) + sprand_v_exp = Dagger.sprand(d_blocks_v, size(v)..., 0.5; assignment=rand_osproc_ids_v); fetch(sprand_v_exp) + sprand_M_exp = Dagger.sprand(d_blocks_m, size(M)..., 0.5; assignment=rand_osproc_ids_M); fetch(sprand_M_exp) + + ones_A_exp = ones( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(ones_A_exp) + ones_v_exp = ones( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); 
fetch(ones_v_exp) + ones_M_exp = ones( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(ones_M_exp) + + zeros_A_exp = zeros( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(zeros_A_exp) + zeros_v_exp = zeros( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); fetch(zeros_v_exp) + zeros_M_exp = zeros( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(zeros_M_exp) + + + @test chunk_processors(rand_A_exp) == chunk_processors(randn_A_exp) == chunk_processors(ones_A_exp) == chunk_processors(zeros_A_exp) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), blocks_a) + # @test chunk_processors(sprand_A_exp) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), blocks_a) + @test chunk_processors(rand_v_exp) == chunk_processors(randn_v_exp) == chunk_processors(sprand_v_exp) == chunk_processors(ones_v_exp) == chunk_processors(zeros_v_exp) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), blocks_v) + @test chunk_processors(rand_M_exp) == chunk_processors(randn_M_exp) == chunk_processors(sprand_M_exp) == chunk_processors(ones_M_exp) == chunk_processors(zeros_M_exp) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), blocks_m) + + end + + end + + + @testset "Explicit Processor Array Assignment (AbstractArray{<:Processor, N})" begin + + rand_procs_A = reshape(availprocs[ rand(Dagger.procs(), 6) ], 2, 3, 1) + rand_procs_v = reshape(availprocs[ rand(Dagger.procs(), 5) ], 5) + rand_procs_M = reshape(availprocs[ rand(Dagger.procs(), 14) ], 2, 7) + + + @testset "Auto Blocks" begin + + dist_A_rand_procs_auto = distribute(A, rand_procs_A); fetch(dist_A_rand_procs_auto) + dist_A_auto_rand_procs = distribute(A, AutoBlocks(), rand_procs_A); fetch(dist_A_auto_rand_procs) + dist_v_rand_procs_auto = distribute(v, rand_procs_v); fetch(dist_v_rand_procs_auto) + dist_v_auto_rand_procs = distribute(v, AutoBlocks(), rand_procs_v); fetch(dist_v_auto_rand_procs) + dist_M_rand_procs_auto = distribute(M, rand_procs_M); fetch(dist_M_rand_procs_auto) + dist_M_auto_rand_procs = distribute(M, AutoBlocks(), rand_procs_M); fetch(dist_M_auto_rand_procs) + + darr_A_rand_procs_auto = DArray( A, rand_procs_A); fetch(darr_A_rand_procs_auto) + darr_A_auto_rand_procs = DArray( A, AutoBlocks(), rand_procs_A); fetch(darr_A_auto_rand_procs) + dvec_v_rand_procs_auto = DVector( v, rand_procs_v); fetch(dvec_v_rand_procs_auto) + dvec_v_auto_rand_procs = DVector( v, AutoBlocks(), rand_procs_v); fetch(dvec_v_auto_rand_procs) + dmat_M_rand_procs_auto = DMatrix( M, rand_procs_M); fetch(dmat_M_rand_procs_auto) + dmat_M_auto_rand_procs = DMatrix( M, AutoBlocks(), rand_procs_M); fetch(dmat_M_auto_rand_procs) + + @test chunk_processors(dist_A_rand_procs_auto) == chunk_processors(dist_A_auto_rand_procs) == chunk_processors(darr_A_rand_procs_auto) == chunk_processors(darr_A_auto_rand_procs) == tile_processors(rand_procs_A, get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_rand_procs_auto) == chunk_processors(dist_v_auto_rand_procs) == chunk_processors(dvec_v_rand_procs_auto) == chunk_processors(dvec_v_auto_rand_procs) == tile_processors(rand_procs_v, get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_rand_procs_auto) == chunk_processors(dist_M_auto_rand_procs) == chunk_processors(dmat_M_rand_procs_auto) == chunk_processors(dmat_M_auto_rand_procs) == tile_processors(rand_procs_M, get_default_blockgrid(M, numprocs)) + + end + + @testset "Functions with AutoBlocks" begin + + # rand_A_auto = rand( AutoBlocks(), 
size(A)...; assignment=rand_procs_A); fetch(rand_A_auto) + # rand_v_auto = rand( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(rand_v_auto) + # rand_M_auto = rand( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(rand_M_auto) + + # randn_A_auto = randn( AutoBlocks(), size(A)...; assignment=rand_procs_A); fetch(randn_A_auto) + # randn_v_auto = randn( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(randn_v_auto) + # randn_M_auto = randn( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(randn_M_auto) + + # # sprand_A_auto = Dagger.sprand(AutoBlocks(), size(A)..., 0.5; assignment=rand_procs_A); fetch(sprand_A_auto) + # sprand_v_auto = Dagger.sprand(AutoBlocks(), size(v)..., 0.5; assignment=rand_procs_v); fetch(sprand_v_auto) + # sprand_M_auto = Dagger.sprand(AutoBlocks(), size(M)..., 0.5; assignment=rand_procs_M); fetch(sprand_M_auto) + + # ones_A_auto = ones( AutoBlocks(), size(A)...; assignment=rand_procs_A); fetch(ones_A_auto) + # ones_v_auto = ones( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(ones_v_auto) + # ones_M_auto = ones( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(ones_M_auto) + + # zeros_A_auto = zeros( AutoBlocks(), size(A)...; assignment=rand_procs_A); fetch(zeros_A_auto) + # zeros_v_auto = zeros( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(zeros_v_auto) + # zeros_M_auto = zeros( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(zeros_M_auto) + + + # @test chunk_processors(rand_A_auto) == chunk_processors(randn_A_auto) == chunk_processors(ones_A_auto) == chunk_processors(zeros_A_auto) == tile_processors(rand_procs_A, get_default_blockgrid(A, numprocs)) # disabled: the inputs above are commented out + # @test chunk_processors(sprand_A_auto) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + # @test chunk_processors(rand_v_auto) == chunk_processors(randn_v_auto) == chunk_processors(sprand_v_auto) == chunk_processors(ones_v_auto) == chunk_processors(zeros_v_auto) == tile_processors(rand_procs_v, get_default_blockgrid(v, numprocs)) + # @test chunk_processors(rand_M_auto) == chunk_processors(randn_M_auto) == chunk_processors(sprand_M_auto) == chunk_processors(ones_M_auto) == chunk_processors(zeros_M_auto) == tile_processors(rand_procs_M, get_default_blockgrid(M, numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_rand_procs = distribute(A, d_blocks_a, rand_procs_A); fetch(dist_A_exp_rand_procs) + dist_A_blocks_rand_procs = distribute(A, blocks_a, rand_procs_A); fetch(dist_A_blocks_rand_procs) + dist_v_exp_rand_procs = distribute(v, d_blocks_v, rand_procs_v); fetch(dist_v_exp_rand_procs) + dist_v_blocks_rand_procs = distribute(v, blocks_v, rand_procs_v); fetch(dist_v_blocks_rand_procs) + dist_v_nblocks_rand_procs = distribute(v, blocks_nv, rand_procs_v); fetch(dist_v_nblocks_rand_procs) + dist_v_vblocks_rand_procs = distribute(v, blocks_vv, rand_procs_v); fetch(dist_v_vblocks_rand_procs) + dist_M_exp_rand_procs = distribute(M, d_blocks_m, rand_procs_M); fetch(dist_M_exp_rand_procs) + dist_M_blocks_rand_procs = distribute(M, blocks_m, rand_procs_M); fetch(dist_M_blocks_rand_procs) + + darr_A_exp_rand_procs = DArray( A, d_blocks_a, rand_procs_A); fetch(darr_A_exp_rand_procs) + dvec_v_exp_rand_procs = DVector( v, d_blocks_v, rand_procs_v); fetch(dvec_v_exp_rand_procs) + dmat_M_exp_rand_procs = DMatrix( M, d_blocks_m, rand_procs_M); fetch(dmat_M_exp_rand_procs) + + @test chunk_processors(dist_A_exp_rand_procs) ==
chunk_processors(dist_A_blocks_rand_procs) == chunk_processors(darr_A_exp_rand_procs) == tile_processors(rand_procs_A, blocks_a) + @test chunk_processors(dist_v_exp_rand_procs) == chunk_processors(dist_v_blocks_rand_procs) == chunk_processors(dvec_v_exp_rand_procs) == tile_processors(rand_procs_v, blocks_v) + @test chunk_processors(dist_v_nblocks_rand_procs) == chunk_processors(dist_v_vblocks_rand_procs) == tile_processors(rand_procs_v, blocks_v) + @test chunk_processors(dist_M_exp_rand_procs) == chunk_processors(dist_M_blocks_rand_procs) == chunk_processors(dmat_M_exp_rand_procs) == tile_processors(rand_procs_M, blocks_m) + + end + + @testset "Functions with Explicit Blocks" begin + + # rand_A_exp = rand( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(rand_A_exp) + # rand_v_exp = rand( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(rand_v_exp) + # rand_M_exp = rand( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(rand_M_exp) + + # randn_A_exp = randn( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(randn_A_exp) + # randn_v_exp = randn( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(randn_v_exp) + # randn_M_exp = randn( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(randn_M_exp) + + # # sprand_A_exp = Dagger.sprand(d_blocks_a, size(A)..., 0.5; assignment=rand_procs_A); fetch(sprand_A_exp) + # sprand_v_exp = Dagger.sprand(d_blocks_v, size(v)..., 0.5; assignment=rand_procs_v); fetch(sprand_v_exp) + # sprand_M_exp = Dagger.sprand(d_blocks_m, size(M)..., 0.5; assignment=rand_procs_M); fetch(sprand_M_exp) + + # ones_A_exp = ones( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(ones_A_exp) + # ones_v_exp = ones( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(ones_v_exp) + # ones_M_exp = ones( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(ones_M_exp) + + # zeros_A_exp = zeros( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(zeros_A_exp) + # zeros_v_exp = zeros( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(zeros_v_exp) + # zeros_M_exp = zeros( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(zeros_M_exp) + + # @test chunk_processors(rand_A_exp) == chunk_processors(randn_A_exp) == chunk_processors(ones_A_exp) == chunk_processors(zeros_A_exp) == tile_processors(rand_procs_A, blocks_a) + # @test chunk_processors(sprand_A_exp) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), blocks_a) + # @test chunk_processors(rand_v_exp) == chunk_processors(randn_v_exp) == chunk_processors(sprand_v_exp) == chunk_processors(ones_v_exp) == chunk_processors(zeros_v_exp) == tile_processors(rand_procs_A, blocks_a) + # @test chunk_processors(rand_M_exp) == chunk_processors(randn_M_exp) == chunk_processors(sprand_M_exp) == chunk_processors(ones_M_exp) == chunk_processors(zeros_M_exp) == tile_processors(rand_procs_A, blocks_a) + + end + + end + +end + @testset "view" begin A = rand(64, 64) DA = view(A, Blocks(8, 8)) @@ -211,6 +721,44 @@ end @test A_v == A[1:8, 1:8] end +@testset "Chunk view of DArray" begin + A = rand(64, 64) + DA = DArray(A, Blocks(8,8)) + chunk = DA.chunks[1,1] + + @testset "Valid Slices" begin + @test view(chunk, :, :) isa ChunkSlice && view(chunk, 1:8, 1:8) isa ChunkSlice + @test view(chunk, 1:2:7, :) isa ChunkSlice && view(chunk, :, 2:2:8) isa ChunkSlice + @test view(chunk, 1, :) isa ChunkSlice && view(chunk, :, 1) isa ChunkSlice + @test view(chunk, 3:3, 5:5) isa ChunkSlice && view(chunk, 5:7, 1:2:4) isa ChunkSlice + @test view(chunk, 8, 8) isa ChunkSlice 
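+ # An empty range such as 1:0 is still an in-bounds slice and is expected to return a ChunkSlice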
+ @test view(chunk, 1:0, :) isa ChunkSlice + end + + @testset "Dimension Mismatch" begin + @test_throws DimensionMismatch view(chunk, :) + @test_throws DimensionMismatch view(chunk, :, :, :) + end + + @testset "Int Slice Out of Bounds" begin + @test_throws ArgumentError view(chunk, 0, :) + @test_throws ArgumentError view(chunk, :, 9) + @test_throws ArgumentError view(chunk, 9, 1) + end + + @testset "Range Slice Out of Bounds" begin + @test_throws ArgumentError view(chunk, 0:5, :) + @test_throws ArgumentError view(chunk, 1:8, 5:10) + @test_throws ArgumentError view(chunk, 2:2:10, :) + end + + @testset "Invalid Slice Types" begin + @test_throws DimensionMismatch view(chunk, (1:2, :)) + @test_throws ArgumentError view(chunk, :, [1, 2]) + end + +end + @testset "copy/similar" begin X1 = ones(Blocks(10, 10), 100, 100) X2 = copy(X1) diff --git a/test/runtests.jl b/test/runtests.jl index 20c7eb41c..a7b7a890a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -18,6 +18,7 @@ tests = [ ("Options", "options.jl"), ("Mutation", "mutation.jl"), ("Task Queues", "task-queues.jl"), + ("Task Affinity", "task-affinity.jl"), ("Datadeps", "datadeps.jl"), ("Streaming", "streaming.jl"), ("Domain Utilities", "domain.jl"), diff --git a/test/task-affinity.jl b/test/task-affinity.jl new file mode 100644 index 000000000..7a74d0958 --- /dev/null +++ b/test/task-affinity.jl @@ -0,0 +1,569 @@ +@testset "Task affinity" begin + + get_compute_scope(x::DTask) = try + Dagger.Sch._find_thunk(x).compute_scope + catch + Dagger.InvalidScope + end + + get_result_scope(x::DTask) = try + fetch(x; raw=true).scope + catch + Dagger.InvalidScope + end + + get_execution_scope(x::DTask) = try + chunk = fetch(x; raw=true) + Dagger.ExactScope(chunk.processor) + catch + Dagger.InvalidScope + end + + function intersect_scopes(scope1::Dagger.AbstractScope, scopes::Dagger.AbstractScope...) 
+ for s in scopes + scope1 = Dagger.constrain(scope1, s) + scope1 isa Dagger.InvalidScope && return (scope1,) + end + return scope1.scopes + end + + availprocs = collect(Dagger.all_processors()) + availscopes = shuffle!(Dagger.ExactScope.(availprocs)) + numscopes = length(availscopes) + + master_proc = Dagger.ThreadProc(1, 1) + master_scope = Dagger.ExactScope(master_proc) + + @testset "Function: scope, compute_scope and result_scope" begin + + @everywhere f(x) = x + 1 + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn scope=scope_only f(10); fetch(task1) + @test get_compute_scope(task1) == scope_only + @test get_result_scope(task1) == Dagger.AnyScope() + + execution_scope1 = get_execution_scope(task1) + @test execution_scope1 in intersect_scopes(execution_scope1,scope_only) + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + scope = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn compute_scope=compute_scope_only f(10); fetch(task1) + task2 = Dagger.@spawn scope=scope compute_scope=compute_scope_only f(20); fetch(task2) + + @test get_compute_scope(task1) == get_compute_scope(task2) == compute_scope_only + @test get_result_scope(task1) == get_result_scope(task2) == Dagger.AnyScope() + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + @test execution_scope1 in intersect_scopes(execution_scope1, compute_scope_only) && + execution_scope2 in intersect_scopes(execution_scope2, compute_scope_only) + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn result_scope=result_scope_only f(10); fetch(task1) + + @test get_compute_scope(task1) == Dagger.DefaultScope() + @test get_result_scope(task1) == result_scope_only + end + + @testset "compute_scope and result_scope with intersection" begin + if numscopes >= 3 + n = cld(numscopes, 3) + + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_a..., scope_b...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...) 
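+ # compute_scope and result_scope share scope_b here, so the assertions below expect each task's execution scope to fall inside that intersection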
+ + task1 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect f(10); fetch(task1) + task2 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect f(20); fetch(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect f(30); fetch(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_intersect + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + execution_scope3 = get_execution_scope(task3) + @test execution_scope1 in intersect_scopes(execution_scope1, compute_scope_intersect, result_scope_intersect) && + execution_scope2 in intersect_scopes(execution_scope2, compute_scope_intersect, result_scope_intersect) && + execution_scope3 in intersect_scopes(execution_scope3, compute_scope_intersect, result_scope_intersect) + end + end + + @testset "compute_scope and result_scope without intersection" begin + if length(availscopes) >= 2 + n = cld(numscopes, 2) + + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_b...) + + task1 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect f(10); wait(task1) + task2 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect f(20); wait(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect f(30); wait(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_no_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == Dagger.InvalidScope + + @test get_execution_scope(task1) == get_execution_scope(task2) == get_execution_scope(task3) == Dagger.InvalidScope + end + end + + end + + @testset "Chunk function: scope, compute_scope and result_scope" begin + + @everywhere g(x, y) = x * 2 + y * 3 + + availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors()))) + n = cld(numscopes, 2) + + chunk_scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + chunk_proc = rand(availprocs) + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + task1 = Dagger.@spawn scope=scope_only Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1) + + @test get_compute_scope(task1) == scope_only + @test get_result_scope(task1) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + + @test execution_scope1 == Dagger.ExactScope(chunk_proc) + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + task1 = Dagger.@spawn compute_scope=compute_scope_only Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1) + task2 = Dagger.@spawn scope=scope compute_scope=compute_scope_only Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope); fetch(task2) + + @test get_compute_scope(task1) == get_compute_scope(task2) == compute_scope_only + @test 
get_result_scope(task1) == get_result_scope(task2) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + @test execution_scope1 == execution_scope2 == Dagger.ExactScope(chunk_proc) + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + task1 = Dagger.@spawn result_scope=result_scope_only Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1) + + @test get_compute_scope(task1) == Dagger.DefaultScope() + @test get_result_scope(task1) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + @test execution_scope1 == Dagger.ExactScope(chunk_proc) + end + + @testset "compute_scope and result_scope with intersection" begin + if length(availscopes) >= 3 + n = cld(numscopes, 3) + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_a..., scope_b...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...) + + task1 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1 ) + task2 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope); fetch(task2 ) + task3 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope); fetch(task3 ) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + execution_scope3 = get_execution_scope(task3) + @test execution_scope1 == execution_scope2 == execution_scope3 == Dagger.ExactScope(chunk_proc) + end + end + + @testset "compute_scope and result_scope without intersection" begin + if length(availscopes) >= 2 + n = cld(length(availscopes), 2) + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_b...) 
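+ # With disjoint compute_scope and result_scope the tasks cannot be placed anywhere, so they are only wait()ed on and the result/execution scope queries below are expected to report InvalidScope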
+ + task1 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); wait(task1 ) + task2 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope); wait(task2 ) + task3 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope); wait(task3 ) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_no_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == Dagger.InvalidScope + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + execution_scope3 = get_execution_scope(task3) + @test get_execution_scope(task1) == get_execution_scope(task2) == get_execution_scope(task3) == Dagger.InvalidScope + end + end + + end + + @testset "Chunk arguments: scope, compute_scope and result_scope with non-intersection of chunk arg and scope" begin + + @everywhere g(x, y) = x * 2 + y * 3 + + availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors()))) + n = cld(numscopes, 2) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + + arg_scope = Dagger.UnionScope(scope_a...) + arg_proc = rand(availprocs) + arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope) + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + + task11 = Dagger.@spawn scope=scope_only g(arg, 11); wait(task11) + + @test get_compute_scope(task11) == scope_only + @test get_result_scope(task11) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + + @test execution_scope11 == Dagger.InvalidScope + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + scope = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + + task11 = Dagger.@spawn compute_scope=compute_scope_only g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g(arg, 21); wait(task21) + + @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only + @test get_result_scope(task11) == get_result_scope(task21) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + @test execution_scope11 == execution_scope21 == Dagger.InvalidScope + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + + task11 = Dagger.@spawn result_scope=result_scope_only g(arg, 11); wait(task11) + + @test get_compute_scope(task11) == Dagger.DefaultScope() + @test get_result_scope(task11) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + @test execution_scope11 == Dagger.InvalidScope + end + + @testset "compute_scope and result_scope with intersection" begin + if length(scope_b) >= 3 + n = cld(length(scope_b), 3) + + scope_ba = scope_b[1:n] + scope_bb = scope_b[n+1:2n] + scope_bc = scope_b[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_ba..., scope_bb...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_bb..., scope_bc...) 
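+ # Although compute_scope and result_scope overlap, both are drawn from scope_b, which is disjoint from the Chunk argument's scope (scope_a), so the assertions below still expect InvalidScope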
+ + task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); wait(task11 ) + task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); wait(task21 ) + task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); wait(task31 ) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + execution_scope31 = get_execution_scope(task31) + @test execution_scope11 == execution_scope21 == execution_scope31 == Dagger.InvalidScope + end + end + + @testset "compute_scope and result_scope without intersection" begin + if length(scope_b) >= 2 + n = cld(length(scope_b), 2) + + scope_ba = scope_b[1:n] + scope_bb = scope_b[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_ba...) + scope_no_intersect = Dagger.UnionScope(scope_ba...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_bb...) + + task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); ; wait(task11 ) + task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); ; wait(task21 ) + task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31 ) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + execution_scope31 = get_execution_scope(task31) + @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + end + end + + end + + @testset "Chunk arguments: scope, compute_scope and result_scope with intersection of chunk arg and scope" begin + + @everywhere g(x, y) = x * 2 + y * 3 + + availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors()))) + n = cld(numscopes, 3) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + + arg_scope = Dagger.UnionScope(scope_a..., scope_b...) + arg_proc = rand(availprocs) + arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope) + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...) + + task11 = Dagger.@spawn scope=scope_only g(arg, 11); fetch(task11) + + @test get_compute_scope(task11) == scope_only + @test get_result_scope(task11) == Dagger.AnyScope() + + execution_scope11 = get_execution_scope(task11) + + @test execution_scope11 in intersect_scopes(execution_scope11, scope_only, arg_scope) + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...) + scope = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...) 
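+ # compute_scope is drawn from scope_b and scope_c, so it overlaps the Chunk argument's scope (scope_a plus scope_b) through scope_b; the assertions below expect execution inside that overlap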
+ + task11 = Dagger.@spawn compute_scope=compute_scope_only g(arg, 11); fetch(task11) + task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g(arg, 21); fetch(task21) + + @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only + @test get_result_scope(task11) == get_result_scope(task21) == Dagger.AnyScope() + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + @test execution_scope11 in intersect_scopes(execution_scope11, compute_scope_only, arg_scope) && + execution_scope21 in intersect_scopes(execution_scope21, compute_scope_only, arg_scope) + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...) + + task11 = Dagger.@spawn result_scope=result_scope_only g(arg, 11); fetch(task11) + + @test get_compute_scope(task11) == Dagger.DefaultScope() + @test get_result_scope(task11) == result_scope_only + + execution_scope11 = get_execution_scope(task11) + @test execution_scope11 in intersect_scopes(execution_scope11, result_scope_only, arg_scope) + end + + @testset "compute_scope and result_scope with intersection" begin + scope_bc = [scope_b...,scope_c...] + if length(scope_bc) >= 3 + n = cld(length(scope_bc), 3) + + scope_bca = scope_bc[1:n] + scope_bcb = scope_bc[n+1:2n] + scope_bcc = scope_bc[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_bca..., scope_bcb...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_bcb..., scope_bcc...) + + task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); fetch(task11) + task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); fetch(task21) + task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); fetch(task31) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_intersect + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + execution_scope31 = get_execution_scope(task31) + @test execution_scope11 in intersect_scopes(execution_scope11, compute_scope_intersect, result_scope_intersect, arg_scope) && + execution_scope21 in intersect_scopes(execution_scope21, scope_intersect, result_scope_intersect, arg_scope) && + execution_scope31 in intersect_scopes(execution_scope31, compute_scope_intersect, result_scope_intersect, arg_scope) + end + end + + @testset "compute_scope and result_scope without intersection" begin + scope_bc = [scope_b...,scope_c...] + if length(scope_bc) >= 2 + n = cld(length(scope_bc), 2) + + scope_bca = scope_bc[1:n] + scope_bcb = scope_bc[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_bca...) + scope_no_intersect = Dagger.UnionScope(scope_bca...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_bcb...)
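+ # Disjoint compute_scope and result_scope again make scheduling impossible regardless of the argument's scope, so the result/execution scope checks below expect InvalidScope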
+ + task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); wait(task11 ) + task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); wait(task21 ) + task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31 ) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + execution_scope31 = get_execution_scope(task31) + @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + end + end + + end + + # @testset "Chunk function with Chunk arguments: scope, compute_scope and result_scope with non-intersection of chunk arg and chunk scope" begin + + # @everywhere g(x, y) = x * 2 + y * 3 + + # availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors()))) + + # chunk_scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + # chunk_proc = rand(chunk_scope.scopes).processor + # arg_scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + # arg_proc = rand(arg_scope.scopes).processor + # arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope) + + # @testset "scope" begin + # scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + # task11 = Dagger.@spawn scope=scope_only arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); fetch(task11) + + # @test get_compute_scope(task11) == scope_only + # @test get_result_scope(task11) == chunk_scope + + # execution_scope11 = get_execution_scope(task11) + + # @test execution_scope11 == Dagger.ExactScope(chunk_proc) + # end + + # @testset "compute_scope" begin + # compute_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + # scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + # task11 = Dagger.@spawn compute_scope=compute_scope_only arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); fetch(task11) + # task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only arg -> Dagger.tochunk(g(arg, 21), chunk_proc, chunk_scope); fetch(task21) + + # @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only + # @test get_result_scope(task11) == get_result_scope(task21) == chunk_scope + + # execution_scope11 = get_execution_scope(task11) + # execution_scope21 = get_execution_scope(task21) + # @test execution_scope11 == execution_scope21 == Dagger.ExactScope(chunk_proc) + # end + + # @testset "result_scope" begin + # result_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + # task11 = Dagger.@spawn result_scope=result_scope_only arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); fetch(task11) + + # @test get_compute_scope(task11) == Dagger.DefaultScope() + # @test get_result_scope(task11) == chunk_scope + + # execution_scope11 = get_execution_scope(task11) + # @test execution_scope11 == Dagger.ExactScope(chunk_proc) + # end + + # @testset "compute_scope and result_scope with intersection" begin + # if numscopes >= 3 + # n = cld(numscopes, 3) + + # shuffle!(availscopes) + # scope_a = availscopes[1:n] + # scope_b = availscopes[n+1:2n] + # scope_c = availscopes[2n+1:end] + + # compute_scope_intersect = 
Dagger.UnionScope(scope_a..., scope_b...) + # scope_intersect = compute_scope_intersect + # scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + # result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...) + + # task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); wait(task11 ) + # task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect arg -> Dagger.tochunk(g(arg, 21), chunk_proc, chunk_scope); wait(task21 ) + # task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect arg -> Dagger.tochunk(g(arg, 31), chunk_proc, chunk_scope); wait(task31 ) + + # @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect + # @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == chunk_scope + + # execution_scope11 = get_execution_scope(task11) + # execution_scope21 = get_execution_scope(task21) + # execution_scope31 = get_execution_scope(task31) + # @test execution_scope11 == execution_scope21 == execution_scope31 == Dagger.ExactScope(chunk_proc) + # end + # end + + # @testset "compute_scope and result_scope without intersection" begin + # if numscopes >= 2 + # n = cld(numscopes, 2) + + # shuffle!(availscopes) + # scope_a = availscopes[1:n] + # scope_b = availscopes[n+1:end] + + # compute_scope_no_intersect = Dagger.UnionScope(scope_a...) + # scope_no_intersect = Dagger.UnionScope(scope_a...) + # scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + # result_scope_no_intersect = Dagger.UnionScope(scope_b...) + + # task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); wait(task11 ) + # task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect arg -> Dagger.tochunk(g(arg, 21), chunk_proc, chunk_scope); wait(task21 ) + # task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect arg -> Dagger.tochunk(g(arg, 31), chunk_proc, chunk_scope); wait(task31 ) + + # @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + # @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope + + # execution_scope11 = get_execution_scope(task11) + # execution_scope21 = get_execution_scope(task21) + # execution_scope31 = get_execution_scope(task31) + # @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + # end + # end + + # end + +end \ No newline at end of file