Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/datadeps.jl
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,11 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
@assert our_proc in all_procs
our_space = only(memory_spaces(our_proc))
our_procs = filter(proc->proc in all_procs, collect(processors(our_space)))
our_scope = UnionScope(map(ExactScope, our_procs)...)
task_scope = get(spec.options, :scope, AnyScope())
our_scope = constrain(UnionScope(map(ExactScope, our_procs)...), task_scope)
if our_scope isa InvalidScope
throw(Sch.SchedulingException("Scopes are not compatible: $(our_scope.x), $(our_scope.y)"))
end

spec.f = move(ThreadProc(myid(), 1), our_proc, spec.f)
@dagdebug nothing :spawn_datadeps "($(repr(spec.f))) Scheduling: $our_proc ($our_space)"
Expand Down
7 changes: 6 additions & 1 deletion test/datadeps.jl
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ function test_datadeps(;args_chunks::Bool,

# FIXME: Deps

# Scope
# Outer Scope
exec_procs = fetch.(Dagger.spawn_datadeps(;aliasing) do
[Dagger.@spawn Dagger.task_processor() for i in 1:10]
end)
Expand All @@ -424,6 +424,11 @@ function test_datadeps(;args_chunks::Bool,
@test proc in exec_procs
end

# Inner Scope
@test_throws Dagger.Sch.SchedulingException Dagger.spawn_datadeps() do
Dagger.@spawn scope=Dagger.ExactScope(Dagger.ThreadProc(1, 5000)) 1+1
end

# Add-to-copy
A = rand(1000)
B = rand(1000)
Expand Down