diff --git a/docs/make.jl b/docs/make.jl index fffad5017..1e05647d8 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -20,6 +20,7 @@ makedocs(; "Parallel Nested Loops" => "use-cases/parallel-nested-loops.md", ], "Task Spawning" => "task-spawning.md", + "Task Affinity" => "task-affinity.md", "Data Management" => "data-management.md", "Distributed Arrays" => "darray.md", "Streaming Tasks" => "streaming.md", diff --git a/docs/src/task-affinity.md b/docs/src/task-affinity.md new file mode 100644 index 000000000..53303923e --- /dev/null +++ b/docs/src/task-affinity.md @@ -0,0 +1,131 @@ +# Task Affinity + +Dagger's allows for precise control over task placement and result availability using scopes. Tasks are assigned based on the combination of multiple scopes: `scope`/`compute_scope`, and `result_scope` (which can all be specified with `@spawn`), and additionally the scopes of any arguments to the task (in the form of a scope attached to a `Chunk` argument). Let's take a look at how to configure these scopes, and how they work together to direct task placement. + +For more information on how scopes work, see [Scopes](@ref). + +--- + +## Task Scopes + +### Scope + +`scope` defines the general set of locations where a Dagger task can execute. If `scope` is not specified, the task falls back to `DefaultScope()`, allowing it to run wherever execution is possible. Execution occurs on any worker within the defined scope. + +**Example:** +```julia +g = Dagger.@spawn scope=Dagger.scope(worker=3) f(x,y) +``` +Task `g` executes only on worker 3. Its result can be accessed by any worker. + +--- + +### Compute Scope + +Like `scope`, `compute_scope` also specifies where a Dagger task can execute. The key difference is if both `compute_scope` and `scope` are provided, `compute_scope` takes precedence over `scope` for execution placement. If neither is specified, then they default to `DefaultScope()`. + +**Example:** +```julia +g1 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y) +g2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) f(x,y) +``` +Tasks `g1` and `g2` execute on either thread 2 of worker 1, or thread 1 of worker 3. The `scope` argument to `g1` is ignored. Their result can be accessed by any worker. + +--- + +### Result Scope + +The result_scope limits the processors from which a task's result can be accessed. This can be useful for managing data locality and minimizing transfers. If `result_scope` is not specified, it defaults to `AnyScope()`, meaning the result can be accessed by any processor (including those not default enabled for task execution, such as GPUs). + +**Example:** +```julia +g = Dagger.@spawn result_scope=Dagger.scope(worker=3, threads=[1, 3, 4]) f(x,y) +``` +The result of `g` is accessible only from threads 1, 3 and 4 of worker process 3. The task's execution may happen anywhere on threads 1, 3 and 4 of worker 3. + +--- + +## Interaction of `compute_scope` and `result_scope` + +When `scope`/`compute_scope` and `result_scope` are specified, the scheduler executes the task on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and the `result_scope`. If the intersection is empty, then the scheduler throws a `Dagger.Sch.SchedulerException` error. + +**Example:** +```julia +g = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) f(x,y) +``` +The task `g` computes on thread 2 of worker 2 (as it's the intersection of compute and result scopes), but accessng its result is restricted to thread 2 of worker 2 and thread 2 of worker 4. + +--- + +## Function as a Chunk + +This section explains how `scope`/`compute_scope` and `result_scope` affect tasks when a `Chunk` is used to specify the function to be executed by `@spawn` (e.g. created via `Dagger.tochunk(...)` or by calling `fetch(task; raw=true)` on a task). This may seem strange (to use a `Chunk` to specify the function to be executed), but it can be useful with working with callable structs, such as closures or Flux.jl models. + +Assume `g` is some function, e.g. `g(x, y) = x * 2 + y * 3`, and `chunk_scope` is its defined affinity. + +When `Dagger.tochunk(...)` is used to pass a `Chunk` as the function to be executed by `@spawn`: +- The result is accessible only on processors in `chunk_scope`. +- Dagger validates that there is an intersection between `chunk_scope`, the effective `compute_scope` (derived from `@spawn`'s `compute_scope` or `scope`), and the `result_scope`. If no intersection exists, the scheduler throws an exception. + +!!! info While `chunk_proc` is currently required when constructing a chunk, it is only used to pick the most optimal processor for accessing the chunk; it does not affect which set of processors the task may execute on. + +**Usage:** +```julia +chunk_scope = Dagger.scope(worker=3) +chunk_proc = Dagger.OSProc(3) # not important, just needs to be a valid processor +g(x, y) = x * 2 + y * 3 +g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) +h1 = Dagger.@spawn scope=Dagger.scope(worker=3) g_chunk(10, 11) +h2 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(20, 21) +h3 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=3, thread=1)) g_chunk(30, 31) +h4 = Dagger.@spawn result_scope=Dagger.scope(worker=3) g_chunk(40, 41) +h5 = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=3) result_scope=Dagger.scope(worker=3,threads=[2,3]) g_chunk(50, 51) +``` +In all these cases (`h1` through `h5`), the tasks get executed on any processor within `chunk_scope` and its result is accessible only within `chunk_scope`. + +--- + +## Chunk arguments + +This section details behavior when some or all of a task's arguments are `Chunk`s. + +Assume `g(x, y) = x * 2 + y * 3`, and `arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope)`, where `arg_scope` is the argument's defined scope. Assume `arg_scope = Dagger.scope(worker=2)`. + +### Scope +If `arg_scope` and `scope` do not intersect, the scheduler throws an exception. Execution occurs on the intersection of `scope` and `arg_scope`. + +```julia +h = Dagger.@spawn scope=Dagger.scope(worker=2) g(arg, 11) +``` +Task `h` executes on any worker within the intersection of `scope` and `arg_scope`. The result is accessible from any processor. + +--- + +### Compute scope and Chunk argument scopes interaction +If `arg_scope` and `compute_scope` do not intersect, the scheduler throws an exception. Otherwise, execution happens on the intersection of the effective compute scope (which will be `compute_scope` if provided, otherwise `scope`) and `arg_scope`. + +```julia +h1 = Dagger.@spawn compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 11) +h2 = Dagger.@spawn scope=Dagger.scope(worker=2,thread=3) compute_scope=Dagger.scope((worker=1, thread=2), (worker=2, thread=1)) g(arg, 21) +``` +Tasks `h1` and `h2` execute on any processor within the intersection of the `compute_scope` and `arg_scope`. `scope` is ignored if `compute_scope` is specified. The result is accessible from any processor. + +--- + +### Result scope and Chunk argument scopes interaction +If only `result_scope` is specified, computation happens on any processor within the intersection of `arg_scope` and `result_scope`, and the result is only accessible within `result_scope`. + +```julia +h = Dagger.@spawn result_scope=Dagger.scope(worker=2) g(arg, 11) +``` +Task `h` executes on any processor within the intersection of `arg_scope` and `result_scope`. The result is accessible from only within `result_scope`. + +--- + +### Compute, result, and chunk argument scopes interaction +When `scope`/`compute_scope`, `result_scope`, and `Chunk` argument scopes are all used, the scheduler executes the task on the intersection of `arg_scope`, the effective compute scope (which is `compute_scope` if provided, otherwise `scope`), and `result_scope`. If no intersection exists, the scheduler throws an exception. + +```julia +h = Dagger.@spawn scope=Dagger.scope(worker=3,thread=2) compute_scope=Dagger.scope(worker=2) result_scope=Dagger.scope((worker=2, thread=2), (worker=4, thread=2)) g(arg, 31) +``` +Task `h` computes on thread 2 of worker 2 (as it's the intersection of `arg_scope`, `compute_scope`, and `result_scope`), and its result access is restricted to thread 2 of worker 2 or thread 2 of worker 4. diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index b894f4526..0335fe3e0 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -14,7 +14,7 @@ import Random: randperm import Base: @invokelatest import ..Dagger -import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject +import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, InvalidScope, LockedObject import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime import ..Dagger: @dagdebug, @safe_lock_spin1 import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek @@ -726,16 +726,25 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) sig = signature(state, task) # Calculate scope - scope = if task.f isa Chunk - task.f.scope - else - if task.options.proclist !== nothing - # proclist overrides scope selection - AnyScope() - else - DefaultScope() + scope = constrain(task.compute_scope, task.result_scope) + if scope isa InvalidScope + ex = SchedulingException("compute_scope and result_scope are not compatible: $(scope.x), $(scope.y)") + state.cache[task] = ex + state.errored[task] = true + set_failed!(state, task) + @goto pop_task + end + if task.f isa Chunk + scope = constrain(scope, task.f.scope) + if scope isa InvalidScope + ex = SchedulingException("Current scope and function Chunk Scope are not compatible: $(scope.x), $(scope.y)") + state.cache[task] = ex + state.errored[task] = true + set_failed!(state, task) + @goto pop_task end end + for (_,input) in task.inputs input = unwrap_weak_checked(input) chunk = if istask(input) @@ -747,8 +756,8 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) end chunk isa Chunk || continue scope = constrain(scope, chunk.scope) - if scope isa Dagger.InvalidScope - ex = SchedulingException("Scopes are not compatible: $(scope.x), $(scope.y)") + if scope isa InvalidScope + ex = SchedulingException("Current scope and argument Chunk scope are not compatible: $(scope.x), $(scope.y)") state.cache[task] = ex state.errored[task] = true set_failed!(state, task) @@ -1086,7 +1095,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options, propagated, ids, positions, (log_sink=ctx.log_sink, profile=ctx.profile), - sch_handle, state.uid]) + sch_handle, state.uid, thunk.result_scope]) end # N.B. We don't batch these because we might get a deserialization # error due to something not being defined on the worker, and then we don't @@ -1305,7 +1314,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re task = task_spec[] scope = task[5] if !isa(constrain(scope, Dagger.ExactScope(to_proc)), - Dagger.InvalidScope) && + InvalidScope) && typemax(UInt32) - proc_occupancy_cached >= occupancy # Compatible, steal this task return dequeue_pair!(queue) @@ -1488,7 +1497,7 @@ function do_task(to_proc, task_desc) scope, Tf, data, send_result, persist, cache, meta, options, propagated, ids, positions, - ctx_vars, sch_handle, sch_uid = task_desc + ctx_vars, sch_handle, sch_uid, result_scope = task_desc ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile) from_proc = OSProc() @@ -1696,7 +1705,7 @@ function do_task(to_proc, task_desc) # Construct result # TODO: We should cache this locally - send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache, + send_result || meta ? res : tochunk(res, to_proc, result_scope; device, persist, cache=persist ? true : cache, tag=options.storage_root_tag, leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()), retain=options.storage_retain) diff --git a/src/sch/util.jl b/src/sch/util.jl index 01138a052..dd148d336 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -42,9 +42,7 @@ function get_propagated_options(thunk) nt = NamedTuple() for key in thunk.propagates value = if key == :scope - isa(thunk.f, Chunk) ? thunk.f.scope : DefaultScope() - elseif key == :processor - isa(thunk.f, Chunk) ? thunk.f.processor : OSProc() + thunk.compute_scope elseif key in fieldnames(Thunk) getproperty(thunk, key) elseif key in fieldnames(ThunkOptions) @@ -340,7 +338,7 @@ function can_use_proc(state, task, gproc, proc, opts, scope) scope = constrain(scope, Dagger.ExactScope(proc)) elseif opts.proclist isa Vector if !(typeof(proc) in opts.proclist) - @dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist)" + @dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist) ($(opts.proclist))" return false, scope end scope = constrain(scope, diff --git a/src/scopes.jl b/src/scopes.jl index 0545c573e..ecb3e5e03 100644 --- a/src/scopes.jl +++ b/src/scopes.jl @@ -240,6 +240,17 @@ constrain(x::ProcessScope, y::ExactScope) = constrain(x::NodeScope, y::ExactScope) = x == y.parent.parent ? y : InvalidScope(x, y) + +function constrain(scope1, scope2, scopes...) + scope1 = constrain(scope1, scope2) + scope1 isa InvalidScope && return scope1 + for s in scopes + scope1 = constrain(scope1, s) + scope1 isa InvalidScope && return scope1 + end + return scope1 +end + ### Scopes helper """ @@ -412,3 +423,26 @@ to_scope(::Val{key}, sc::NamedTuple) where key = # Base case for all Dagger-owned keys scope_key_precedence(::Val) = 0 + +### Scope comparison helpers + +function Base.issetequal(scopes::AbstractScope...) + scope1 = scopes[1] + scope1_procs = Dagger.compatible_processors(scope1) + for scope2 in scopes[2:end] + scope2_procs = Dagger.compatible_processors(scope2) + if !issetequal(scope1_procs, scope2_procs) + return false + end + end + return true +end + +function Base.issubset(scope1::AbstractScope, scope2::AbstractScope) + scope1_procs = compatible_processors(scope1) + scope2_procs = compatible_processors(scope2) + for proc in scope1_procs + proc in scope2_procs || return false + end + return true +end \ No newline at end of file diff --git a/src/stream.jl b/src/stream.jl index 07a3dae95..885a9e931 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -289,6 +289,20 @@ struct StreamingFunction{F, S} new{F, S}(f, stream, max_evals) end +struct DestPostMigration + thunk_id::Int + cancel_token::CancelToken + f + DestPostMigration(thunk_id, tls, f) = new(thunk_id, tls.cancel_token, f) +end +function (dpm::DestPostMigration)(store, unsent) + STREAM_THUNK_ID[] = dpm.thunk_id + @assert !in_task() + tls = DTaskTLS(OSProc(), typemax(UInt64), nothing, [], dpm.cancel_token) + set_tls!(tls) + return dpm.f(store, unsent) +end + function migrate_stream!(stream::Stream, w::Integer=myid()) # Perform migration of the StreamStore # MemPool will block access to the new ref until the migration completes @@ -318,11 +332,8 @@ function migrate_stream!(stream::Stream, w::Integer=myid()) empty!(store.output_buffers) return (unsent_inputs, unsent_outputs) end, - dest_post_migration=(store, unsent)->begin + dest_post_migration=DestPostMigration(thunk_id, tls, (store, unsent)->begin # Initialize the StreamStore on the destination with the unsent inputs/outputs. - STREAM_THUNK_ID[] = thunk_id - @assert !in_task() - set_tls!(tls) #get_tls().cancel_token = MemPool.access_ref(identity, remote_cancel_token; local_only=true) unsent_inputs, unsent_outputs = unsent for (input_uid, inputs) in unsent_inputs @@ -342,7 +353,7 @@ function migrate_stream!(stream::Stream, w::Integer=myid()) # Reset the state of this new store store.open = true store.migrating = false - end, + end), post_migration=store->begin # Indicate that this store has migrated store.migrating = true diff --git a/src/thunk.jl b/src/thunk.jl index b24806f85..659ee9a2c 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -73,6 +73,8 @@ mutable struct Thunk eager_ref::Union{DRef,Nothing} options::Any # stores scheduler-specific options propagates::Tuple # which options we'll propagate + compute_scope::AbstractScope + result_scope::AbstractScope function Thunk(f, xs...; syncdeps=nothing, id::Int=next_id(), @@ -84,16 +86,14 @@ mutable struct Thunk affinity=nothing, eager_ref=nothing, processor=nothing, - scope=nothing, + scope=DefaultScope(), + compute_scope=scope, + result_scope=AnyScope(), options=nothing, propagates=(), kwargs... ) - if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope)) - f = tochunk(f, - something(processor, OSProc()), - something(scope, DefaultScope())) - end + xs = Base.mapany(identity, xs) syncdeps_set = Set{Any}(filterany(is_task_or_chunk, Base.mapany(last, xs))) if syncdeps !== nothing @@ -105,11 +105,11 @@ mutable struct Thunk if options !== nothing @assert isempty(kwargs) new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, - cache_ref, affinity, eager_ref, options, propagates) + cache_ref, affinity, eager_ref, options, propagates, compute_scope, result_scope) else new(f, xs, syncdeps_set, id, get_result, meta, persist, cache, cache_ref, affinity, eager_ref, Sch.ThunkOptions(;kwargs...), - propagates) + propagates, compute_scope, result_scope) end end end @@ -476,15 +476,6 @@ function spawn(f, args...; kwargs...) args = args[2:end] end - # Wrap f in a Chunk if necessary - processor = haskey(options, :processor) ? options.processor : nothing - scope = haskey(options, :scope) ? options.scope : nothing - if !isnothing(processor) || !isnothing(scope) - f = tochunk(f, - something(processor, get_options(:processor, OSProc())), - something(scope, get_options(:scope, DefaultScope()))) - end - # Process the args and kwargs into Pair form args_kwargs = args_kwargs_to_pairs(args, kwargs) diff --git a/test/options.jl b/test/options.jl index e832a0827..91349ab9f 100644 --- a/test/options.jl +++ b/test/options.jl @@ -28,7 +28,6 @@ end for (option, default, value, value2) in [ # Special handling (:scope, AnyScope(), ProcessScope(first_wid), ProcessScope(last_wid)), - (:processor, OSProc(), Dagger.ThreadProc(first_wid, 1), Dagger.ThreadProc(last_wid, 1)), # ThunkOptions field (:single, 0, first_wid, last_wid), # Thunk field @@ -80,7 +79,7 @@ end @test fetch(Dagger.@spawn sf(obj)) == 0 @test fetch(Dagger.@spawn sf(obj)) == 0 end - Dagger.with_options(;scope=Dagger.ExactScope(Dagger.ThreadProc(1,1)), processor=OSProc(1), meta=true) do + Dagger.with_options(;scope=Dagger.ExactScope(Dagger.ThreadProc(1,1)), meta=true) do @test fetch(Dagger.@spawn sf(obj)) == 43 @test fetch(Dagger.@spawn sf(obj)) == 43 end diff --git a/test/runtests.jl b/test/runtests.jl index 20c7eb41c..a7b7a890a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -18,6 +18,7 @@ tests = [ ("Options", "options.jl"), ("Mutation", "mutation.jl"), ("Task Queues", "task-queues.jl"), + ("Task Affinity", "task-affinity.jl"), ("Datadeps", "datadeps.jl"), ("Streaming", "streaming.jl"), ("Domain Utilities", "domain.jl"), diff --git a/test/scheduler.jl b/test/scheduler.jl index b12ad3e1e..9f00485a8 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -1,3 +1,4 @@ +import Dagger: Chunk import Dagger.Sch: SchedulerOptions, ThunkOptions, SchedulerHaltedException, ComputeState, ThunkID, sch_handle @everywhere begin @@ -162,10 +163,8 @@ end @test Dagger.default_enabled(Dagger.ThreadProc(1,1)) == true @test Dagger.default_enabled(FakeProc()) == false - opts = Dagger.Sch.ThunkOptions(;proclist=[Dagger.ThreadProc]) - as = [delayed(identity; options=opts)(i) for i in 1:5] - opts = Dagger.Sch.ThunkOptions(;proclist=[FakeProc]) - b = delayed(fakesum; options=opts)(as...) + as = [delayed(identity; proclist=[Dagger.ThreadProc])(i) for i in 1:5] + b = delayed(fakesum; proclist=[FakeProc], compute_scope=Dagger.AnyScope())(as...) @test collect(Context(), b) == FakeVal(57) end diff --git a/test/scopes.jl b/test/scopes.jl index ecade7ab4..fa5bf1135 100644 --- a/test/scopes.jl +++ b/test/scopes.jl @@ -1,6 +1,5 @@ -#@everywhere ENV["JULIA_DEBUG"] = "Dagger" @testset "Chunk Scopes" begin - wid1, wid2 = addprocs(2, exeflags=["-t 2"]) + wid1, wid2 = addprocs(2, exeflags=["-t2", "--project=$(Base.active_project())"]) @everywhere [wid1,wid2] using Dagger Dagger.addprocs!(Dagger.Sch.eager_context(), [wid1,wid2]) fetch(Dagger.@spawn 1+1) # Force scheduler to pick up new workers @@ -57,7 +56,7 @@ # Different nodes for (ch1, ch2) in [(ns1_ch, ns2_ch), (ns2_ch, ns1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Process Scope" begin @@ -76,7 +75,7 @@ # Different process for (ch1, ch2) in [(ps1_ch, ps2_ch), (ps2_ch, ps1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end # Same process and node @@ -84,7 +83,7 @@ # Different process and node for (ch1, ch2) in [(ps1_ch, ns2_ch), (ns2_ch, ps1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Exact Scope" begin @@ -105,14 +104,14 @@ # Different process, different processor for (ch1, ch2) in [(es1_ch, es2_ch), (es2_ch, es1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end # Same process, different processor es1_2 = ExactScope(Dagger.ThreadProc(wid1, 2)) es1_2_ch = Dagger.tochunk(nothing, OSProc(), es1_2) for (ch1, ch2) in [(es1_ch, es1_2_ch), (es1_2_ch, es1_ch)] - @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap (Dagger.DTaskFailedException, Dagger.Sch.SchedulingException) reason<"Current scope and argument Chunk scope are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Union Scope" begin @@ -269,5 +268,16 @@ @test Dagger.num_processors() == length(comp_procs) end + @testset "scope comparison" begin + scope1 = Dagger.scope(worker=wid1) + scope2 = Dagger.scope(worker=wid1, thread=1) + @test issubset(scope2, scope1) + @test !issubset(scope1, scope2) + @test issetequal(scope1, scope1) + @test issetequal(scope2, scope2) + @test !issetequal(scope1, scope2) + @test !issetequal(scope2, scope1) + end + rmprocs([wid1, wid2]) end diff --git a/test/task-affinity.jl b/test/task-affinity.jl new file mode 100644 index 000000000..cd57e273b --- /dev/null +++ b/test/task-affinity.jl @@ -0,0 +1,457 @@ +@testset "Task affinity" begin + fetch_or_invalidscope(x::DTask) = try + fetch(x; raw=true) + nothing + catch err + @assert Dagger.Sch.unwrap_nested_exception(err) isa Dagger.Sch.SchedulingException + return Dagger.InvalidScope + end + get_compute_scope(x::DTask) = Dagger.Sch._find_thunk(x).compute_scope + + get_result_scope(x::DTask) = Dagger.Sch._find_thunk(x).result_scope + + get_final_result_scope(x::DTask) = @something(fetch_or_invalidscope(x), fetch(x; raw=true).scope) + + function get_execution_scope(x::DTask) + res = fetch_or_invalidscope(x) + if res !== nothing + return res + end + thunk = Dagger.Sch._find_thunk(x) + compute_scope = thunk.compute_scope + result_scope = thunk.result_scope + f_scope = thunk.f isa Dagger.Chunk ? thunk.f.scope : Dagger.AnyScope() + inputs_scopes = Dagger.AbstractScope[] + for input in thunk.inputs + if input isa Dagger.Chunk + push!(inputs_scopes, input.scope) + else + push!(inputs_scopes, Dagger.AnyScope()) + end + end + return Dagger.constrain(compute_scope, result_scope, f_scope, inputs_scopes...) + end + + availprocs = collect(Dagger.all_processors()) + availscopes = shuffle!(Dagger.ExactScope.(availprocs)) + numscopes = length(availscopes) + + master_proc = Dagger.ThreadProc(1, 1) + master_scope = Dagger.ExactScope(master_proc) + + @testset "scope, compute_scope and result_scope" begin + @everywhere f(x) = x + 1 + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn scope=scope_only f(10); fetch(task1) + @test get_compute_scope(task1) == scope_only + @test get_result_scope(task1) == Dagger.AnyScope() + @test get_final_result_scope(task1) == Dagger.AnyScope() + @test issubset(get_execution_scope(task1), scope_only) + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + scope = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn compute_scope=compute_scope_only f(10); fetch(task1) + task2 = Dagger.@spawn scope=scope compute_scope=compute_scope_only f(20); fetch(task2) + + @test get_compute_scope(task1) == get_compute_scope(task2) == compute_scope_only + @test get_result_scope(task1) == get_result_scope(task2) == Dagger.AnyScope() + @test get_final_result_scope(task1) == get_final_result_scope(task2) == Dagger.AnyScope() + @test issubset(get_execution_scope(task1), compute_scope_only) && + issubset(get_execution_scope(task2), compute_scope_only) + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + + task1 = Dagger.@spawn result_scope=result_scope_only f(10); fetch(task1) + + @test get_compute_scope(task1) == Dagger.DefaultScope() + @test get_result_scope(task1) == result_scope_only + @test get_final_result_scope(task1) == result_scope_only + @test issubset(get_execution_scope(task1), result_scope_only) + end + + @testset "compute_scope and result_scope with intersection" begin + if numscopes >= 3 + n = cld(numscopes, 3) + + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_a..., scope_b...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...) + all_scope_intersect = Dagger.constrain(compute_scope_intersect, result_scope_intersect) + + task1 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect f(10); fetch(task1) + task2 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect f(20); fetch(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect f(30); fetch(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_intersect + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == all_scope_intersect + @test issubset(get_execution_scope(task1), all_scope_intersect) && + issubset(get_execution_scope(task2), all_scope_intersect) && + issubset(get_execution_scope(task3), all_scope_intersect) + end + end + + @testset "compute_scope and result_scope without intersection" begin + if length(availscopes) >= 2 + n = cld(numscopes, 2) + + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_b...) + + task1 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect f(10); wait(task1) + task2 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect f(20); wait(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect f(30); wait(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_no_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_no_intersect + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == Dagger.InvalidScope + @test get_execution_scope(task1) == get_execution_scope(task2) == get_execution_scope(task3) == Dagger.InvalidScope + end + end + end + + @testset "Chunk function, scope, compute_scope and result_scope" begin + @everywhere g(x, y) = x * 2 + y * 3 + + n = cld(numscopes, 3) + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "scope" begin + scope_only = Dagger.UnionScope(scope_a..., scope_b...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(scope_only, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn scope=scope_only g_chunk(10, 11); fetch(task1) + + @test get_compute_scope(task1) == scope_only + @test get_result_scope(task1) == Dagger.AnyScope() + @test get_final_result_scope(task1) == Dagger.AnyScope() + @test issetequal(get_execution_scope(task1), all_scope) + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(scope_a..., scope_b...) + scope = Dagger.UnionScope(scope_c...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(compute_scope_only, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn compute_scope=compute_scope_only g_chunk(10, 11); fetch(task1) + task2 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g_chunk(20, 21); fetch(task2) + + @test get_compute_scope(task1) == get_compute_scope(task2) == compute_scope_only + @test get_result_scope(task1) == get_result_scope(task2) == Dagger.AnyScope() + @test get_final_result_scope(task1) == get_final_result_scope(task2) == Dagger.AnyScope() + @test issetequal(get_execution_scope(task1), + get_execution_scope(task2), + all_scope) + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(scope_a..., scope_b...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(result_scope_only, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn result_scope=result_scope_only g_chunk(10, 11); fetch(task1) + + @test get_compute_scope(task1) == Dagger.DefaultScope() + @test get_result_scope(task1) == result_scope_only + @test get_final_result_scope(task1) == result_scope_only + @test issetequal(get_execution_scope(task1), all_scope) + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + @testset "compute_scope and result_scope with intersection" begin + if length(availscopes) >= 3 + compute_scope_intersect = Dagger.UnionScope(scope_a..., scope_b...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_b..., scope_c...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(compute_scope_intersect, result_scope_intersect, chunk_scope) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g_chunk(10, 11); fetch(task1) + task2 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g_chunk(20, 21); fetch(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g_chunk(30, 31); fetch(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_intersect + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == result_scope_intersect + @test issetequal(get_execution_scope(task1), + get_execution_scope(task2), + get_execution_scope(task3), + all_scope) + end + end + + shuffle!(availscopes) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + @testset "compute_scope and result_scope without intersection" begin + if length(availscopes) >= 2 + n = cld(length(availscopes), 2) + + compute_scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_no_intersect = Dagger.UnionScope(scope_a...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_b...) + chunk_proc = rand(availprocs) + chunk_scope = Dagger.UnionScope(scope_b..., scope_c...) + + g_chunk = Dagger.tochunk(g, chunk_proc, chunk_scope) + task1 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g_chunk(10, 11); wait(task1) + task2 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g_chunk(20, 21); wait(task2) + task3 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g_chunk(30, 31); wait(task3) + + @test get_compute_scope(task1) == get_compute_scope(task2) == get_compute_scope(task3) == compute_scope_no_intersect + @test get_result_scope(task1) == get_result_scope(task2) == get_result_scope(task3) == result_scope_no_intersect + @test get_final_result_scope(task1) == get_final_result_scope(task2) == get_final_result_scope(task3) == Dagger.InvalidScope + @test get_execution_scope(task1) == get_execution_scope(task2) == get_execution_scope(task3) == Dagger.InvalidScope + end + end + + end + + @testset "Chunk arguments, scope, compute_scope and result_scope with non-intersection of chunk arg and scope" begin + @everywhere g(x, y) = x * 2 + y * 3 + + n = cld(numscopes, 2) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:end] + + arg_scope = Dagger.UnionScope(scope_a...) + arg_proc = rand(availprocs) + arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope) + + @testset "scope" begin + scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + + task11 = Dagger.@spawn scope=scope_only g(arg, 11); wait(task11) + + @test get_compute_scope(task11) == scope_only + @test get_result_scope(task11) == Dagger.AnyScope() + @test get_final_result_scope(task11) == Dagger.InvalidScope + execution_scope11 = get_execution_scope(task11) + + @test execution_scope11 == Dagger.InvalidScope + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + scope = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + + task11 = Dagger.@spawn compute_scope=compute_scope_only g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g(arg, 21); wait(task21) + + @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only + @test get_result_scope(task11) == get_result_scope(task21) == Dagger.AnyScope() + @test get_final_result_scope(task11) == get_final_result_scope(task21) == Dagger.InvalidScope + @test get_execution_scope(task11) == get_execution_scope(task21) == Dagger.InvalidScope + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(rand(scope_b, rand(1:length(scope_b)))) + + task11 = Dagger.@spawn result_scope=result_scope_only g(arg, 11); wait(task11) + + @test get_compute_scope(task11) == Dagger.DefaultScope() + @test get_result_scope(task11) == result_scope_only + @test get_final_result_scope(task11) == Dagger.InvalidScope + @test get_execution_scope(task11) == Dagger.InvalidScope + end + + @testset "compute_scope and result_scope with intersection" begin + if length(scope_b) >= 3 + n = cld(length(scope_b), 3) + + scope_ba = scope_b[1:n] + scope_bb = scope_b[n+1:2n] + scope_bc = scope_b[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_ba..., scope_bb...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_bb..., scope_bc...) + + task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); wait(task21) + task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); wait(task31) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_intersect + @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == Dagger.InvalidScope + @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + end + end + + @testset "compute_scope and result_scope without intersection" begin + if length(scope_b) >= 2 + n = cld(length(scope_b), 2) + + scope_ba = scope_b[1:n] + scope_bb = scope_b[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_ba...) + scope_no_intersect = Dagger.UnionScope(scope_ba...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_bb...) + + task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); wait(task21) + task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_no_intersect + @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == Dagger.InvalidScope + @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + end + end + + end + + @testset "Chunk arguments, scope, compute_scope and result_scope with intersection of chunk arg and scope" begin + @everywhere g(x, y) = x * 2 + y * 3 + + shuffle!(availscopes) + n = cld(numscopes, 3) + scope_a = availscopes[1:n] + scope_b = availscopes[n+1:2n] + scope_c = availscopes[2n+1:end] + + arg_scope = Dagger.UnionScope(scope_a..., scope_b...) + arg_proc = rand(availprocs) + arg = Dagger.tochunk(g(1, 2), arg_proc, arg_scope) + + @testset "scope" begin + scope_only = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(scope_only, arg_scope) + + task11 = Dagger.@spawn scope=scope_only g(arg, 11); fetch(task11) + + @test get_compute_scope(task11) == scope_only + @test get_result_scope(task11) == Dagger.AnyScope() + @test get_final_result_scope(task11) == Dagger.AnyScope() + @test issetequal(get_execution_scope(task11), all_scope) + end + + @testset "compute_scope" begin + compute_scope_only = Dagger.UnionScope(scope_b..., scope_c...) + scope = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(compute_scope_only, arg_scope) + + task11 = Dagger.@spawn compute_scope=compute_scope_only g(arg, 11); fetch(task11) + task21 = Dagger.@spawn scope=scope compute_scope=compute_scope_only g(arg, 21); fetch(task21) + + @test get_compute_scope(task11) == get_compute_scope(task21) == compute_scope_only + @test get_result_scope(task11) == get_result_scope(task21) == Dagger.AnyScope() + @test get_final_result_scope(task11) == get_final_result_scope(task21) == Dagger.AnyScope() + @test issetequal(get_execution_scope(task11), + get_execution_scope(task21), + all_scope) + end + + @testset "result_scope" begin + result_scope_only = Dagger.UnionScope(scope_b..., scope_c...) + all_scope = Dagger.constrain(result_scope_only, arg_scope) + + task11 = Dagger.@spawn result_scope=result_scope_only g(arg, 11); fetch(task11) + + @test get_compute_scope(task11) == Dagger.DefaultScope() + @test get_result_scope(task11) == result_scope_only + @test get_final_result_scope(task11) == result_scope_only + @test issetequal(get_execution_scope(task11), all_scope) + end + + @testset "compute_scope and result_scope with intersection" begin + scope_bc = [scope_b...,scope_c...] + if length(scope_bc) >= 3 + n = cld(length(scope_bc), 3) + + scope_bca = scope_bc[1:n] + scope_bcb = scope_bc[n+1:2n] + scope_bcc = scope_bc[2n+1:end] + + compute_scope_intersect = Dagger.UnionScope(scope_bca..., scope_bcb...) + scope_intersect = compute_scope_intersect + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_intersect = Dagger.UnionScope(scope_bcb..., scope_bcc...) + all_scope = Dagger.constrain(compute_scope_intersect, result_scope_intersect, arg_scope) + + task11 = Dagger.@spawn compute_scope=compute_scope_intersect result_scope=result_scope_intersect g(arg, 11); fetch(task11) + task21 = Dagger.@spawn scope=scope_intersect result_scope=result_scope_intersect g(arg, 21); fetch(task21) + task31 = Dagger.@spawn compute_scope=compute_scope_intersect scope=scope_rand result_scope=result_scope_intersect g(arg, 31); fetch(task31) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_intersect + @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == result_scope_intersect + @test issetequal(get_execution_scope(task11), + get_execution_scope(task21), + get_execution_scope(task31), + all_scope) + end + end + + @testset "compute_scope and result_scope without intersection" begin + scope_bc = [scope_b...,scope_c...] + if length(scope_bc) >= 2 + n = cld(length(scope_bc), 2) + + scope_bca = scope_bc[1:n] + scope_bcb = scope_bc[n+1:end] + + compute_scope_no_intersect = Dagger.UnionScope(scope_bca...) + scope_no_intersect = Dagger.UnionScope(scope_bca...) + scope_rand = Dagger.UnionScope(rand(availscopes, rand(1:length(availscopes)))) + result_scope_no_intersect = Dagger.UnionScope(scope_bcb...) + + task11 = Dagger.@spawn compute_scope=compute_scope_no_intersect result_scope=result_scope_no_intersect g(arg, 11); wait(task11) + task21 = Dagger.@spawn scope=scope_no_intersect result_scope=result_scope_no_intersect g(arg, 21); wait(task21) + task31 = Dagger.@spawn compute_scope=compute_scope_no_intersect scope=scope_rand result_scope=result_scope_no_intersect g(arg, 31); wait(task31) + + @test get_compute_scope(task11) == get_compute_scope(task21) == get_compute_scope(task31) == compute_scope_no_intersect + @test get_result_scope(task11) == get_result_scope(task21) == get_result_scope(task31) == result_scope_no_intersect + @test get_final_result_scope(task11) == get_final_result_scope(task21) == get_final_result_scope(task31) == Dagger.InvalidScope + @test get_execution_scope(task11) == get_execution_scope(task21) == get_execution_scope(task31) == Dagger.InvalidScope + end + end + end +end \ No newline at end of file diff --git a/test/thunk.jl b/test/thunk.jl index 06ba25144..8f4477df6 100644 --- a/test/thunk.jl +++ b/test/thunk.jl @@ -21,8 +21,8 @@ import Dagger: Chunk end MulProc() = MulProc(myid()) Dagger.get_parent(mp::MulProc) = OSProc(mp.owner) - Dagger.move(src::MulProc, dest::Dagger.OSProc, x::Function) = Base.:* - Dagger.move(src::MulProc, dest::Dagger.ThreadProc, x::Function) = Base.:* + Dagger.move(src::MulProc, dest::Dagger.OSProc, ::Function) = Base.:* + Dagger.move(src::MulProc, dest::Dagger.ThreadProc, ::Function) = Base.:* end @testset "@par" begin @@ -326,21 +326,11 @@ end @testset "lazy API" begin a = delayed(+)(1,2) @test !(a.f isa Chunk) + @test a.compute_scope == Dagger.DefaultScope() a = delayed(+; scope=NodeScope())(1,2) - @test a.f isa Chunk - @test a.f.processor isa OSProc - @test a.f.scope isa NodeScope - - a = delayed(+; processor=Dagger.ThreadProc(1,1))(1,2) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope == DefaultScope() - - a = delayed(+; processor=Dagger.ThreadProc(1,1), scope=NodeScope())(1,2) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope isa NodeScope + @test !(a.f isa Chunk) + @test a.compute_scope isa NodeScope @testset "Scope Restrictions" begin pls = ProcessLockedStruct(Ptr{Int}(42)) @@ -354,28 +344,16 @@ end end @testset "Processor Data Movement" begin @everywhere Dagger.add_processor_callback!(()->MulProc(), :mulproc) - @test collect(delayed(+; processor=MulProc())(3,4)) == 12 + plus_chunk = Dagger.tochunk(+, MulProc()) + @test collect(delayed(plus_chunk)(3,4)) == 12 @everywhere Dagger.delete_processor_callback!(:mulproc) end end @testset "eager API" begin _a = Dagger.@spawn scope=NodeScope() 1+2 a = Dagger.Sch._find_thunk(_a) - @test a.f isa Chunk - @test a.f.processor isa OSProc - @test a.f.scope isa NodeScope - - _a = Dagger.@spawn processor=Dagger.ThreadProc(1,1) 1+2 - a = Dagger.Sch._find_thunk(_a) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope == DefaultScope() - - _a = Dagger.@spawn processor=Dagger.ThreadProc(1,1) scope=NodeScope() 1+2 - a = Dagger.Sch._find_thunk(_a) - @test a.f isa Chunk - @test a.f.processor isa Dagger.ThreadProc - @test a.f.scope isa NodeScope + @test !(a.f isa Chunk) + @test a.compute_scope isa NodeScope end end @testset "parent fetch child, one thread" begin