diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..e69de29bb diff --git a/docs/make.jl b/docs/make.jl index fffad5017..1e05647d8 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -20,6 +20,7 @@ makedocs(; "Parallel Nested Loops" => "use-cases/parallel-nested-loops.md", ], "Task Spawning" => "task-spawning.md", + "Task Affinity" => "task-affinity.md", "Data Management" => "data-management.md", "Distributed Arrays" => "darray.md", "Streaming Tasks" => "streaming.md", diff --git a/docs/src/darray.md b/docs/src/darray.md index 715a6cbe8..4c1136f98 100644 --- a/docs/src/darray.md +++ b/docs/src/darray.md @@ -211,6 +211,270 @@ across the workers in the Julia cluster in a relatively even distribution; future operations on a `DArray` may produce a different distribution from the one chosen by previous calls. + + +### Explicit Processor Mapping of DArray Blocks + +This feature allows you to control how `DArray` blocks (chunks) are assigned to specific processors within the cluster. Controlling data locality is crucial for optimizing the performance of distributed algorithms. + +You can specify the mapping using the optional `assignment` argument in the `DArray` constructor functions (`DArray`, `DVector`, and `DMatrix`), the `distribute` function, and also directly within constructor-like functions such as `rand`, `randn`, `sprand`, `ones`, and `zeros` using the `assignment` optional keyword argument. + +The `assignment` argument accepts the following values: + +* `:arbitrary` **(Default)**: + + * If `assignment` is not provided or is set to symbol `:arbitrary`, Dagger's scheduler assigns blocks to processors automatically. This is the default behavior. + +* `:blockrow`: + + * Divides the matrix blocks row-wise (vertically in the terminal). Each processor gets a contiguous chunk of row blocks. + +* `:blockcol`: + + * Divides the matrix blocks column-wise (horizontally in the terminal). Each processor gets a contiguous chunk of column blocks. + +* `:cyclicrow`: + * Assigns row-blocks to processors in a round-robin fashion. Blocks are distributed one row-block at a time. Useful for parallel row-wise tasks. + +* `:cycliccol`: + * Assigns column-blocks to processors in a round-robin fashion. Blocks are distributed one column-block at a time. Useful for parallel column-wise tasks. + +* Any other symbol used for `assignment` results in an error. + +* `AbstractArray{<:Int, N}`: + + * Provide an integer **N**-dimensional array of worker IDs. The dimension **N** must match the number of dimensions of the `DArray`. + * Dagger maps blocks to worker IDs in a block-cyclic manner according to this processor-array. The block at index `(i,j,...)` is assigned to the first thread of the processor with ID `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions. + +* `AbstractArray{<:Processor, N}`: + + * Provide an **N**-dimensional array of `Processor` objects. The dimension **N** must match the number of dimensions of the `DArray` blocks. + * Blocks are mapped in a block-cyclic manner according to the `Processor` objects in the assignment array. The block at index `(i,j,...)` is assigned to the processor at `assignment[mod1(i, size(assignment,1)), mod1(j, size(assignment,2)), ...]`. This pattern repeats block-cyclically across all dimensions. + +#### Examples and Usage + +The `assignment` argument works similarly for `DArray`, `DVector`, and `DMatrix`, as well as the `distribute` function. 
+The key difference lies in the dimensionality of the resulting distributed array. For functions like `rand`, `randn`, `sprand`, `ones`, and `zeros`, `assignment` is a keyword argument.
+
+* `DArray`: For N-dimensional distributed arrays.
+
+* `DVector`: Specifically for 1-dimensional distributed arrays.
+
+* `DMatrix`: Specifically for 2-dimensional distributed arrays.
+
+* `distribute`: General function to distribute arrays.
+
+* `rand`, `randn`, `sprand`, `ones`, `zeros`: Functions to create DArrays with initial values, also supporting `assignment`.
+
+Here are some examples using a setup with one master process and three worker processes.
+
+First, let's create some sample arrays for `distribute` (and constructor functions):
+
+```julia
+A = rand(7, 11)    # 2D array
+v = ones(15)       # 1D array
+M = zeros(5, 5, 5) # 3D array
+```
+
+1. **Arbitrary Assignment:**
+
+    ```julia
+    Ad = distribute(A, Blocks(2, 2), :arbitrary)
+    # DMatrix(A, Blocks(2, 2), :arbitrary)
+
+    vd = distribute(v, Blocks(3), :arbitrary)
+    # DVector(v, Blocks(3), :arbitrary)
+
+    Md = distribute(M, Blocks(2, 2, 2), :arbitrary)
+    # DArray(M, Blocks(2,2,2), :arbitrary)
+
+    Rd = rand(Blocks(2, 2), 7, 11; assignment=:arbitrary)
+    # distribute(rand(7, 11), Blocks(2, 2), :arbitrary)
+    ```
+
+    This creates distributed arrays with the specified block sizes and assigns the blocks to processors arbitrarily. For example, the assignment for `Ad` might look like this:
+
+    ```julia
+    4×6 Matrix{Dagger.ThreadProc}:
+     ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(2, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+     ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+     ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)
+     ThreadProc(2, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)
+    ```
+
+2. **Structured Assignments:**
+
+    * **`:blockrow` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(1, 2), :blockrow)
+      # DMatrix(A, Blocks(1, 2), :blockrow)
+
+      vd = distribute(v, Blocks(3), :blockrow)
+      # DVector(v, Blocks(3), :blockrow)
+
+      Md = distribute(M, Blocks(2, 2, 2), :blockrow)
+      # DArray(M, Blocks(2,2,2), :blockrow)
+
+      Od = ones(Blocks(1, 2), 7, 11; assignment=:blockrow)
+      # distribute(ones(7, 11), Blocks(1, 2), :blockrow)
+      ```
+
+      This creates distributed arrays with the specified block sizes and assigns contiguous row-blocks to processors evenly.
+      For example, the assignment for `Ad` (and `Od`) will look like this:
+
+      ```julia
+      7×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+       ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)
+      ```
+
+    * **`:blockcol` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(2, 2), :blockcol)
+      # DMatrix(A, Blocks(2, 2), :blockcol)
+
+      vd = distribute(v, Blocks(3), :blockcol)
+      # DVector(v, Blocks(3), :blockcol)
+
+      Md = distribute(M, Blocks(2, 2, 2), :blockcol)
+      # DArray(M, Blocks(2,2,2), :blockcol)
+
+      Rd = randn(Blocks(2, 2), 7, 11; assignment=:blockcol)
+      # distribute(randn(7, 11), Blocks(2, 2), :blockcol)
+      ```
+
+      This creates distributed arrays with the specified block sizes and assigns contiguous column-blocks to processors evenly. For example, the assignment for `Ad` (and `Rd`) will look like this:
+
+      ```julia
+      4×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)
+      ```
+
+    * **`:cyclicrow` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(1, 2), :cyclicrow)
+      # DMatrix(A, Blocks(1, 2), :cyclicrow)
+
+      vd = distribute(v, Blocks(3), :cyclicrow)
+      # DVector(v, Blocks(3), :cyclicrow)
+
+      Md = distribute(M, Blocks(2, 2, 2), :cyclicrow)
+      # DArray(M, Blocks(2,2,2), :cyclicrow)
+
+      Zd = zeros(Blocks(1, 2), 7, 11; assignment=:cyclicrow)
+      # distribute(zeros(7, 11), Blocks(1, 2), :cyclicrow)
+      ```
+
+      This creates distributed arrays with the specified block sizes and assigns row-blocks to processors in round-robin fashion.
+      For example, the assignment for `Ad` (and `Zd`) will look like this:
+
+      ```julia
+      7×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+       ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)  ThreadProc(4, 1)
+       ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)  ThreadProc(1, 1)
+       ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)  ThreadProc(2, 1)
+       ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)  ThreadProc(3, 1)
+      ```
+
+    * **`:cycliccol` Assignment:**
+
+      ```julia
+      Ad = distribute(A, Blocks(2, 2), :cycliccol)
+      # DMatrix(A, Blocks(2, 2), :cycliccol)
+
+      vd = distribute(v, Blocks(3), :cycliccol)
+      # DVector(v, Blocks(3), :cycliccol)
+
+      Md = distribute(M, Blocks(2, 2, 2), :cycliccol)
+      # DArray(M, Blocks(2,2,2), :cycliccol)
+
+      Od = ones(Blocks(2, 2), 7, 11; assignment=:cycliccol)
+      # distribute(ones(7, 11), Blocks(2, 2), :cycliccol)
+      ```
+
+      This creates distributed arrays with the specified block sizes and assigns column-blocks to processors in round-robin fashion. For example, the assignment for `Ad` (and `Od`) will look like this:
+
+      ```julia
+      4×6 Matrix{Dagger.ThreadProc}:
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+       ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)
+      ```
+
+3. **Block-Cyclic Assignment with Integer Array:**
+
+    ```julia
+    assignment_2d = [2 1; 4 3]
+    Ad = distribute(A, Blocks(2, 2), assignment_2d)
+    # DMatrix(A, Blocks(2, 2), [2 1; 4 3])
+
+    assignment_1d = [2, 3, 1, 4]
+    vd = distribute(v, Blocks(3), assignment_1d)
+    # DVector(v, Blocks(3), [2,3,1,4])
+
+    assignment_3d = cat([1 2; 3 4], [4 3; 2 1], dims=3)
+    Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
+    # DArray(M, Blocks(2, 2, 2), cat([1 2; 3 4], [4 3; 2 1], dims=3))
+
+    Rd = sprand(Blocks(2, 2), 7, 11, 0.2; assignment=assignment_2d)
+    # distribute(sprand(7, 11, 0.2), Blocks(2, 2), assignment_2d)
+    ```
+
+    Here the assignment is an integer array of worker IDs; blocks are assigned in a block-cyclic manner to the first thread of each listed worker. The assignment for `Ad` (and `Rd`) would be:
+
+    ```julia
+    4×6 Matrix{Dagger.ThreadProc}:
+     ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)
+     ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+     ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)  ThreadProc(2, 1)  ThreadProc(1, 1)
+     ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)  ThreadProc(4, 1)  ThreadProc(3, 1)
+    ```
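+
+    To verify where the blocks of a `DArray` actually landed, you can inspect the processor of each chunk. The following is a minimal sketch only; it relies on internal details (the `chunks` field, and `fetch(...; raw=true)` for chunks that are still unfinished tasks), which may change between Dagger versions:
+
+    ```julia
+    block_procs = map(Ad.chunks) do c
+        # Each entry is either an already-computed `Chunk` or a still-pending task;
+        # `fetch(...; raw=true)` returns the underlying `Chunk` without copying its data.
+        chunk = c isa Dagger.Chunk ? c : fetch(c; raw=true)
+        Dagger.processor(chunk)
+    end
+    ```
+
+    The result is an array of processors with the same shape as the block grid, matching the assignment matrices shown above.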
+4. **Block-Cyclic Assignment with Processor Array:**
+
+    ```julia
+    assignment_2d = [Dagger.ThreadProc(3, 2) Dagger.ThreadProc(1, 1);
+                     Dagger.ThreadProc(4, 3) Dagger.ThreadProc(2, 2)]
+    Ad = distribute(A, Blocks(2, 2), assignment_2d)
+    # DMatrix(A, Blocks(2, 2), assignment_2d)
+
+    assignment_1d = [Dagger.ThreadProc(2,1), Dagger.ThreadProc(3,1), Dagger.ThreadProc(1,1), Dagger.ThreadProc(4,1)]
+    vd = distribute(v, Blocks(3), assignment_1d)
+    # DVector(v, Blocks(3), assignment_1d)
+
+    assignment_3d = cat([Dagger.ThreadProc(1,1) Dagger.ThreadProc(2,1); Dagger.ThreadProc(3,1) Dagger.ThreadProc(4,1)],
+                        [Dagger.ThreadProc(4,1) Dagger.ThreadProc(3,1); Dagger.ThreadProc(2,1) Dagger.ThreadProc(1,1)], dims=3)
+    Md = distribute(M, Blocks(2, 2, 2), assignment_3d)
+    # DArray(M, Blocks(2, 2, 2), assignment_3d)
+
+    Rd = rand(Blocks(2, 2), 7, 11; assignment=assignment_2d)
+    # distribute(rand(7, 11), Blocks(2, 2), assignment_2d)
+    ```
+
+    Here the assignment is an array of `Processor` objects; blocks are assigned to those processors in a block-cyclic manner. The assignment for `Ad` (and `Rd`) would be:
+
+    ```julia
+    4×6 Matrix{Dagger.ThreadProc}:
+     ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)
+     ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)
+     ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)  ThreadProc(3, 2)  ThreadProc(1, 1)
+     ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)  ThreadProc(4, 3)  ThreadProc(2, 2)
+    ```
+
+
 ## Broadcasting
 
 As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's
@@ -446,4 +710,4 @@ From `LinearAlgebra`:
 - `*` (Out-of-place Matrix-(Matrix/Vector) multiply)
 - `mul!` (In-place Matrix-Matrix multiply)
 - `cholesky`/`cholesky!` (In-place/Out-of-place Cholesky factorization)
-- `lu`/`lu!` (In-place/Out-of-place LU factorization (`NoPivot` only))
+- `lu`/`lu!` (In-place/Out-of-place LU factorization (`NoPivot` only))
\ No newline at end of file
diff --git a/docs/src/task-affinity.md b/docs/src/task-affinity.md
new file mode 100644
index 000000000..53303923e
--- /dev/null
+++ b/docs/src/task-affinity.md
@@ -0,0 +1,131 @@
+# Task Affinity
+
+Dagger allows for precise control over task placement and result availability using scopes. Tasks are assigned based on the combination of multiple scopes: `scope`/`compute_scope` and `result_scope` (which can all be specified with `@spawn`), and additionally the scopes of any arguments to the task (in the form of a scope attached to a `Chunk` argument). Let's take a look at how to configure these scopes, and how they work together to direct task placement.
+
+For more information on how scopes work, see [Scopes](@ref).
+
+---
+
+## Task Scopes
+
+### Scope
+
+`scope` defines the general set of locations where a Dagger task can execute. If `scope` is not specified, the task falls back to `DefaultScope()`, allowing it to run wherever execution is possible. Execution occurs on any worker within the defined scope.
+
+**Example:**
+```julia
+g = Dagger.@spawn scope=Dagger.scope(worker=3) f(x,y)
+```
+Task `g` executes only on worker 3. Its result can be accessed by any worker.
+
+---
+
+### Compute Scope
+
+Like `scope`, `compute_scope` also specifies where a Dagger task can execute. The key difference is that, if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement.
+If neither is specified, both default to `DefaultScope()`.
+
+**Example:**
+```julia
+g1 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
+g2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y)
+```
+Tasks `g1` and `g2` execute on either thread 2 of worker 1 or thread 1 of worker 3. The `scope` argument to `g1` is ignored. Their results can be accessed by any worker.
+
+---
+
+### Result Scope
+
+`result_scope` limits the processors from which a task's result can be accessed. This can be useful for managing data locality and minimizing transfers. If `result_scope` is not specified, it defaults to `AnyScope()`, meaning the result can be accessed by any processor (including those not enabled for task execution by default, such as GPUs).
+
+**Example:**
+```julia
+g = Dagger.@spawn result_scope=Dagger.scope(worker=3, threads=[1, 3, 4]) f(x,y)
+```
+The result of `g` is accessible only from threads 1, 3, and 4 of worker 3. The task itself may execute on any of threads 1, 3, and 4 of worker 3.
+
+---
+
+## Interaction of `compute_scope` and `result_scope`
+
+When `scope`/`compute_scope` and `result_scope` are specified, the scheduler executes the task on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty, the scheduler throws a `Dagger.Sch.SchedulingException` error.
+
+**Example:**
+```julia
+g = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) f(x,y)
+```
+The task `g` computes on thread 2 of worker 2 (as it's the intersection of the compute and result scopes), but accessing its result is restricted to thread 2 of worker 2 and thread 2 of worker 4.
+
+---
+
+## Function as a Chunk
+
+This section explains how `scope`/`compute_scope` and `result_scope` affect tasks when a `Chunk` is used to specify the function to be executed by `@spawn` (e.g. created via `Dagger.tochunk(...)` or by calling `fetch(task; raw=true)` on a task). This may seem strange (to use a `Chunk` to specify the function to be executed), but it can be useful when working with callable structs, such as closures or Flux.jl models.
+
+Assume `g` is some function, e.g. `g(x, y) = x * 2 + y * 3`, and `chunk_scope` is its defined affinity.
+
+When `Dagger.tochunk(...)` is used to pass a `Chunk` as the function to be executed by `@spawn`:
+- The result is accessible only on processors in `chunk_scope`.
+- Dagger validates that there is an intersection between `chunk_scope`, the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`), and the `result_scope`. If no intersection exists, the scheduler throws an exception.
+
+!!! info
+    While `chunk_proc` is currently required when constructing a chunk, it is only used to pick the most optimal processor for accessing the chunk; it does not affect which set of processors the task may execute on.
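+
+As a concrete illustration, here is a minimal sketch of wrapping a callable struct as a `Chunk`. The `Scale` struct and the use of worker 2 are hypothetical (not part of Dagger); this simply assumes a worker with pid 2 exists:
+
+```julia
+using Distributed, Dagger
+
+# Define the callable struct on all workers so that worker 2 can invoke it.
+@everywhere struct Scale
+    factor::Float64
+end
+@everywhere (s::Scale)(x) = s.factor * x
+
+scale_scope = Dagger.scope(worker=2)
+scale_proc  = Dagger.OSProc(2)   # only used to pick a preferred access processor
+scale_chunk = Dagger.tochunk(Scale(2.0), scale_proc, scale_scope)
+
+t = Dagger.@spawn scale_chunk(21)   # executes within scale_scope (worker 2)
+# As described above, the result of `t` is likewise only accessible within scale_scope.
+```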
+ +**Usage:** +```julia +chunk_scope = Dagger.scope(worker=3) +chunk_proc = Dagger.OSProc(3) # not important, just needs to be a valid processor +g(x, y) = x * 2 + y * 3 +g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) +h1 = Dagger.@spawn scope=Dagger.scope(worker=3) g_chunk(10, 11) +h2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(20, 21) +h3 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(30, 31) +h4 = Dagger.@spawn result_scope=Dagger.scope(worker=3) g_chunk(40, 41) +h5 = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=3) result_scope=Dagger.scope(worker=3,threads=[2,3]) g_chunk(50, 51) +``` +In all these cases (`h1` through `h5`), the tasks get executed on any processor within `chunk_scope` and its result is accessible only within `chunk_scope`. + +--- + +## Chunk arguments + +This section details behavior when some or all of a task's arguments are `Chunk`s. + +Assume `g(x, y) = x * 2 + y * 3`, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)`, where `arg_scope` is the argument's defined scope. Assume `arg_scope = Dagger.scope(worker=2)`. + +### Scope +If `arg_scope` and `scope` do not intersect, the scheduler throws an exception. Execution occurs on the intersection of `scope` and `arg_scope`. + +```julia +h = Dagger.@spawn scope=Dagger.scope(worker=2) g(arg, 11) +``` +Task `h` executes on any worker within the intersection of `scope` and `arg_scope`. The result is accessible from any processor. + +--- + +### Compute scope and Chunk argument scopes interaction +If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`. + +```julia +h1 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 11) +h2 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 21) +``` +Tasks `h1` and `h2` execute on any processor within the intersection of the `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is accessible from any processor. + +--- + +### Result scope and Chunk argument scopes interaction +If only `result_scope` is specified, computation happens on any processor within the intersection of `arg_scope` and `result_scope`, and the result is only accessible within `result_scope`. + +```julia +h = Dagger.@spawn result_scope=Dagger.scope(worker=2) g(arg, 11) +``` +Task `h` executes on any processor within the intersection of `arg_scope` and `result_scope`. The result is accessible from only within `result_scope`. + +--- + +### Compute, result, and chunk argument scopes interaction +When `scope`/`compute_scope`, `result_scope`, and `Chunk` argument scopes are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception. 
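+
+The effective execution scope here is just the pairwise intersection of all three scopes. As a rough sketch, it can be checked manually with the internal `Dagger.constrain` helper (which accepts two or more scopes); an `InvalidScope` result is exactly the situation in which the scheduler throws. This assumes the same `arg` and workers as the example below:
+
+```julia
+arg_scope     = Dagger.scope(worker=2)
+compute_scope = Dagger.scope(worker=2)
+result_scope  = Dagger.scope((worker=2, thread=2), (worker=4, thread=2))
+
+# The scope the scheduler would actually use for execution:
+exec_scope = Dagger.constrain(arg_scope, compute_scope, result_scope)
+exec_scope isa Dagger.InvalidScope && error("scopes are not compatible")
+```
+
+The same intersection is applied in the full `@spawn` form: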
+ +```julia +h = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) g(arg, 31) +``` +Task `h` computes on thread 2 of worker 2 (as it's the intersection of `arg_scope`, `compute_scope`, and `result_scope`), and its result access is restricted to thread 2 of worker 2 or thread 2 of worker 4. diff --git a/src/array/alloc.jl b/src/array/alloc.jl index f67c927de..5b3a2fe3b 100644 --- a/src/array/alloc.jl +++ b/src/array/alloc.jl @@ -8,7 +8,49 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N} want_index::Bool domain::ArrayDomain{N} domainchunks - partitioning::AbstractBlocks + partitioning::AbstractBlocks{N} + procgrid::Union{AbstractArray{<:Processor, N}, Nothing} + + function AllocateArray(eltype::Type{T}, f, want_index::Bool, d::ArrayDomain{N}, domainchunks, p::AbstractBlocks{N}, assignment::Union{AssignmentType{N},Nothing} = nothing) where {T,N} + sizeA = map(length, d.indexes) + procgrid = nothing + availprocs = collect(Dagger.all_processors()) + sort!(availprocs, by = x -> (x.owner, x.tid)) + if assignment isa Symbol + if assignment == :arbitrary + procgrid = nothing + elseif assignment == :blockrow + q = ntuple(i -> i == 1 ? Int(ceil(sizeA[1] / p.blocksize[1])) : 1, N) + rows_per_proc, extra = divrem(Int(ceil(sizeA[1] / p.blocksize[1])), num_processors()) + counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), q) + elseif assignment == :blockcol + q = ntuple(i -> i == N ? Int(ceil(sizeA[N] / p.blocksize[N])) : 1, N) + cols_per_proc, extra = divrem(Int(ceil(sizeA[N] / p.blocksize[N])), num_processors()) + counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), q) + elseif assignment == :cyclicrow + q = ntuple(i -> i == 1 ? num_processors() : 1, N) + procgrid = reshape(availprocs, q) + elseif assignment == :cycliccol + q = ntuple(i -> i == N ? num_processors() : 1, N) + procgrid = reshape(availprocs, q) + else + error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol") + end + elseif assignment isa AbstractArray{<:Int, N} + missingprocs = filter(q -> q ∉ procs(), assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment] + elseif assignment isa AbstractArray{<:Processor, N} + missingprocs = filter(q -> q ∉ availprocs, assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = assignment + end + + return new{T,N}(eltype, f, want_index, d, domainchunks, p, procgrid) + end + end size(a::AllocateArray) = size(a.domain) @@ -34,79 +76,86 @@ function allocate_array(f, T, sz) end allocate_array_func(::Processor, f) = f function stage(ctx, a::AllocateArray) - if a.want_index - thunks = [Dagger.@spawn allocate_array(a.f, a.eltype, i, size(x)) for (i, x) in enumerate(a.domainchunks)] - else - thunks = [Dagger.@spawn allocate_array(a.f, a.eltype, size(x)) for (i, x) in enumerate(a.domainchunks)] + chunks = map(CartesianIndices(a.domainchunks)) do I + x = a.domainchunks[I] + i = LinearIndices(a.domainchunks)[I] + args = a.want_index ? (i, size(x)) : (size(x),) + scope = isnothing(a.procgrid) ? nothing : ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)]) + + if isnothing(scope) + Dagger.@spawn allocate_array(a.f, a.eltype, args...) 
+ else + Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...) + end end - return DArray(a.eltype, a.domain, a.domainchunks, thunks, a.partitioning) + return DArray(a.eltype, a.domain, a.domainchunks, chunks, a.partitioning) end const BlocksOrAuto = Union{Blocks{N} where N, AutoBlocks} -function Base.rand(p::Blocks, eltype::Type, dims::Dims) +function Base.rand(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, rand, false, d, partition(p, d), p) + a = AllocateArray(eltype, rand, false, d, partition(p, d), p, assignment) return _to_darray(a) end -Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...) = rand(p, T, dims) -Base.rand(p::BlocksOrAuto, T::Type, dims::Dims) = rand(p, T, dims) -Base.rand(p::BlocksOrAuto, dims::Integer...) = rand(p, Float64, dims) -Base.rand(p::BlocksOrAuto, dims::Dims) = rand(p, Float64, dims) -Base.rand(::AutoBlocks, eltype::Type, dims::Dims) = - rand(auto_blocks(dims), eltype, dims) - -function Base.randn(p::Blocks, eltype::Type, dims::Dims) +Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment) +Base.rand(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment=assignment) +Base.rand(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment) +Base.rand(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment=assignment) +Base.rand(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + rand(auto_blocks(dims), eltype, dims; assignment=assignment) + +function Base.randn(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, randn, false, d, partition(p, d), p) + a = AllocateArray(eltype, randn, false, d, partition(p, d), p, assignment) return _to_darray(a) end -Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...) = randn(p, T, dims) -Base.randn(p::BlocksOrAuto, T::Type, dims::Dims) = randn(p, T, dims) -Base.randn(p::BlocksOrAuto, dims::Integer...) 
= randn(p, Float64, dims) -Base.randn(p::BlocksOrAuto, dims::Dims) = randn(p, Float64, dims) -Base.randn(::AutoBlocks, eltype::Type, dims::Dims) = - randn(auto_blocks(dims), eltype, dims) - -function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat) +Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment) +Base.randn(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment=assignment) +Base.randn(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment) +Base.randn(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment=assignment) +Base.randn(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + randn(auto_blocks(dims), eltype, dims; assignment=assignment) + +function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p) + a = AllocateArray(eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p, assignment) return _to_darray(a) end -sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...) = - sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end]) -sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat) = - sprand(p, T, dims, sparsity) -sprand(p::BlocksOrAuto, dims_and_sparsity::Real...) = - sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]) -sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat) = - sprand(p, Float64, dims, sparsity) -sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat) = - sprand(auto_blocks(dims), eltype, dims, sparsity) - -function Base.ones(p::Blocks, eltype::Type, dims::Dims) +sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) = + sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment) +# sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) = +# sprand(p, T, dims, sparsity; assignment=assignment) +sprand(p::BlocksOrAuto, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) = + sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment=assignment) +sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) = + sprand(p, Float64, dims, sparsity; assignment=assignment) +sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) = + sprand(auto_blocks(dims), eltype, dims, sparsity; assignment=assignment) + +function Base.ones(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, ones, false, d, partition(p, d), p) + a = AllocateArray(eltype, ones, false, d, partition(p, d), p, assignment) return _to_darray(a) end -Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...) = ones(p, T, dims) -Base.ones(p::BlocksOrAuto, T::Type, dims::Dims) = ones(p, T, dims) -Base.ones(p::BlocksOrAuto, dims::Integer...) 
= ones(p, Float64, dims) -Base.ones(p::BlocksOrAuto, dims::Dims) = ones(p, Float64, dims) -Base.ones(::AutoBlocks, eltype::Type, dims::Dims) = - ones(auto_blocks(dims), eltype, dims) - -function Base.zeros(p::Blocks, eltype::Type, dims::Dims) +Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment) +Base.ones(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment=assignment) +Base.ones(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment) +Base.ones(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment=assignment) +Base.ones(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + ones(auto_blocks(dims), eltype, dims; assignment=assignment) + +function Base.zeros(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) d = ArrayDomain(map(x->1:x, dims)) - a = AllocateArray(eltype, zeros, false, d, partition(p, d), p) + a = AllocateArray(eltype, zeros, false, d, partition(p, d), p, assignment) return _to_darray(a) end -Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...) = zeros(p, T, dims) -Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims) = zeros(p, T, dims) -Base.zeros(p::BlocksOrAuto, dims::Integer...) = zeros(p, Float64, dims) -Base.zeros(p::BlocksOrAuto, dims::Dims) = zeros(p, Float64, dims) -Base.zeros(::AutoBlocks, eltype::Type, dims::Dims) = - zeros(auto_blocks(dims), eltype, dims) +Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment) +Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment=assignment) +Base.zeros(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment) +Base.zeros(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment=assignment) +Base.zeros(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) = + zeros(auto_blocks(dims), eltype, dims; assignment=assignment) function Base.zero(x::DArray{T,N}) where {T,N} dims = ntuple(i->x.domain.indexes[i].stop, N) diff --git a/src/array/darray.jl b/src/array/darray.jl index 37c61a936..7292d29a8 100644 --- a/src/array/darray.jl +++ b/src/array/darray.jl @@ -419,6 +419,7 @@ struct Distribute{T,N,B<:AbstractBlocks} <: ArrayOp{T, N} domainchunks partitioning::B data::AbstractArray{T,N} + procgrid::Union{AbstractArray{<:Processor, N}, Nothing} end size(x::Distribute) = size(domain(x.data)) @@ -426,19 +427,18 @@ size(x::Distribute) = size(domain(x.data)) Base.@deprecate BlockPartition Blocks -Distribute(p::Blocks, data::AbstractArray) = - Distribute(partition(p, domain(data)), p, data) +Distribute(p::Blocks, data::AbstractArray, procgrid::Union{AbstractArray{<:Processor},Nothing} = nothing) = + Distribute(partition(p, domain(data)), p, data, procgrid) -function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}) where {T,N} +function Distribute(domainchunks::DomainBlocks{N}, data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N} p = Blocks(ntuple(i->first(domainchunks.cumlength[i]), N)) - Distribute(domainchunks, p, data) + Distribute(domainchunks, p, data, procgrid) end 
-function Distribute(data::AbstractArray{T,N}) where {T,N} - nprocs = sum(w->length(Dagger.get_processors(OSProc(w))), - procs()) +function Distribute(data::AbstractArray{T,N}, procgrid::Union{AbstractArray{<:Processor, N},Nothing} = nothing) where {T,N} + nprocs = sum(w->length(get_processors(OSProc(w))),procs()) p = Blocks(ntuple(i->max(cld(size(data, i), nprocs), 1), N)) - return Distribute(partition(p, domain(data)), p, data) + return Distribute(partition(p, domain(data)), p, data, procgrid) end function stage(ctx::Context, d::Distribute) @@ -451,7 +451,8 @@ function stage(ctx::Context, d::Distribute) Nd = ndims(x) T = eltype(d.data) concat = x.concat - cs = map(d.domainchunks) do idx + cs = map(CartesianIndices(d.domainchunks)) do I + idx = d.domainchunks[I] chunks = stage(ctx, x[idx]).chunks shape = size(chunks) # TODO: fix hashing @@ -466,12 +467,19 @@ function stage(ctx::Context, d::Distribute) end end else - cs = map(d.domainchunks) do c + cs = map(CartesianIndices(d.domainchunks)) do I # TODO: fix hashing #hash = uhash(c, Base.hash(Distribute, Base.hash(d.data))) - Dagger.@spawn identity(d.data[c]) + c = d.domainchunks[I] + if isnothing(d.procgrid) + Dagger.@spawn identity(d.data[c]) + else + scope = ExactScope(d.procgrid[CartesianIndex(mod1.(Tuple(I), size(d.procgrid))...)]) + Dagger.@spawn compute_scope=scope identity(d.data[c]) + end end end + return DArray(eltype(d.data), domain(d.data), d.domainchunks, @@ -494,29 +502,66 @@ function auto_blocks(dims::Dims{N}) where N end auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A)) -distribute(A::AbstractArray) = distribute(A, AutoBlocks()) -distribute(A::AbstractArray{T,N}, dist::Blocks{N}) where {T,N} = - _to_darray(Distribute(dist, A)) -distribute(A::AbstractArray, ::AutoBlocks) = distribute(A, auto_blocks(A)) -function distribute(x::AbstractArray{T,N}, n::NTuple{N}) where {T,N} +const AssignmentType{N} = Union{Symbol, AbstractArray{<:Int, N}, AbstractArray{<:Processor, N}} + +distribute(A::AbstractArray, assignment::AssignmentType = :arbitrary) = distribute(A, AutoBlocks(), assignment) +function distribute(A::AbstractArray{T,N}, dist::Blocks{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N} + procgrid = nothing + availprocs = collect(Dagger.all_processors()) + sort!(availprocs, by = x -> (x.owner, x.tid)) + if assignment isa Symbol + if assignment == :arbitrary + procgrid = nothing + elseif assignment == :blockrow + p = ntuple(i -> i == 1 ? Int(ceil(size(A,1) / dist.blocksize[1])) : 1, N) + rows_per_proc, extra = divrem(Int(ceil(size(A,1) / dist.blocksize[1])), num_processors()) + counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + elseif assignment == :blockcol + p = ntuple(i -> i == N ? Int(ceil(size(A,N) / dist.blocksize[N])) : 1, N) + cols_per_proc, extra = divrem(Int(ceil(size(A,N) / dist.blocksize[N])), num_processors()) + counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:num_processors()] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + elseif assignment == :cyclicrow + p = ntuple(i -> i == 1 ? num_processors() : 1, N) + procgrid = reshape(availprocs, p) + elseif assignment == :cycliccol + p = ntuple(i -> i == N ? 
num_processors() : 1, N) + procgrid = reshape(availprocs, p) + else + error("Unsupported assignment symbol: $assignment, use :arbitrary, :blockrow, :blockcol, :cyclicrow or :cycliccol") + end + elseif assignment isa AbstractArray{<:Int, N} + missingprocs = filter(p -> p ∉ procs(), assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = [Dagger.ThreadProc(proc, 1) for proc in assignment] + elseif assignment isa AbstractArray{<:Processor, N} + missingprocs = filter(p -> p ∉ availprocs, assignment) + isempty(missingprocs) || error("Missing processors: $missingprocs") + procgrid = assignment + end + + return _to_darray(Distribute(dist, A, procgrid)) +end + +distribute(A::AbstractArray, ::AutoBlocks, assignment::AssignmentType = :arbitrary) = distribute(A, auto_blocks(A), assignment) +function distribute(x::AbstractArray{T,N}, n::NTuple{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N} p = map((d, dn)->ceil(Int, d / dn), size(x), n) - distribute(x, Blocks(p)) + distribute(x, Blocks(p), assignment) end -distribute(x::AbstractVector, n::Int) = distribute(x, (n,)) -distribute(x::AbstractVector, n::Vector{<:Integer}) = - distribute(x, DomainBlocks((1,), (cumsum(n),))) +distribute(x::AbstractVector, n::Int, assignment::AssignmentType{1} = :arbitrary) = distribute(x, (n,), assignment) -DVector(A::AbstractVector{T}, part::Blocks{1}) where T = distribute(A, part) -DMatrix(A::AbstractMatrix{T}, part::Blocks{2}) where T = distribute(A, part) -DArray(A::AbstractArray{T,N}, part::Blocks{N}) where {T,N} = distribute(A, part) +DVector(A::AbstractVector{T}, part::Blocks{1}, assignment::AssignmentType{1} = :arbitrary) where T = distribute(A, part, assignment) +DMatrix(A::AbstractMatrix{T}, part::Blocks{2}, assignment::AssignmentType{2} = :arbitrary) where T = distribute(A, part, assignment) +DArray(A::AbstractArray{T,N}, part::Blocks{N}, assignment::AssignmentType{N} = :arbitrary) where {T,N} = distribute(A, part, assignment) -DVector(A::AbstractVector{T}) where T = DVector(A, AutoBlocks()) -DMatrix(A::AbstractMatrix{T}) where T = DMatrix(A, AutoBlocks()) -DArray(A::AbstractArray) = DArray(A, AutoBlocks()) +DVector(A::AbstractVector{T}, assignment::AssignmentType{1} = :arbitrary) where T = DVector(A, AutoBlocks(), assignment) +DMatrix(A::AbstractMatrix{T}, assignment::AssignmentType{2} = :arbitrary) where T = DMatrix(A, AutoBlocks(), assignment) +DArray(A::AbstractArray, assignment::AssignmentType = :arbitrary) = DArray(A, AutoBlocks(), assignment) -DVector(A::AbstractVector{T}, ::AutoBlocks) where T = DVector(A, auto_blocks(A)) -DMatrix(A::AbstractMatrix{T}, ::AutoBlocks) where T = DMatrix(A, auto_blocks(A)) -DArray(A::AbstractArray, ::AutoBlocks) = DArray(A, auto_blocks(A)) +DVector(A::AbstractVector{T}, ::AutoBlocks, assignment::AssignmentType{1} = :arbitrary) where T = DVector(A, auto_blocks(A), assignment) +DMatrix(A::AbstractMatrix{T}, ::AutoBlocks, assignment::AssignmentType{2} = :arbitrary) where T = DMatrix(A, auto_blocks(A), assignment) +DArray(A::AbstractArray, ::AutoBlocks, assignment::AssignmentType = :arbitrary) = DArray(A, auto_blocks(A), assignment) function Base.:(==)(x::ArrayOp{T,N}, y::AbstractArray{S,N}) where {T,S,N} collect(x) == y diff --git a/src/memory-spaces.jl b/src/memory-spaces.jl index 0ef0d1200..f34fd454a 100644 --- a/src/memory-spaces.jl +++ b/src/memory-spaces.jl @@ -386,4 +386,4 @@ function will_alias(x_span::MemorySpan, y_span::MemorySpan) x_end = x_span.ptr + x_span.len - 1 y_end = y_span.ptr + y_span.len - 1 return x_span.ptr 
<= y_end && y_span.ptr <= x_end -end +end \ No newline at end of file diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index b894f4526..0335fe3e0 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -14,7 +14,7 @@ import Random: randperm import Base: @invokelatest import ..Dagger -import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject +import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, InvalidScope, LockedObject import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime import ..Dagger: @dagdebug, @safe_lock_spin1 import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek @@ -726,16 +726,25 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) sig = signature(state, task) # Calculate scope - scope = if task.f isa Chunk - task.f.scope - else - if task.options.proclist !== nothing - # proclist overrides scope selection - AnyScope() - else - DefaultScope() + scope = constrain(task.compute_scope, task.result_scope) + if scope isa InvalidScope + ex = SchedulingException("compute_scope and result_scope are not compatible: $(scope.x), $(scope.y)") + state.cache[task] = ex + state.errored[task] = true + set_failed!(state, task) + @goto pop_task + end + if task.f isa Chunk + scope = constrain(scope, task.f.scope) + if scope isa InvalidScope + ex = SchedulingException("Current scope and function Chunk Scope are not compatible: $(scope.x), $(scope.y)") + state.cache[task] = ex + state.errored[task] = true + set_failed!(state, task) + @goto pop_task end end + for (_,input) in task.inputs input = unwrap_weak_checked(input) chunk = if istask(input) @@ -747,8 +756,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) end chunk isa Chunk || continue scope = constrain(scope, chunk.scope) - if scope isa Dagger.InvalidScope - ex = SchedulingException("Scopes are not compatible: $(scope.x), $(scope.y)") + if scope isa InvalidScope + ex = SchedulingException("Current scope and argument Chunk scope are not compatible: $(scope.x), $(scope.y)") state.cache[task] = ex state.errored[task] = true set_failed!(state, task) @@ -1086,7 +1095,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options, propagated, ids, positions, (log_sink=ctx.log_sink, profile=ctx.profile), - sch_handle, state.uid]) + sch_handle, state.uid, thunk.result_scope]) end # N.B. 
We don't batch these because we might get a deserialization # error due to something not being defined on the worker, and then we don't @@ -1305,7 +1314,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re task = task_spec[] scope = task[5] if !isa(constrain(scope, Dagger.ExactScope(to_proc)), - Dagger.InvalidScope) && + InvalidScope) && typemax(UInt32) - proc_occupancy_cached >= occupancy # Compatible, steal this task return dequeue_pair!(queue) @@ -1488,7 +1497,7 @@ function do_task(to_proc, task_desc) scope, Tf, data, send_result, persist, cache, meta, options, propagated, ids, positions, - ctx_vars, sch_handle, sch_uid = task_desc + ctx_vars, sch_handle, sch_uid, result_scope = task_desc ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile) from_proc = OSProc() @@ -1696,7 +1705,7 @@ function do_task(to_proc, task_desc) # Construct result # TODO: We should cache this locally - send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache, + send_result || meta ? res : tochunk(res, to_proc, result_scope; device, persist, cache=persist ? true : cache, tag=options.storage_root_tag, leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()), retain=options.storage_retain) diff --git a/src/sch/util.jl b/src/sch/util.jl index 01138a052..dd148d336 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -42,9 +42,7 @@ function get_propagated_options(thunk) nt = NamedTuple() for key in thunk.propagates value = if key == :scope - isa(thunk.f, Chunk) ? thunk.f.scope : DefaultScope() - elseif key == :processor - isa(thunk.f, Chunk) ? thunk.f.processor : OSProc() + thunk.compute_scope elseif key in fieldnames(Thunk) getproperty(thunk, key) elseif key in fieldnames(ThunkOptions) @@ -340,7 +338,7 @@ function can_use_proc(state, task, gproc, proc, opts, scope) scope = constrain(scope, Dagger.ExactScope(proc)) elseif opts.proclist isa Vector if !(typeof(proc) in opts.proclist) - @dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist)" + @dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist) ($(opts.proclist))" return false, scope end scope = constrain(scope, diff --git a/src/scopes.jl b/src/scopes.jl index 0545c573e..ecb3e5e03 100644 --- a/src/scopes.jl +++ b/src/scopes.jl @@ -240,6 +240,17 @@ constrain(x::ProcessScope, y::ExactScope) = constrain(x::NodeScope, y::ExactScope) = x == y.parent.parent ? y : InvalidScope(x, y) + +function constrain(scope1, scope2, scopes...) + scope1 = constrain(scope1, scope2) + scope1 isa InvalidScope && return scope1 + for s in scopes + scope1 = constrain(scope1, s) + scope1 isa InvalidScope && return scope1 + end + return scope1 +end + ### Scopes helper """ @@ -412,3 +423,26 @@ to_scope(::Val{key}, sc::NamedTuple) where key = # Base case for all Dagger-owned keys scope_key_precedence(::Val) = 0 + +### Scope comparison helpers + +function Base.issetequal(scopes::AbstractScope...) 
+ scope1 = scopes[1] + scope1_procs = Dagger.compatible_processors(scope1) + for scope2 in scopes[2:end] + scope2_procs = Dagger.compatible_processors(scope2) + if !issetequal(scope1_procs, scope2_procs) + return false + end + end + return true +end + +function Base.issubset(scope1::AbstractScope, scope2::AbstractScope) + scope1_procs = compatible_processors(scope1) + scope2_procs = compatible_processors(scope2) + for proc in scope1_procs + proc in scope2_procs || return false + end + return true +end \ No newline at end of file diff --git a/src/stream.jl b/src/stream.jl index 07a3dae95..885a9e931 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -289,6 +289,20 @@ struct StreamingFunction{F, S} new{F, S}(f, stream, max_evals) end +struct DestPostMigration + thunk_id::Int + cancel_token::CancelToken + f + DestPostMigration(thunk_id, tls, f) = new(thunk_id, tls.cancel_token, f) +end +function (dpm::DestPostMigration)(store, unsent) + STREAM_THUNK_ID[] = dpm.thunk_id + @assert !in_task() + tls = DTaskTLS(OSProc(), typemax(UInt64), nothing, [], dpm.cancel_token) + set_tls!(tls) + return dpm.f(store, unsent) +end + function migrate_stream!(stream::Stream, w::Integer=myid()) # Perform migration of the StreamStore # MemPool will block access to the new ref until the migration completes @@ -318,11 +332,8 @@ function migrate_stream!(stream::Stream, w::Integer=myid()) empty!(store.output_buffers) return (unsent_inputs, unsent_outputs) end, - dest_post_migration=(store, unsent)->begin + dest_post_migration=DestPostMigration(thunk_id, tls, (store, unsent)->begin # Initialize the StreamStore on the destination with the unsent inputs/outputs. - STREAM_THUNK_ID[] = thunk_id - @assert !in_task() - set_tls!(tls) #get_tls().cancel_token = MemPool.access_ref(identity, remote_cancel_token; local_only=true) unsent_inputs, unsent_outputs = unsent for (input_uid, inputs) in unsent_inputs @@ -342,7 +353,7 @@ function migrate_stream!(stream::Stream, w::Integer=myid()) # Reset the state of this new store store.open = true store.migrating = false - end, + end), post_migration=store->begin # Indicate that this store has migrated store.migrating = true diff --git a/src/thunk.jl b/src/thunk.jl index b24806f85..659ee9a2c 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -73,6 +73,8 @@ mutable struct Thunk eager_ref::Union{DRef,Nothing} options::Any # stores scheduler-specific options propagates::Tuple # which options we'll propagate + compute_scope::AbstractScope + result_scope::AbstractScope function Thunk(f, xs...; syncdeps=nothing, id::Int=next_id(), @@ -84,16 +86,14 @@ mutable struct Thunk affinity=nothing, eager_ref=nothing, processor=nothing, - scope=nothing, + scope=DefaultScope(), + compute_scope=scope, + result_scope=AnyScope(), options=nothing, propagates=(), kwargs... 
) - if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope)) - f = tochunk(f, - something(processor, OSProc()), - something(scope, DefaultScope())) - end + xs = Base.mapany(identity, xs) syncdeps_set = Set{Any}(filterany(is_task_or_chunk, Base.mapany(last, xs))) if syncdeps !== nothing @@ -105,11 +105,11 @@ mutable struct Thunk if options !== nothing @assert isempty(kwargs) new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, - cache_ref, affinity, eager_ref, options, propagates) + cache_ref, affinity, eager_ref, options, propagates, compute_scope, result_scope) else new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, cache_ref, affinity, eager_ref, Sch.ThunkOptions(;kwargs...), - propagates) + propagates, compute_scope, result_scope) end end end @@ -476,15 +476,6 @@ function spawn(f, args...; kwargs...) args = args[2:end] end - # Wrap f in a Chunk if necessary - processor = haskey(options, :processor) ? options.processor : nothing - scope = haskey(options, :scope) ? options.scope : nothing - if !isnothing(processor) || !isnothing(scope) - f = tochunk(f, - something(processor, get_options(:processor, OSProc())), - something(scope, get_options(:scope, DefaultScope()))) - end - # Process the args and kwargs into Pair form args_kwargs = args_kwargs_to_pairs(args, kwargs) diff --git a/test/array/allocation.jl b/test/array/allocation.jl index a95ef2efc..f7bb86323 100644 --- a/test/array/allocation.jl +++ b/test/array/allocation.jl @@ -120,7 +120,7 @@ end @testset "AutoBlocks" begin function test_auto_blocks(DA, dims) - np = Dagger.num_processors() + np = num_processors() part = DA.partitioning @test part isa Blocks part_size = part.blocksize @@ -201,6 +201,494 @@ end end end +@testset "Constructor with assignment" begin + + availprocs = collect(all_processors()) + sort!(availprocs, by = x -> (x.owner, x.tid)) + numprocs = length(availprocs) + + + function chunk_processors(Ad::DArray) + [processor(Ad.chunks[idx].future.future.v.value[2]) for idx in CartesianIndices(size(domainchunks(Ad)))] + end + + function tile_processors(proc_grid::AbstractArray{<:Processor,N}, block_grid::Tuple{Vararg{Int,N}}) where N + reps = Int.(ceil.(block_grid ./ size(proc_grid))) + tiled = repeat(proc_grid, reps...) + idx_slices = [1:block_grid[d] for d in 1:length(block_grid)] + return tiled[idx_slices...] + end + + + A = rand(41, 35, 12) + v = rand(23) + M = rand(76,118) + + t_blocks_a = (4,3,2) + d_blocks_a = Blocks(t_blocks_a) + blocks_a = cld.(size(A), t_blocks_a) + + n_blocks_v = 3 + t_blocks_v = (n_blocks_v,) + d_blocks_v = Blocks(t_blocks_v) + blocks_v = cld.(size(v), t_blocks_v) + blocks_nv = blocks_v[1] + + t_blocks_m = (2,3) + d_blocks_m = Blocks(t_blocks_m) + blocks_m = cld.(size(M), t_blocks_m) + + function get_default_blockgrid(data, numprocs) + ndims_data = ndims(data) + size_data = size(data) + ntuple(i->i == ndims_data ? 
cld( size_data[ndims_data], cld(size_data[ndims_data], numprocs) ) : 1, ndims_data) + end + + + @testset "Arbitrary Assignment (:arbitrary)" begin + assignment = :arbitrary + + @testset "Auto Blocks" begin + + @test distribute(A, assignment) isa DArray && distribute(A, AutoBlocks(), assignment) isa DArray + @test distribute(v, assignment) isa DVector && distribute(v, AutoBlocks(), assignment) isa DVector + @test distribute(M, assignment) isa DMatrix && distribute(M, AutoBlocks(), assignment) isa DMatrix + + @test DArray( A, assignment) isa DArray && DArray( A, AutoBlocks(), assignment) isa DArray + @test DVector(v, assignment) isa DVector && DVector( v, AutoBlocks(), assignment) isa DVector + @test DMatrix(M, assignment) isa DMatrix && DMatrix( M, AutoBlocks(), assignment) isa DMatrix + + @test rand( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && rand( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test rand( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && rand( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test rand( AutoBlocks(), size(M)... ; assignment=assignment) isa DMatrix && rand( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + @test randn( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && randn( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test randn( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && randn( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test randn( AutoBlocks(), size(M)... ; assignment=assignment) isa DMatrix && randn( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + @test sprand(AutoBlocks(), size(v)..., 0.5; assignment=assignment) isa DVector && sprand(AutoBlocks(), size(v), 0.5; assignment=assignment) isa DVector + @test sprand(AutoBlocks(), size(M)..., 0.5; assignment=assignment) isa DMatrix && sprand(AutoBlocks(), size(M), 0.5; assignment=assignment) isa DMatrix + + @test ones( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && ones( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test ones( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && ones( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test ones( AutoBlocks(), size(M)... ; assignment=assignment) isa DMatrix && ones( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + @test zeros( AutoBlocks(), size(A)... ; assignment=assignment) isa DArray && zeros( AutoBlocks(), size(A); assignment=assignment) isa DArray + @test zeros( AutoBlocks(), size(v)... ; assignment=assignment) isa DVector && zeros( AutoBlocks(), size(v); assignment=assignment) isa DVector + @test zeros( AutoBlocks(), size(M)... ; assignment=assignment) isa DMatrix && zeros( AutoBlocks(), size(M); assignment=assignment) isa DMatrix + + end + + @testset "Explicit Blocks" begin + + @test distribute(A, d_blocks_a, assignment) isa DArray && distribute(A, blocks_a, assignment) isa DArray + @test distribute(v, d_blocks_v, assignment) isa DVector && distribute(v, blocks_v, assignment) isa DVector + @test distribute(v, n_blocks_v, assignment) isa DVector + @test distribute(M, d_blocks_m, assignment) isa DMatrix && distribute(M, blocks_m, assignment) isa DMatrix + + @test DArray( A, d_blocks_a, assignment) isa DArray + @test DVector(v, d_blocks_v, assignment) isa DVector + @test DMatrix(M, d_blocks_m, assignment) isa DMatrix + + @test rand( d_blocks_a, size(A)... 
; assignment=assignment) isa DArray && rand( d_blocks_a, size(A); assignment=assignment) isa DArray + @test rand( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && rand( d_blocks_v, size(v); assignment=assignment) isa DVector + @test rand( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && rand( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + @test randn( d_blocks_a, size(A)... ; assignment=assignment) isa DArray && randn( d_blocks_a, size(A); assignment=assignment) isa DArray + @test randn( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && randn( d_blocks_v, size(v); assignment=assignment) isa DVector + @test randn( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && randn( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + @test sprand(d_blocks_v, size(v)..., 0.5; assignment=assignment) isa DVector && sprand(d_blocks_v, size(v), 0.5; assignment=assignment) isa DVector + @test sprand(d_blocks_m, size(M)..., 0.5; assignment=assignment) isa DMatrix && sprand(d_blocks_m, size(M), 0.5; assignment=assignment) isa DMatrix + + @test ones( d_blocks_a, size(A)... ; assignment=assignment) isa DArray && ones( d_blocks_a, size(A); assignment=assignment) isa DArray + @test ones( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && ones( d_blocks_v, size(v); assignment=assignment) isa DVector + @test ones( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && ones( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + @test zeros( d_blocks_a, size(A)... ; assignment=assignment) isa DArray && zeros( d_blocks_a, size(A); assignment=assignment) isa DArray + @test zeros( d_blocks_v, size(v)... ; assignment=assignment) isa DVector && zeros( d_blocks_v, size(v); assignment=assignment) isa DVector + @test zeros( d_blocks_m, size(M)... ; assignment=assignment) isa DMatrix && zeros( d_blocks_m, size(M); assignment=assignment) isa DMatrix + + end + + end + + + @testset "Structured Assignment (:blockrow, :blockcol, :cyclicrow, :cycliccol)" begin + + function get_default_blockgrid(data, numprocs) + ndims_data = ndims(data) + size_data = size(data) + ntuple(i->i == ndims_data ? cld( size_data[ndims_data], cld(size_data[ndims_data], numprocs) ) : 1, ndims_data) + end + + function get_blockrow_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == 1 ? blocksize[1] : 1, ndims_data) + rows_per_proc, extra = divrem(blocksize[1], numprocs) + counts = [rows_per_proc + (i <= extra ? 1 : 0) for i in 1:numprocs] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + return procgrid + end + + function get_blockcol_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == ndims_data ? blocksize[end] : 1, ndims_data) + cols_per_proc, extra = divrem(blocksize[end], numprocs) + counts = [cols_per_proc + (i <= extra ? 1 : 0) for i in 1:numprocs] + procgrid = reshape(vcat(fill.(availprocs, counts)...), p) + return procgrid + end + + function get_cyclicrow_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == 1 ? numprocs : 1, ndims_data) + procgrid = reshape(availprocs, p) + return procgrid + end + + function get_cycliccol_procgrid(data, numprocs, blocksize) + ndims_data = ndims(data) + p = ntuple(i -> i == ndims_data ? 
numprocs : 1, ndims_data) + procgrid = reshape(availprocs, p) + return procgrid + end + + function test_assignment_strategy(assignment::Symbol, get_assignment_procgrid) + + @testset "Structured Assignment (:$assignment)" begin + + @testset "Auto Blocks" begin + + dist_A_def_auto = distribute(A, assignment); fetch(dist_A_def_auto) + dist_A_auto_def = distribute(A, AutoBlocks(), assignment); fetch(dist_A_auto_def) + dist_v_def_auto = distribute(v, assignment); fetch(dist_v_def_auto) + dist_v_auto_def = distribute(v, AutoBlocks(), assignment); fetch(dist_v_auto_def) + dist_M_def_auto = distribute(M, assignment); fetch(dist_M_def_auto) + dist_M_auto_def = distribute(M, AutoBlocks(), assignment); fetch(dist_M_auto_def) + + darr_A_def_auto = DArray( A, assignment); fetch(darr_A_def_auto) + darr_A_auto_def = DArray( A, AutoBlocks(), assignment); fetch(darr_A_auto_def) + dvec_v_def_auto = DVector( v, assignment); fetch(dvec_v_def_auto) + dvec_v_auto_def = DVector( v, AutoBlocks(), assignment); fetch(dvec_v_auto_def) + dmat_M_def_auto = DMatrix( M, assignment); fetch(dmat_M_def_auto) + dmat_M_auto_def = DMatrix( M, AutoBlocks(), assignment); fetch(dmat_M_auto_def) + + @test chunk_processors(dist_A_def_auto) == chunk_processors(dist_A_auto_def) == chunk_processors(darr_A_def_auto) == chunk_processors(darr_A_auto_def) == tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_def_auto) == chunk_processors(dist_v_auto_def) == chunk_processors(dvec_v_def_auto) == chunk_processors(dvec_v_auto_def) == tile_processors(get_assignment_procgrid(v, numprocs, get_default_blockgrid(v, numprocs)), get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_def_auto) == chunk_processors(dist_M_auto_def) == chunk_processors(dmat_M_def_auto) == chunk_processors(dmat_M_auto_def) == tile_processors(get_assignment_procgrid(M, numprocs, get_default_blockgrid(M, numprocs)), get_default_blockgrid(M, numprocs)) + + end + + @testset "Functions with AutoBlocks" begin + + rand_A_auto = rand( AutoBlocks(), size(A)...; assignment=assignment); fetch(rand_A_auto) + rand_v_auto = rand( AutoBlocks(), size(v)...; assignment=assignment); fetch(rand_v_auto) + rand_M_auto = rand( AutoBlocks(), size(M)...; assignment=assignment); fetch(rand_M_auto) + + randn_A_auto = randn( AutoBlocks(), size(A)...; assignment=assignment); fetch(randn_A_auto) + randn_v_auto = randn( AutoBlocks(), size(v)...; assignment=assignment); fetch(randn_v_auto) + randn_M_auto = randn( AutoBlocks(), size(M)...; assignment=assignment); fetch(randn_M_auto) + + sprand_v_auto = sprand(AutoBlocks(), size(v)..., 0.5; assignment=assignment); fetch(sprand_v_auto) + sprand_M_auto = sprand(AutoBlocks(), size(M)..., 0.5; assignment=assignment); fetch(sprand_M_auto) + + ones_A_auto = ones( AutoBlocks(), size(A)...; assignment=assignment); fetch(ones_A_auto) + ones_v_auto = ones( AutoBlocks(), size(v)...; assignment=assignment); fetch(ones_v_auto) + ones_M_auto = ones( AutoBlocks(), size(M)...; assignment=assignment); fetch(ones_M_auto) + + zeros_A_auto = zeros( AutoBlocks(), size(A)...; assignment=assignment); fetch(zeros_A_auto) + zeros_v_auto = zeros( AutoBlocks(), size(v)...; assignment=assignment); fetch(zeros_v_auto) + zeros_M_auto = zeros( AutoBlocks(), size(M)...; assignment=assignment); fetch(zeros_M_auto) + + @test chunk_processors(rand_A_auto) == chunk_processors(randn_A_auto) == chunk_processors(ones_A_auto) == chunk_processors(zeros_A_auto) ==
tile_processors(get_assignment_procgrid(A, numprocs, get_default_blockgrid(A, numprocs)), get_default_blockgrid(A, numprocs)) + @test chunk_processors(rand_v_auto) == chunk_processors(randn_v_auto) == chunk_processors(sprand_v_auto) == chunk_processors(ones_v_auto) == chunk_processors(zeros_v_auto) == tile_processors(get_assignment_procgrid(v, numprocs, get_default_blockgrid(v, numprocs)), get_default_blockgrid(v, numprocs)) + @test chunk_processors(rand_M_auto) == chunk_processors(randn_M_auto) == chunk_processors(sprand_M_auto) == chunk_processors(ones_M_auto) == chunk_processors(zeros_M_auto) == tile_processors(get_assignment_procgrid(M, numprocs, get_default_blockgrid(M, numprocs)), get_default_blockgrid(M, numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_def = distribute(A, d_blocks_a, assignment); fetch(dist_A_exp_def) + dist_A_blocks_exp = distribute(A, blocks_a, assignment); fetch(dist_A_blocks_exp) + dist_v_exp_def = distribute(v, d_blocks_v, assignment); fetch(dist_v_exp_def) + dist_v_blocks_exp = distribute(v, blocks_v, assignment); fetch(dist_v_blocks_exp) + dist_v_nblocks_exp = distribute(v, blocks_nv, assignment); fetch(dist_v_nblocks_exp) + dist_M_exp_def = distribute(M, d_blocks_m, assignment); fetch(dist_M_exp_def) + dist_M_blocks_exp = distribute(M, blocks_m, assignment); fetch(dist_M_blocks_exp) + + darr_A_exp_def = DArray( A, d_blocks_a, assignment); fetch(darr_A_exp_def) + dvec_v_exp_def = DVector( v, d_blocks_v, assignment); fetch(dvec_v_exp_def) + dmat_M_exp_def = DMatrix( M, d_blocks_m, assignment); fetch(dmat_M_exp_def) + + + @test chunk_processors(dist_A_exp_def) == chunk_processors(dist_A_blocks_exp) == chunk_processors(darr_A_exp_def) == tile_processors(get_assignment_procgrid(A, numprocs, blocks_a), blocks_a) + @test chunk_processors(dist_v_exp_def) == chunk_processors(dist_v_blocks_exp) == chunk_processors(dvec_v_exp_def) == tile_processors(get_assignment_procgrid(v, numprocs, blocks_v), blocks_v) + @test chunk_processors(dist_v_nblocks_exp) == tile_processors(get_assignment_procgrid(v, numprocs, blocks_v), blocks_v) + @test chunk_processors(dist_M_exp_def) == chunk_processors(dist_M_blocks_exp) == chunk_processors(dmat_M_exp_def) == tile_processors(get_assignment_procgrid(M, numprocs, blocks_m), blocks_m) + + end + + @testset "Functions with Explicit Blocks" begin + + rand_A_exp = rand( d_blocks_a, size(A)...; assignment=assignment); fetch(rand_A_exp) + rand_v_exp = rand( d_blocks_v, size(v)...; assignment=assignment); fetch(rand_v_exp) + rand_M_exp = rand( d_blocks_m, size(M)...; assignment=assignment); fetch(rand_M_exp) + + randn_A_exp = randn( d_blocks_a, size(A)...; assignment=assignment); fetch(randn_A_exp) + randn_v_exp = randn( d_blocks_v, size(v)...; assignment=assignment); fetch(randn_v_exp) + randn_M_exp = randn( d_blocks_m, size(M)...; assignment=assignment); fetch(randn_M_exp) + + sprand_v_exp = sprand(d_blocks_v, size(v)..., 0.5; assignment=assignment); fetch(sprand_v_exp) + sprand_M_exp = sprand(d_blocks_m, size(M)..., 0.5; assignment=assignment); fetch(sprand_M_exp) + + ones_A_exp = ones( d_blocks_a, size(A)...; assignment=assignment); fetch(ones_A_exp) + ones_v_exp = ones( d_blocks_v, size(v)...; assignment=assignment); fetch(ones_v_exp) + ones_M_exp = ones( d_blocks_m, size(M)...; assignment=assignment); fetch(ones_M_exp) + + zeros_A_exp = zeros( d_blocks_a, size(A)...; assignment=assignment); fetch(zeros_A_exp) + zeros_v_exp = zeros( d_blocks_v, size(v)...; assignment=assignment); fetch(zeros_v_exp) + zeros_M_exp = 
zeros( d_blocks_m, size(M)...; assignment=assignment); fetch(zeros_M_exp) + + @test chunk_processors(rand_A_exp) == chunk_processors(randn_A_exp) == chunk_processors(ones_A_exp) == chunk_processors(zeros_A_exp) == tile_processors(get_assignment_procgrid(A, numprocs, blocks_a), blocks_a) + @test chunk_processors(rand_v_exp) == chunk_processors(randn_v_exp) == chunk_processors(sprand_v_exp) == chunk_processors(ones_v_exp) == chunk_processors(zeros_v_exp) == tile_processors(get_assignment_procgrid(v, numprocs, blocks_v), blocks_v) + @test chunk_processors(rand_M_exp) == chunk_processors(randn_M_exp) == chunk_processors(sprand_M_exp) == chunk_processors(ones_M_exp) == chunk_processors(zeros_M_exp) == tile_processors(get_assignment_procgrid(M, numprocs, blocks_m), blocks_m) + + end + + end + + end + + test_assignment_strategy(:blockrow, get_blockrow_procgrid) + test_assignment_strategy(:blockcol, get_blockcol_procgrid) + test_assignment_strategy(:cyclicrow, get_cyclicrow_procgrid) + test_assignment_strategy(:cycliccol, get_cycliccol_procgrid) + + end + + @testset "OSProc ID Array Assignment (AbstractArray{<:Int, N})" begin + + function get_random_threadprocs(proc_ids) + [ThreadProc(proc, 1) for proc in proc_ids] + end + + rand_osproc_ids_A = rand(procs(), 3, 2, 2) + rand_osproc_ids_v = rand(procs(), 11) + rand_osproc_ids_M = rand(procs(), 2, 5) + + @testset "Auto Blocks" begin + + dist_A_rand_osproc_auto = distribute(A, rand_osproc_ids_A); fetch(dist_A_rand_osproc_auto) + dist_A_auto_rand_osproc = distribute(A, AutoBlocks(), rand_osproc_ids_A); fetch(dist_A_auto_rand_osproc) + dist_v_rand_osproc_auto = distribute(v, rand_osproc_ids_v); fetch(dist_v_rand_osproc_auto) + dist_v_auto_rand_osproc = distribute(v, AutoBlocks(), rand_osproc_ids_v); fetch(dist_v_auto_rand_osproc) + dist_M_rand_osproc_auto = distribute(M, rand_osproc_ids_M); fetch(dist_M_rand_osproc_auto) + dist_M_auto_rand_osproc = distribute(M, AutoBlocks(), rand_osproc_ids_M); fetch(dist_M_auto_rand_osproc) + + darr_A_rand_osproc_auto = DArray( A, rand_osproc_ids_A); fetch(darr_A_rand_osproc_auto) + darr_A_auto_rand_osproc = DArray( A, AutoBlocks(), rand_osproc_ids_A); fetch(darr_A_auto_rand_osproc) + dvec_v_rand_osproc_auto = DVector( v, rand_osproc_ids_v); fetch(dvec_v_rand_osproc_auto) + dvec_v_auto_rand_osproc = DVector( v, AutoBlocks(), rand_osproc_ids_v); fetch(dvec_v_auto_rand_osproc) + dmat_M_rand_osproc_auto = DMatrix( M, rand_osproc_ids_M); fetch(dmat_M_rand_osproc_auto) ### rand_osproc_ids_M assigned as Blocks + dmat_M_auto_rand_osproc = DMatrix( M, AutoBlocks(), rand_osproc_ids_M); fetch(dmat_M_auto_rand_osproc) + + @test chunk_processors(dist_A_rand_osproc_auto) == chunk_processors(dist_A_auto_rand_osproc) == chunk_processors(darr_A_rand_osproc_auto) == chunk_processors(darr_A_auto_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_auto_rand_osproc) == chunk_processors(dvec_v_rand_osproc_auto) == chunk_processors(dvec_v_auto_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_v_rand_osproc_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_rand_osproc_auto) == chunk_processors(dist_M_auto_rand_osproc) == chunk_processors(dmat_M_auto_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), get_default_blockgrid(M, numprocs)) + @test 
chunk_processors(dmat_M_rand_osproc_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), get_default_blockgrid(M, numprocs)) + end + + @testset "Functions with AutoBlocks" begin + + rand_A_auto = rand( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(rand_A_auto) + rand_v_auto = rand( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(rand_v_auto) + rand_M_auto = rand( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(rand_M_auto) + + randn_A_auto = randn( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(randn_A_auto) + randn_v_auto = randn( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(randn_v_auto) + randn_M_auto = randn( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(randn_M_auto) + + sprand_v_auto = sprand(AutoBlocks(), size(v)..., 0.5; assignment=rand_osproc_ids_v); fetch(sprand_v_auto) + sprand_M_auto = sprand(AutoBlocks(), size(M)..., 0.5; assignment=rand_osproc_ids_M); fetch(sprand_M_auto) + + ones_A_auto = ones( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(ones_A_auto) + ones_v_auto = ones( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(ones_v_auto) + ones_M_auto = ones( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(ones_M_auto) + + zeros_A_auto = zeros( AutoBlocks(), size(A)...; assignment=rand_osproc_ids_A); fetch(zeros_A_auto) + zeros_v_auto = zeros( AutoBlocks(), size(v)...; assignment=rand_osproc_ids_v); fetch(zeros_v_auto) + zeros_M_auto = zeros( AutoBlocks(), size(M)...; assignment=rand_osproc_ids_M); fetch(zeros_M_auto) + + + @test chunk_processors(rand_A_auto) == chunk_processors(randn_A_auto) == chunk_processors(ones_A_auto) == chunk_processors(zeros_A_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), get_default_blockgrid(rand_A_auto, numprocs)) + @test chunk_processors(rand_v_auto) == chunk_processors(randn_v_auto) == chunk_processors(sprand_v_auto) == chunk_processors(ones_v_auto) == chunk_processors(zeros_v_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), get_default_blockgrid(rand_v_auto, numprocs)) + @test chunk_processors(rand_M_auto) == chunk_processors(randn_M_auto) == chunk_processors(sprand_M_auto) == chunk_processors(ones_M_auto) == chunk_processors(zeros_M_auto) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), get_default_blockgrid(rand_M_auto, numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_rand_osproc = distribute(A, d_blocks_a, rand_osproc_ids_A); fetch(dist_A_exp_rand_osproc) + dist_A_blocks_rand_osproc = distribute(A, blocks_a, rand_osproc_ids_A); fetch(dist_A_blocks_rand_osproc) + dist_v_exp_rand_osproc = distribute(v, d_blocks_v, rand_osproc_ids_v); fetch(dist_v_exp_rand_osproc) + dist_v_blocks_rand_osproc = distribute(v, blocks_v, rand_osproc_ids_v); fetch(dist_v_blocks_rand_osproc) + dist_v_nblocks_rand_osproc = distribute(v, blocks_nv, rand_osproc_ids_v); fetch(dist_v_nblocks_rand_osproc) + dist_M_exp_rand_osproc = distribute(M, d_blocks_m, rand_osproc_ids_M); fetch(dist_M_exp_rand_osproc) + dist_M_blocks_rand_osproc = distribute(M, blocks_m, rand_osproc_ids_M); fetch(dist_M_blocks_rand_osproc) + + darr_A_exp_rand_osproc = DArray( A, d_blocks_a, rand_osproc_ids_A); fetch(darr_A_exp_rand_osproc) + dvec_v_exp_rand_osproc = DVector( v, d_blocks_v, rand_osproc_ids_v); fetch(dvec_v_exp_rand_osproc) + dmat_M_exp_rand_osproc = DMatrix( M, d_blocks_m, rand_osproc_ids_M); fetch(dmat_M_exp_rand_osproc) + + @test 
chunk_processors(dist_A_exp_rand_osproc) == chunk_processors(dist_A_blocks_rand_osproc) == chunk_processors(darr_A_exp_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), blocks_a) + @test chunk_processors(dist_v_exp_rand_osproc) == chunk_processors(dist_v_blocks_rand_osproc) == chunk_processors(dvec_v_exp_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), blocks_v) + @test chunk_processors(dist_v_nblocks_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), blocks_v) + @test chunk_processors(dist_M_exp_rand_osproc) == chunk_processors(dist_M_blocks_rand_osproc) == chunk_processors(dmat_M_exp_rand_osproc) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), blocks_m) + + end + + @testset "Functions with Explicit Blocks" begin + + rand_A_exp = rand( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(rand_A_exp) + rand_v_exp = rand( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); fetch(rand_v_exp) + rand_M_exp = rand( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(rand_M_exp) + + randn_A_exp = randn( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(randn_A_exp) + randn_v_exp = randn( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); fetch(randn_v_exp) + randn_M_exp = randn( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(randn_M_exp) + + sprand_v_exp = sprand(d_blocks_v, size(v)..., 0.5; assignment=rand_osproc_ids_v); fetch(sprand_v_exp) + sprand_M_exp = sprand(d_blocks_m, size(M)..., 0.5; assignment=rand_osproc_ids_M); fetch(sprand_M_exp) + + ones_A_exp = ones( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(ones_A_exp) + ones_v_exp = ones( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); fetch(ones_v_exp) + ones_M_exp = ones( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(ones_M_exp) + + zeros_A_exp = zeros( d_blocks_a, size(A)...; assignment=rand_osproc_ids_A); fetch(zeros_A_exp) + zeros_v_exp = zeros( d_blocks_v, size(v)...; assignment=rand_osproc_ids_v); fetch(zeros_v_exp) + zeros_M_exp = zeros( d_blocks_m, size(M)...; assignment=rand_osproc_ids_M); fetch(zeros_M_exp) + + + @test chunk_processors(rand_A_exp) == chunk_processors(randn_A_exp) == chunk_processors(ones_A_exp) == chunk_processors(zeros_A_exp) == tile_processors(get_random_threadprocs(rand_osproc_ids_A), blocks_a) + @test chunk_processors(rand_v_exp) == chunk_processors(randn_v_exp) == chunk_processors(sprand_v_exp) == chunk_processors(ones_v_exp) == chunk_processors(zeros_v_exp) == tile_processors(get_random_threadprocs(rand_osproc_ids_v), blocks_v) + @test chunk_processors(rand_M_exp) == chunk_processors(randn_M_exp) == chunk_processors(sprand_M_exp) == chunk_processors(ones_M_exp) == chunk_processors(zeros_M_exp) == tile_processors(get_random_threadprocs(rand_osproc_ids_M), blocks_m) + + end + + end + + + @testset "Explicit Processor Array Assignment (AbstractArray{<:Processor, N})" begin + + rand_procs_A = reshape(availprocs[ rand(procs(), 6) ], 2, 3, 1) + rand_procs_v = reshape(availprocs[ rand(procs(), 5) ], 5) + rand_procs_M = reshape(availprocs[ rand(procs(), 14) ], 2, 7) + + + @testset "Auto Blocks" begin + + dist_A_rand_procs_auto = distribute(A, rand_procs_A); fetch(dist_A_rand_procs_auto) + dist_A_auto_rand_procs = distribute(A, AutoBlocks(), rand_procs_A); fetch(dist_A_auto_rand_procs) + dist_v_rand_procs_auto = distribute(v, rand_procs_v); fetch(dist_v_rand_procs_auto) + dist_v_auto_rand_procs = distribute(v, AutoBlocks(), rand_procs_v); 
fetch(dist_v_auto_rand_procs) + dist_M_rand_procs_auto = distribute(M, rand_procs_M); fetch(dist_M_rand_procs_auto) + dist_M_auto_rand_procs = distribute(M, AutoBlocks(), rand_procs_M); fetch(dist_M_auto_rand_procs) + + darr_A_rand_procs_auto = DArray( A, rand_procs_A); fetch(darr_A_rand_procs_auto) + darr_A_auto_rand_procs = DArray( A, AutoBlocks(), rand_procs_A); fetch(darr_A_auto_rand_procs) + dvec_v_rand_procs_auto = DVector( v, rand_procs_v); fetch(dvec_v_rand_procs_auto) + dvec_v_auto_rand_procs = DVector( v, AutoBlocks(), rand_procs_v); fetch(dvec_v_auto_rand_procs) + dmat_M_rand_procs_auto = DMatrix( M, rand_procs_M); fetch(dmat_M_rand_procs_auto) + dmat_M_auto_rand_procs = DMatrix( M, AutoBlocks(), rand_procs_M); fetch(dmat_M_auto_rand_procs) + + @test chunk_processors(dist_A_rand_procs_auto) == chunk_processors(dist_A_auto_rand_procs) == chunk_processors(darr_A_rand_procs_auto) == chunk_processors(darr_A_auto_rand_procs) == tile_processors(rand_procs_A, get_default_blockgrid(A, numprocs)) + @test chunk_processors(dist_v_rand_procs_auto) == chunk_processors(dist_v_auto_rand_procs) == chunk_processors(dvec_v_rand_procs_auto) == chunk_processors(dvec_v_auto_rand_procs) == tile_processors(rand_procs_v, get_default_blockgrid(v, numprocs)) + @test chunk_processors(dist_M_rand_procs_auto) == chunk_processors(dist_M_auto_rand_procs) == chunk_processors(dmat_M_rand_procs_auto) == chunk_processors(dmat_M_auto_rand_procs) == tile_processors(rand_procs_M, get_default_blockgrid(M, numprocs)) + + end + + @testset "Functions with AutoBlocks" begin + + rand_A_auto = rand( AutoBlocks(), size(A)...; assignment=rand_procs_A); fetch(rand_A_auto) + rand_v_auto = rand( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(rand_v_auto) + rand_M_auto = rand( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(rand_M_auto) + + randn_A_auto = randn( AutoBlocks(), size(A)...; assignment=rand_procs_A); fetch(randn_A_auto) + randn_v_auto = randn( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(randn_v_auto) + randn_M_auto = randn( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(randn_M_auto) + + sprand_v_auto = sprand(AutoBlocks(), size(v)..., 0.5; assignment=rand_procs_v); fetch(sprand_v_auto) + sprand_M_auto = sprand(AutoBlocks(), size(M)..., 0.5; assignment=rand_procs_M); fetch(sprand_M_auto) + + ones_A_auto = ones( AutoBlocks(), size(A)...; assignment=rand_procs_A); fetch(ones_A_auto) + ones_v_auto = ones( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(ones_v_auto) + ones_M_auto = ones( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(ones_M_auto) + + zeros_A_auto = zeros( AutoBlocks(), size(A)...; assignment=rand_procs_A); fetch(zeros_A_auto) + zeros_v_auto = zeros( AutoBlocks(), size(v)...; assignment=rand_procs_v); fetch(zeros_v_auto) + zeros_M_auto = zeros( AutoBlocks(), size(M)...; assignment=rand_procs_M); fetch(zeros_M_auto) + + + @test chunk_processors(rand_A_auto) == chunk_processors(randn_A_auto) == chunk_processors(ones_A_auto) == chunk_processors(zeros_A_auto) == tile_processors(rand_procs_A, get_default_blockgrid(A, numprocs)) + @test chunk_processors(rand_v_auto) == chunk_processors(randn_v_auto) == chunk_processors(sprand_v_auto) == chunk_processors(ones_v_auto) == chunk_processors(zeros_v_auto) == tile_processors(rand_procs_v, get_default_blockgrid(v, numprocs)) + @test chunk_processors(rand_M_auto) == chunk_processors(randn_M_auto) == chunk_processors(sprand_M_auto) == chunk_processors(ones_M_auto) == chunk_processors(zeros_M_auto) == 
tile_processors(rand_procs_M, get_default_blockgrid(M, numprocs)) + + end + + @testset "Explicit Blocks" begin + + dist_A_exp_rand_procs = distribute(A, d_blocks_a, rand_procs_A); fetch(dist_A_exp_rand_procs) + dist_A_blocks_rand_procs = distribute(A, blocks_a, rand_procs_A); fetch(dist_A_blocks_rand_procs) + dist_v_exp_rand_procs = distribute(v, d_blocks_v, rand_procs_v); fetch(dist_v_exp_rand_procs) + dist_v_blocks_rand_procs = distribute(v, blocks_v, rand_procs_v); fetch(dist_v_blocks_rand_procs) + dist_v_nblocks_rand_procs = distribute(v, blocks_nv, rand_procs_v); fetch(dist_v_nblocks_rand_procs) + dist_M_exp_rand_procs = distribute(M, d_blocks_m, rand_procs_M); fetch(dist_M_exp_rand_procs) + dist_M_blocks_rand_procs = distribute(M, blocks_m, rand_procs_M); fetch(dist_M_blocks_rand_procs) + + darr_A_exp_rand_procs = DArray( A, d_blocks_a, rand_procs_A); fetch(darr_A_exp_rand_procs) + dvec_v_exp_rand_procs = DVector( v, d_blocks_v, rand_procs_v); fetch(dvec_v_exp_rand_procs) + dmat_M_exp_rand_procs = DMatrix( M, d_blocks_m, rand_procs_M); fetch(dmat_M_exp_rand_procs) + + @test chunk_processors(dist_A_exp_rand_procs) == chunk_processors(dist_A_blocks_rand_procs) == chunk_processors(darr_A_exp_rand_procs) == tile_processors(rand_procs_A, blocks_a) + @test chunk_processors(dist_v_exp_rand_procs) == chunk_processors(dist_v_blocks_rand_procs) == chunk_processors(dvec_v_exp_rand_procs) == tile_processors(rand_procs_v, blocks_v) + @test chunk_processors(dist_v_nblocks_rand_procs) == tile_processors(rand_procs_v, blocks_v) + @test chunk_processors(dist_M_exp_rand_procs) == chunk_processors(dist_M_blocks_rand_procs) == chunk_processors(dmat_M_exp_rand_procs) == tile_processors(rand_procs_M, blocks_m) + + end + + @testset "Functions with Explicit Blocks" begin + + rand_A_exp = rand( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(rand_A_exp) + rand_v_exp = rand( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(rand_v_exp) + rand_M_exp = rand( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(rand_M_exp) + + randn_A_exp = randn( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(randn_A_exp) + randn_v_exp = randn( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(randn_v_exp) + randn_M_exp = randn( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(randn_M_exp) + + sprand_v_exp = sprand(d_blocks_v, size(v)..., 0.5; assignment=rand_procs_v); fetch(sprand_v_exp) + sprand_M_exp = sprand(d_blocks_m, size(M)..., 0.5; assignment=rand_procs_M); fetch(sprand_M_exp) + + ones_A_exp = ones( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(ones_A_exp) + ones_v_exp = ones( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(ones_v_exp) + ones_M_exp = ones( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(ones_M_exp) + + zeros_A_exp = zeros( d_blocks_a, size(A)...; assignment=rand_procs_A); fetch(zeros_A_exp) + zeros_v_exp = zeros( d_blocks_v, size(v)...; assignment=rand_procs_v); fetch(zeros_v_exp) + zeros_M_exp = zeros( d_blocks_m, size(M)...; assignment=rand_procs_M); fetch(zeros_M_exp) + + @test chunk_processors(rand_A_exp) == chunk_processors(randn_A_exp) == chunk_processors(ones_A_exp) == chunk_processors(zeros_A_exp) == tile_processors(rand_procs_A, blocks_a) + @test chunk_processors(rand_v_exp) == chunk_processors(randn_v_exp) == chunk_processors(sprand_v_exp) == chunk_processors(ones_v_exp) == chunk_processors(zeros_v_exp) == tile_processors(rand_procs_v, blocks_v) + @test chunk_processors(rand_M_exp) == chunk_processors(randn_M_exp) == 
chunk_processors(sprand_M_exp) == chunk_processors(ones_M_exp) == chunk_processors(zeros_M_exp) == tile_processors(rand_procs_M, blocks_m) + + end + + end + +end + @testset "view" begin A = rand(64, 64) DA = view(A, Blocks(8, 8)) diff --git a/test/options.jl b/test/options.jl index e832a0827..91349ab9f 100644 --- a/test/options.jl +++ b/test/options.jl @@ -28,7 +28,6 @@ end for (option, default, value, value2) in [ # Special handling (:scope, AnyScope(), ProcessScope(first_wid), ProcessScope(last_wid)), - (:processor, OSProc(), Dagger.ThreadProc(first_wid, 1), Dagger.ThreadProc(last_wid, 1)), # ThunkOptions field (:single, 0, first_wid, last_wid), # Thunk field @@ -80,7 +79,7 @@ end @test fetch(Dagger.@spawn sf(obj)) == 0 @test fetch(Dagger.@spawn sf(obj)) == 0 end - Dagger.with_options(;scope=Dagger.ExactScope(Dagger.ThreadProc(1,1)), processor=OSProc(1), meta=true) do + Dagger.with_options(;scope=Dagger.ExactScope(Dagger.ThreadProc(1,1)), meta=true) do @test fetch(Dagger.@spawn sf(obj)) == 43 @test fetch(Dagger.@spawn sf(obj)) == 43 end diff --git a/test/runtests.jl b/test/runtests.jl index 20c7eb41c..a7b7a890a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -18,6 +18,7 @@ tests = [ ("Options", "options.jl"), ("Mutation", "mutation.jl"), ("Task Queues", "task-queues.jl"), + ("Task Affinity", "task-affinity.jl"), ("Datadeps", "datadeps.jl"), ("Streaming", "streaming.jl"), ("Domain Utilities", "domain.jl"), diff --git a/test/scheduler.jl b/test/scheduler.jl index b12ad3e1e..9f00485a8 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -1,3 +1,4 @@ +import Dagger: Chunk import Dagger.Sch: SchedulerOptions, ThunkOptions, SchedulerHaltedException, ComputeState, ThunkID, sch_handle @everywhere begin @@ -162,10 +163,8 @@ end @test Dagger.default_enabled(Dagger.ThreadProc(1,1)) == true @test Dagger.default_enabled(FakeProc()) == false - opts = Dagger.Sch.ThunkOptions(;proclist=[Dagger.ThreadProc]) - as = [delayed(identity; options=opts)(i) for i in 1:5] - opts = Dagger.Sch.ThunkOptions(;proclist=[FakeProc]) - b = delayed(fakesum; options=opts)(as...) + as = [delayed(identity; proclist=[Dagger.ThreadProc])(i) for i in 1:5] + b = delayed(fakesum; proclist=[FakeProc], compute_scope=Dagger.AnyScope())(as...) 
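+ # Note: Dagger.default_enabled(FakeProc()) is false (tested above), so the explicit compute_scope=Dagger.AnyScope() here is presumably what allows the proclist=[FakeProc] thunk to be scheduled.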
@test collect(Context(), b) == FakeVal(57) end diff --git a/test/scopes.jl b/test/scopes.jl index ecade7ab4..fa5bf1135 100644 --- a/test/scopes.jl +++ b/test/scopes.jl @@ -1,6 +1,5 @@ -#@everywhere ENV["JULIA_DEBUG"] = "Dagger" @testset "Chunk Scopes" begin - wid1, wid2 = addprocs(2, exeflags=["-t 2"]) + wid1, wid2 = addprocs(2, exeflags=["-t2", "--project=$(Base.active_project())"]) @everywhere [wid1,wid2] using Dagger Dagger.addprocs!(Dagger.Sch.eager_context(), [wid1,wid2]) fetch(Dagger.@spawn 1+1) # Force scheduler to pick up new workers @@ -57,7 +56,7 @@ # Different nodes for (ch1, ch2) in [(ns1_ch, ns2_ch), (ns2_ch, ns1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Process Scope" begin @@ -76,7 +75,7 @@ # Different process for (ch1, ch2) in [(ps1_ch, ps2_ch), (ps2_ch, ps1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end # Same process and node @@ -84,7 +83,7 @@ # Different process and node for (ch1, ch2) in [(ps1_ch, ns2_ch), (ns2_ch, ps1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Exact Scope" begin @@ -105,14 +104,14 @@ # Different process, different processor for (ch1, ch2) in [(es1_ch, es2_ch), (es2_ch, es1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end # Same process, different processor es1_2 = ExactScope(Dagger.ThreadProc(wid1, 2)) es1_2_ch = Dagger.tochunk(nothing, OSProc(), es1_2) for (ch1, ch2) in [(es1_ch, es1_2_ch), (es1_2_ch, es1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Union Scope" begin @@ -269,5 +268,16 @@ @test Dagger.num_processors() == length(comp_procs) end + @testset "scope comparison" begin + scope1 = Dagger.scope(worker=wid1) + scope2 = Dagger.scope(worker=wid1, thread=1) + @test issubset(scope2, scope1) + @test !issubset(scope1, scope2) + @test issetequal(scope1, scope1) + @test issetequal(scope2, scope2) + @test !issetequal(scope1, scope2) + @test !issetequal(scope2, scope1) + end + rmprocs([wid1, wid2]) end diff --git a/test/task-affinity.jl b/test/task-affinity.jl new file mode 100644 index 000000000..64d357e36 --- /dev/null +++ b/test/task-affinity.jl @@ 
-0,0 +1,883 @@ +@testset "Task affinity" begin +<<<<<<< HEAD + fetch_or_invalidscope(x::DTask) = try + fetch(x; raw=true) + nothing + catch err + @assert Dagger.Sch.unwrap_nested_exception(err) isa Dagger.Sch.SchedulingException + return Dagger.InvalidScope + end + get_compute_scope(x::DTask) = Dagger.Sch._find_thunk(x).compute_scope + + get_result_scope(x::DTask) = Dagger.Sch._find_thunk(x).result_scope + + get_final_result_scope(x::DTask) = @something(fetch_or_invalidscope(x), fetch(x; raw=true).scope) + + function get_execution_scope(x::DTask) + res = fetch_or_invalidscope(x) + if res !== nothing + return res + end + thunk = Dagger.Sch._find_thunk(x) + compute_scope = thunk.compute_scope + result_scope = thunk.result_scope + f_scope = thunk.f isa Dagger.Chunk ? thunk.f.scope : Dagger.AnyScope() + inputs_scopes = Dagger.AbstractScope[] + for input in thunk.inputs + if input isa Dagger.Chunk + push!(inputs_scopes, input.scope) + else + push!(inputs_scopes, Dagger.AnyScope()) + end + end + return Dagger.constrain(compute_scope, result_scope, f_scope, inputs_scopes...) +======= + + get_compute_scope(x::DTask) = try + Dagger.Sch._find_thunk(x).compute_scope + catch + Dagger.InvalidScope + end + + get_result_scope(x::DTask) = try + fetch(x; raw=true).scope + catch + Dagger.InvalidScope + end + + get_execution_scope(x::DTask) = try + chunk = fetch(x; raw=true) + Dagger.ExactScope(chunk.processor) + catch + Dagger.InvalidScope + end + + function intersect_scopes(scope1::Dagger.AbstractScope, scopes::Dagger.AbstractScope...) + for s in scopes + scope1 = Dagger.constrain(scope1, s) + scope1 isa Dagger.InvalidScope && return (scope1,) + end + return scope1.scopes +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + + availprocs = collect(Dagger.all_processors()) + availscopes = shuffle!(Dagger.ExactScope.(availprocs)) + numscopes = length(availscopes) + + master_proc = Dagger.ThreadProc(1, 1) + master_scope = Dagger.ExactScope(master_proc) + +<<<<<<< HEAD + @testset "scope, compute_scope and result_scope" begin +======= + @testset "Function: scope, compute_scope and result_scope" begin + +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + @everywhere f(x) = x + 1 + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn scope=scope_only f(10); fetch(task1) + @test get_compute_scope(task1) == scope_only + @test get_result_scope(task1) == Dagger.AnyScope() +<<<<<<< HEAD + @test get_final_result_scope(task1) == Dagger.AnyScope() + @test issubset(get_execution_scope(task1), scope_only) +======= + + execution_scope1 = get_execution_scope(task1) + @test execution_scope1 in intersect_scopes(execution_scope1,scope_only) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + scope = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn compute_scope=compute_scope_only f(10); fetch(task1) + task2 = Dagger.@spawn scope=scope compute_scope=compute_scope_only f(20); fetch(task2) + + @test get_compute_scope(task1) == get_compute_scope(task2) == compute_scope_only + @test get_result_scope(task1) == get_result_scope(task2) == Dagger.AnyScope() +<<<<<<< HEAD + @test get_final_result_scope(task1) == get_final_result_scope(task2) == Dagger.AnyScope() + @test issubset(get_execution_scope(task1), compute_scope_only) && + issubset(get_execution_scope(task2), compute_scope_only) 
+======= + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + @test execution_scope1 in intersect_scopes(execution_scope1, compute_scope_only) && + execution_scope2 in intersect_scopes(execution_scope2, compute_scope_only) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn result_scope=result_scope_only f(10); fetch(task1) + + @test get_compute_scope(task1) == Dagger.DefaultScope() + @test get_result_scope(task1) == result_scope_only +<<<<<<< HEAD + @test get_final_result_scope(task1) == result_scope_only + @test issubset(get_execution_scope(task1), result_scope_only) +======= +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + + @testset "compute_scope and result_scope with intersection" begin + if numscopes >= 3 + n = cld(numscopes, 3) + + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_a..., scope_b...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...) +<<<<<<< HEAD + all_scope_intersect = Dagger.constrain(compute_scope_intersect, result_scope_intersect) +======= +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + + task1 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect f(10); fetch(task1) + task2 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect f(20); fetch(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect f(30); fetch(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_intersect +<<<<<<< HEAD + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == all_scope_intersect + @test issubset(get_execution_scope(task1), all_scope_intersect) && + issubset(get_execution_scope(task2), all_scope_intersect) && + issubset(get_execution_scope(task3), all_scope_intersect) +======= + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + execution_scope3 = get_execution_scope(task3) + @test execution_scope1 in intersect_scopes(execution_scope1, compute_scope_intersect, result_scope_intersect) && + execution_scope2 in intersect_scopes(execution_scope2, compute_scope_intersect, result_scope_intersect) && + execution_scope3 in intersect_scopes(execution_scope3, compute_scope_intersect, result_scope_intersect) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + end + + @testset "compute_scope and result_scope without intersection" begin + if length(availscopes) >= 2 + n = cld(numscopes, 2) + + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_b...) 
+ + task1 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect f(10); wait(task1) + task2 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect f(20); wait(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect f(30); wait(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_no_intersect +<<<<<<< HEAD + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_no_intersect + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == Dagger.InvalidScope + @test get_execution_scope(task1) == get_execution_scope(task2) == get_execution_scope(task3) == Dagger.InvalidScope + end + end + end + + @testset "Chunk function, scope, compute_scope and result_scope" begin + @everywhere g(x, y) = x * 2 + y * 3 + + n = cld(numscopes, 3) + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "scope" begin + scope_only = Dagger.UnionScope(scope_a..., scope_b...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(scope_only, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn scope=scope_only g_chunk(10, 11); fetch(task1) + + @test get_compute_scope(task1) == scope_only + @test get_result_scope(task1) == Dagger.AnyScope() + @test get_final_result_scope(task1) == Dagger.AnyScope() + @test issetequal(get_execution_scope(task1), all_scope) + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(scope_a..., scope_b...) + scope = Dagger.UnionScope(scope_c...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(compute_scope_only, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn compute_scope=compute_scope_only g_chunk(10, 11); fetch(task1) + task2 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g_chunk(20, 21); fetch(task2) + + @test get_compute_scope(task1) == get_compute_scope(task2) == compute_scope_only + @test get_result_scope(task1) == get_result_scope(task2) == Dagger.AnyScope() + @test get_final_result_scope(task1) == get_final_result_scope(task2) == Dagger.AnyScope() + @test issetequal(get_execution_scope(task1), + get_execution_scope(task2), + all_scope) + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(scope_a..., scope_b...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) 
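+ # With only result_scope given, the task should run wherever the chunk function's scope overlaps the result_scope; Dagger.constrain below computes that intersection.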
+ all_scope = Dagger.constrain(result_scope_only, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn result_scope=result_scope_only g_chunk(10, 11); fetch(task1) + + @test get_compute_scope(task1) == Dagger.DefaultScope() + @test get_result_scope(task1) == result_scope_only + @test get_final_result_scope(task1) == result_scope_only + @test issetequal(get_execution_scope(task1), all_scope) + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "compute_scope and result_scope with intersection" begin + if length(availscopes) >= 3 +======= + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == Dagger.InvalidScope + + @test get_execution_scope(task1) == get_execution_scope(task2) == get_execution_scope(task3) == Dagger.InvalidScope + end + end + + end + + @testset "Chunk function: scope, compute_scope and result_scope" begin + + @everywhere g(x, y) = x * 2 + y * 3 + + availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors()))) + n = cld(numscopes, 2) + + chunk_scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + chunk_proc = rand(availprocs) + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + task1 = Dagger.@spawn scope=scope_only Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1) + + @test get_compute_scope(task1) == scope_only + @test get_result_scope(task1) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + + @test execution_scope1 == Dagger.ExactScope(chunk_proc) + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + task1 = Dagger.@spawn compute_scope=compute_scope_only Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1) + task2 = Dagger.@spawn scope=scope compute_scope=compute_scope_only Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope); fetch(task2) + + @test get_compute_scope(task1) == get_compute_scope(task2) == compute_scope_only + @test get_result_scope(task1) == get_result_scope(task2) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + @test execution_scope1 == execution_scope2 == Dagger.ExactScope(chunk_proc) + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes))) + + task1 = Dagger.@spawn result_scope=result_scope_only Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1) + + @test get_compute_scope(task1) == Dagger.DefaultScope() + @test get_result_scope(task1) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + @test execution_scope1 == Dagger.ExactScope(chunk_proc) + end + + @testset "compute_scope and result_scope with intersection" begin + if length(availscopes) >= 3 + n = cld(numscopes, 3) + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + compute_scope_intersect = Dagger.UnionScope(scope_a..., scope_b...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...) 
+<<<<<<< HEAD + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(compute_scope_intersect, result_scope_intersect, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g_chunk(10, 11); fetch(task1) + task2 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g_chunk(20, 21); fetch(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g_chunk(30, 31); fetch(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_intersect + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == result_scope_intersect + @test issetequal(get_execution_scope(task1), + get_execution_scope(task2), + get_execution_scope(task3), + all_scope) + end + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] +======= + + task1 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); fetch(task1 ) + task2 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope); fetch(task2 ) + task3 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope); fetch(task3 ) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == chunk_scope + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + execution_scope3 = get_execution_scope(task3) + @test execution_scope1 == execution_scope2 == execution_scope3 == Dagger.ExactScope(chunk_proc) + end + end + +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + @testset "compute_scope and result_scope without intersection" begin + if length(availscopes) >= 2 + n = cld(length(availscopes), 2) + +<<<<<<< HEAD +======= + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + compute_scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_b...) +<<<<<<< HEAD + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) 
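+ # compute_scope and result_scope are disjoint halves of the shuffled scope set, so these spawns are expected to fail scheduling with InvalidScope regardless of the chunk function's own scope.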
+ + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g_chunk(10, 11); wait(task1) + task2 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g_chunk(20, 21); wait(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g_chunk(30, 31); wait(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_no_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_no_intersect + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == Dagger.InvalidScope +======= + + task1 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect Dagger.tochunk(g(10, 11), chunk_proc, chunk_scope); wait(task1 ) + task2 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect Dagger.tochunk(g(20, 21), chunk_proc, chunk_scope); wait(task2 ) + task3 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect Dagger.tochunk(g(30, 31), chunk_proc, chunk_scope); wait(task3 ) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_no_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == Dagger.InvalidScope + + execution_scope1 = get_execution_scope(task1) + execution_scope2 = get_execution_scope(task2) + execution_scope3 = get_execution_scope(task3) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + @test get_execution_scope(task1) == get_execution_scope(task2) == get_execution_scope(task3) == Dagger.InvalidScope + end + end + + end + +<<<<<<< HEAD + @testset "Chunk arguments, scope, compute_scope and result_scope with non-intersection of chunk arg and scope" begin + @everywhere g(x, y) = x * 2 + y * 3 + +======= + @testset "Chunk arguments: scope, compute_scope and result_scope with non-intersection of chunk arg and scope" begin + + @everywhere g(x, y) = x * 2 + y * 3 + + availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors()))) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + n = cld(numscopes, 2) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + + arg_scope = Dagger.UnionScope(scope_a...) 
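+ # arg_scope covers only scope_a, while every task scope below is drawn from scope_b, so each spawn on the chunk argument in this testset is expected to be unschedulable (InvalidScope).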
+ arg_proc = rand(availprocs) + arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope) + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + + task11 = Dagger.@spawn scope=scope_only g(arg, 11); wait(task11) + + @test get_compute_scope(task11) == scope_only +<<<<<<< HEAD + @test get_result_scope(task11) == Dagger.AnyScope() + @test get_final_result_scope(task11) == Dagger.InvalidScope +======= + @test get_result_scope(task11) == Dagger.InvalidScope + +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + execution_scope11 = get_execution_scope(task11) + + @test execution_scope11 == Dagger.InvalidScope + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + scope = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) +<<<<<<< HEAD + + task11 = Dagger.@spawn compute_scope=compute_scope_only g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g(arg, 21); wait(task21) + + @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only + @test get_result_scope(task11) == get_result_scope(task21) == Dagger.AnyScope() + @test get_final_result_scope(task11) == get_final_result_scope(task21) == Dagger.InvalidScope + @test get_execution_scope(task11) == get_execution_scope(task21) == Dagger.InvalidScope +======= + + task11 = Dagger.@spawn compute_scope=compute_scope_only g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g(arg, 21); wait(task21) + + @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only + @test get_result_scope(task11) == get_result_scope(task21) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + @test execution_scope11 == execution_scope21 == Dagger.InvalidScope +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) +<<<<<<< HEAD + + task11 = Dagger.@spawn result_scope=result_scope_only g(arg, 11); wait(task11) + + @test get_compute_scope(task11) == Dagger.DefaultScope() + @test get_result_scope(task11) == result_scope_only + @test get_final_result_scope(task11) == Dagger.InvalidScope + @test get_execution_scope(task11) == Dagger.InvalidScope +======= + + task11 = Dagger.@spawn result_scope=result_scope_only g(arg, 11); wait(task11) + + @test get_compute_scope(task11) == Dagger.DefaultScope() + @test get_result_scope(task11) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + @test execution_scope11 == Dagger.InvalidScope +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + + @testset "compute_scope and result_scope with intersection" begin + if length(scope_b) >= 3 + n = cld(length(scope_b), 3) + + scope_ba = scope_b[1:n] + scope_bb = scope_b[n+1:2n] + scope_bc = scope_b[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_ba..., scope_bb...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_bb..., scope_bc...) 
+ +<<<<<<< HEAD + task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); wait(task21) + task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); wait(task31) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_intersect + @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == Dagger.InvalidScope + @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope +======= + task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); wait(task11 ) + task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); wait(task21 ) + task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); wait(task31 ) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + execution_scope31 = get_execution_scope(task31) + @test execution_scope11 == execution_scope21 == execution_scope31 == Dagger.InvalidScope +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + end + end + + @testset "compute_scope and result_scope without intersection" begin + if length(scope_b) >= 2 + n = cld(length(scope_b), 2) + + scope_ba = scope_b[1:n] + scope_bb = scope_b[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_ba...) + scope_no_intersect = Dagger.UnionScope(scope_ba...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_bb...) 
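+ # Here compute_scope (scope_ba) and result_scope (scope_bb) are also disjoint from each other, so these spawns should likewise fail with InvalidScope.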
+ +<<<<<<< HEAD + task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); wait(task21) + task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_no_intersect + @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == Dagger.InvalidScope +======= + task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); ; wait(task11 ) + task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); ; wait(task21 ) + task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31 ) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope + + execution_scope11 = get_execution_scope(task11) + execution_scope21 = get_execution_scope(task21) + execution_scope31 = get_execution_scope(task31) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + end + end + + end + +<<<<<<< HEAD + @testset "Chunk arguments, scope, compute_scope and result_scope with intersection of chunk arg and scope" begin + @everywhere g(x, y) = x * 2 + y * 3 + + shuffle!(availscopes) +======= + @testset "Chunk arguments: scope, compute_scope and result_scope with intersection of chunk arg and scope" begin + + @everywhere g(x, y) = x * 2 + y * 3 + + availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors()))) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + n = cld(numscopes, 3) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + + arg_scope = Dagger.UnionScope(scope_a..., scope_b...) + arg_proc = rand(availprocs) + arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope) + + @testset "scope" begin +<<<<<<< HEAD + scope_only = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(scope_only, arg_scope) +======= + scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...) +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 + + task11 = Dagger.@spawn scope=scope_only g(arg, 11); fetch(task11) + + @test get_compute_scope(task11) == scope_only + @test get_result_scope(task11) == Dagger.AnyScope() +<<<<<<< HEAD + @test get_final_result_scope(task11) == Dagger.AnyScope() + @test issetequal(get_execution_scope(task11), all_scope) + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(scope_b..., scope_c...) + scope = Dagger.UnionScope(scope_b..., scope_c...) 
+            all_scope = Dagger.constrain(compute_scope_only, arg_scope)
+
+=======
+
+            execution_scope11 = get_execution_scope(task11)
+
+            @test execution_scope11 in intersect_scopes(execution_scope11, scope_only, arg_scope)
+        end
+
+        @testset "compute_scope" begin
+            compute_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...)
+            scope = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...)
+
+>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669
+            task11 = Dagger.@spawn compute_scope=compute_scope_only g(arg, 11); fetch(task11)
+            task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g(arg, 21); fetch(task21)
+
+            @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only
+            @test get_result_scope(task11) == get_result_scope(task21) == Dagger.AnyScope()
+<<<<<<< HEAD
+            @test get_final_result_scope(task11) == get_final_result_scope(task21) == Dagger.AnyScope()
+            @test issetequal(get_execution_scope(task11), all_scope) &&
+                  issetequal(get_execution_scope(task21), all_scope)
+        end
+
+        @testset "result_scope" begin
+            result_scope_only = Dagger.UnionScope(scope_b..., scope_c...)
+            all_scope = Dagger.constrain(result_scope_only, arg_scope)
+
+=======
+
+            execution_scope11 = get_execution_scope(task11)
+            execution_scope21 = get_execution_scope(task21)
+            @test execution_scope11 in intersect_scopes(execution_scope11, compute_scope_only, arg_scope) &&
+                  execution_scope21 in intersect_scopes(execution_scope21, compute_scope_only, arg_scope)
+        end
+
+        @testset "result_scope" begin
+            result_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))..., rand(scope_c, rand(1:length(scope_c)))...)
+
+>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669
+            task11 = Dagger.@spawn result_scope=result_scope_only g(arg, 11); fetch(task11)
+
+            @test get_compute_scope(task11) == Dagger.DefaultScope()
+            @test get_result_scope(task11) == result_scope_only
+<<<<<<< HEAD
+            @test get_final_result_scope(task11) == result_scope_only
+            @test issetequal(get_execution_scope(task11), all_scope)
+=======
+
+            execution_scope11 = get_execution_scope(task11)
+            @test execution_scope11 in intersect_scopes(execution_scope11, result_scope_only, arg_scope)
+>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669
+        end
+
+        @testset "compute_scope and result_scope with intersection" begin
+            scope_bc = [scope_b..., scope_c...]
+            if length(scope_bc) >= 3
+                n = cld(length(scope_bc), 3)
+
+                scope_bca = scope_bc[1:n]
+                scope_bcb = scope_bc[n+1:2n]
+                scope_bcc = scope_bc[2n+1:end]
+
+                compute_scope_intersect = Dagger.UnionScope(scope_bca..., scope_bcb...)
+                scope_intersect = compute_scope_intersect
+                scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes))))
+                result_scope_intersect = Dagger.UnionScope(scope_bcb..., scope_bcc...)
+<<<<<<< HEAD
+                all_scope = Dagger.constrain(compute_scope_intersect, result_scope_intersect, arg_scope)
+
+                task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); fetch(task11)
+                task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); fetch(task21)
+                task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); fetch(task31)
+
+                @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect
+                @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_intersect
+                @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == result_scope_intersect
+                @test issetequal(get_execution_scope(task11), all_scope) &&
+                      issetequal(get_execution_scope(task21), all_scope) &&
+                      issetequal(get_execution_scope(task31), all_scope)
+=======
+
+                task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); fetch(task11)
+                task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); fetch(task21)
+                task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); fetch(task31)
+
+                @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect
+                @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_intersect
+
+                execution_scope11 = get_execution_scope(task11)
+                execution_scope21 = get_execution_scope(task21)
+                execution_scope31 = get_execution_scope(task31)
+                @test execution_scope11 in intersect_scopes(execution_scope11, compute_scope_intersect, result_scope_intersect, arg_scope) &&
+                      execution_scope21 in intersect_scopes(execution_scope21, scope_intersect, result_scope_intersect, arg_scope) &&
+                      execution_scope31 in intersect_scopes(execution_scope31, compute_scope_intersect, result_scope_intersect, arg_scope)
+>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669
+            end
+        end
+
+        @testset "compute_scope and result_scope without intersection" begin
+            scope_bc = [scope_b..., scope_c...]
+            if length(scope_bc) >= 2
+                n = cld(length(scope_bc), 2)
+
+                scope_bca = scope_bc[1:n]
+                scope_bcb = scope_bc[n+1:end]
+
+                compute_scope_no_intersect = Dagger.UnionScope(scope_bca...)
+                scope_no_intersect = Dagger.UnionScope(scope_bca...)
+                scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes))))
+                result_scope_no_intersect = Dagger.UnionScope(scope_bcb...)
+
+<<<<<<< HEAD
+                task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); wait(task11)
+                task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); wait(task21)
+                task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31)
+
+                @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect
+                @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_no_intersect
+                @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == Dagger.InvalidScope
+                @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope
+            end
+        end
+    end
+=======
+                task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); wait(task11)
+                task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); wait(task21)
+                task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31)
+
+                @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect
+                @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope
+
+                execution_scope11 = get_execution_scope(task11)
+                execution_scope21 = get_execution_scope(task21)
+                execution_scope31 = get_execution_scope(task31)
+                @test execution_scope11 == execution_scope21 == execution_scope31 == Dagger.InvalidScope
+            end
+        end
+
+    end
+
+    # @testset "Chunk function with Chunk arguments: scope, compute_scope and result_scope with non-intersection of chunk arg and chunk scope" begin
+
+    #     @everywhere g(x, y) = x * 2 + y * 3
+
+    #     availscopes = shuffle!(Dagger.ExactScope.(collect(Dagger.all_processors())))
+
+    #     chunk_scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes)))
+    #     chunk_proc = rand(chunk_scope.scopes).processor
+    #     arg_scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes)))
+    #     arg_proc = rand(arg_scope.scopes).processor
+    #     arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)
+
+    #     @testset "scope" begin
+    #         scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes)))
+
+    #         task11 = Dagger.@spawn scope=scope_only arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); fetch(task11)
+
+    #         @test get_compute_scope(task11) == scope_only
+    #         @test get_result_scope(task11) == chunk_scope
+
+    #         execution_scope11 = get_execution_scope(task11)
+
+    #         @test execution_scope11 == Dagger.ExactScope(chunk_proc)
+    #     end
+
+    #     @testset "compute_scope" begin
+    #         compute_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes)))
+    #         scope = Dagger.UnionScope(rand(availscopes, rand(1:numscopes)))
+
+    #         task11 = Dagger.@spawn compute_scope=compute_scope_only arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); fetch(task11)
+    #         task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only arg -> Dagger.tochunk(g(arg, 21), chunk_proc, chunk_scope); fetch(task21)
+
+    #         @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only
+    #         @test get_result_scope(task11) == get_result_scope(task21) == chunk_scope
+
+    #         execution_scope11 = get_execution_scope(task11)
+    #         execution_scope21 = get_execution_scope(task21)
+    #         @test execution_scope11 == execution_scope21 == Dagger.ExactScope(chunk_proc)
+    #     end
+
+    #     @testset "result_scope" begin
+    #         result_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:numscopes)))
+
+    #         task11 = Dagger.@spawn result_scope=result_scope_only arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); fetch(task11)
+
+    #         @test get_compute_scope(task11) == Dagger.DefaultScope()
+    #         @test get_result_scope(task11) == chunk_scope
+
+    #         execution_scope11 = get_execution_scope(task11)
+    #         @test execution_scope11 == Dagger.ExactScope(chunk_proc)
+    #     end
+
+    #     @testset "compute_scope and result_scope with intersection" begin
+    #         if numscopes >= 3
+    #             n = cld(numscopes, 3)
+
+    #             shuffle!(availscopes)
+    #             scope_a = availscopes[1:n]
+    #             scope_b = availscopes[n+1:2n]
+    #             scope_c = availscopes[2n+1:end]
+
+    #             compute_scope_intersect = Dagger.UnionScope(scope_a..., scope_b...)
+    #             scope_intersect = compute_scope_intersect
+    #             scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes))))
+    #             result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...)
+
+    #             task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); wait(task11)
+    #             task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect arg -> Dagger.tochunk(g(arg, 21), chunk_proc, chunk_scope); wait(task21)
+    #             task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect arg -> Dagger.tochunk(g(arg, 31), chunk_proc, chunk_scope); wait(task31)
+
+    #             @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect
+    #             @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == chunk_scope
+
+    #             execution_scope11 = get_execution_scope(task11)
+    #             execution_scope21 = get_execution_scope(task21)
+    #             execution_scope31 = get_execution_scope(task31)
+    #             @test execution_scope11 == execution_scope21 == execution_scope31 == Dagger.ExactScope(chunk_proc)
+    #         end
+    #     end
+
+    #     @testset "compute_scope and result_scope without intersection" begin
+    #         if numscopes >= 2
+    #             n = cld(numscopes, 2)
+
+    #             shuffle!(availscopes)
+    #             scope_a = availscopes[1:n]
+    #             scope_b = availscopes[n+1:end]
+
+    #             compute_scope_no_intersect = Dagger.UnionScope(scope_a...)
+    #             scope_no_intersect = Dagger.UnionScope(scope_a...)
+    #             scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes))))
+    #             result_scope_no_intersect = Dagger.UnionScope(scope_b...)
+ + # task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect arg -> Dagger.tochunk(g(arg, 11), chunk_proc, chunk_scope); wait(task11 ) + # task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect arg -> Dagger.tochunk(g(arg, 21), chunk_proc, chunk_scope); wait(task21 ) + # task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect arg -> Dagger.tochunk(g(arg, 31), chunk_proc, chunk_scope); wait(task31 ) + + # @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + # @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == Dagger.InvalidScope + + # execution_scope11 = get_execution_scope(task11) + # execution_scope21 = get_execution_scope(task21) + # execution_scope31 = get_execution_scope(task31) + # @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + # end + # end + + # end + +>>>>>>> 0b15479548bd89fc051c7c388be1e7399d2ba669 +end \ No newline at end of file diff --git a/test/thunk.jl b/test/thunk.jl index 06ba25144..8f4477df6 100644 --- a/test/thunk.jl +++ b/test/thunk.jl @@ -21,8 +21,8 @@ import Dagger: Chunk end MulProc() = MulProc(myid()) Dagger.get_parent(mp::MulProc) = OSProc(mp.owner) - Dagger.move(src::MulProc, dest::Dagger.OSProc, x::Function) = Base.:* - Dagger.move(src::MulProc, dest::Dagger.ThreadProc, x::Function) = Base.:* + Dagger.move(src::MulProc, dest::Dagger.OSProc, ::Function) = Base.:* + Dagger.move(src::MulProc, dest::Dagger.ThreadProc, ::Function) = Base.:* end @testset "@par" begin @@ -326,21 +326,11 @@ end @testset "lazy API" begin a = delayed(+)(1,2) @test !(a.f isa Chunk) + @test a.compute_scope == Dagger.DefaultScope() a = delayed(+; scope=NodeScope())(1,2) - @test a.f isa Chunk - @test a.f.processor isa OSProc - @test a.f.scope isa NodeScope - - a = delayed(+; processor=Dagger.ThreadProc(1,1))(1,2) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope == DefaultScope() - - a = delayed(+; processor=Dagger.ThreadProc(1,1), scope=NodeScope())(1,2) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope isa NodeScope + @test !(a.f isa Chunk) + @test a.compute_scope isa NodeScope @testset "Scope Restrictions" begin pls = ProcessLockedStruct(Ptr{Int}(42)) @@ -354,28 +344,16 @@ end end @testset "Processor Data Movement" begin @everywhere Dagger.add_processor_callback!(()->MulProc(), :mulproc) - @test collect(delayed(+; processor=MulProc())(3,4)) == 12 + plus_chunk = Dagger.tochunk(+, MulProc()) + @test collect(delayed(plus_chunk)(3,4)) == 12 @everywhere Dagger.delete_processor_callback!(:mulproc) end end @testset "eager API" begin _a = Dagger.@spawn scope=NodeScope() 1+2 a = Dagger.Sch._find_thunk(_a) - @test a.f isa Chunk - @test a.f.processor isa OSProc - @test a.f.scope isa NodeScope - - _a = Dagger.@spawn processor=Dagger.ThreadProc(1,1) 1+2 - a = Dagger.Sch._find_thunk(_a) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope == DefaultScope() - - _a = Dagger.@spawn processor=Dagger.ThreadProc(1,1) scope=NodeScope() 1+2 - a = Dagger.Sch._find_thunk(_a) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope isa NodeScope + @test !(a.f isa Chunk) + @test a.compute_scope isa NodeScope end end @testset "parent fetch child, one thread" begin