@@ -396,7 +396,7 @@ function _get_write_deps!(state::DataDepsState{DataDepsAliasingState}, ainfo::Ab
396396 other_task, other_write_num = other_task_write_num
397397 write_num == other_write_num && continue
398398 @dagdebug nothing :spawn_datadeps " Sync with writer via $ainfo -> $other_ainfo "
399- push! (syncdeps, other_task)
399+ push! (syncdeps, ThunkSyncdep ( other_task) )
400400 end
401401end
402402function _get_read_deps! (state:: DataDepsState{DataDepsAliasingState} , ainfo:: AbstractAliasing , task, write_num, syncdeps)
@@ -408,7 +408,7 @@ function _get_read_deps!(state::DataDepsState{DataDepsAliasingState}, ainfo::Abs
408408 for (other_task, other_write_num) in other_tasks
409409 write_num == other_write_num && continue
410410 @dagdebug nothing :spawn_datadeps " Sync with reader via $ainfo -> $other_ainfo "
411- push! (syncdeps, other_task)
411+ push! (syncdeps, ThunkSyncdep ( other_task) )
412412 end
413413 end
414414end
@@ -427,14 +427,14 @@ function _get_write_deps!(state::DataDepsState{DataDepsNonAliasingState}, arg, t
427427 if other_task_write_num != = nothing
428428 other_task, other_write_num = other_task_write_num
429429 if write_num != other_write_num
430- push! (syncdeps, other_task)
430+ push! (syncdeps, ThunkSyncdep ( other_task) )
431431 end
432432 end
433433end
434434function _get_read_deps! (state:: DataDepsState{DataDepsNonAliasingState} , arg, task, write_num, syncdeps)
435435 for (other_task, other_write_num) in state. alias_state. args_readers[arg]
436436 if write_num != other_write_num
437- push! (syncdeps, other_task)
437+ push! (syncdeps, ThunkSyncdep ( other_task) )
438438 end
439439 end
440440end
@@ -590,6 +590,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
590590 write_num = 1
591591 proc_idx = 1
592592 pressures = Dict {Processor,Int} ()
593+ proc_to_scope_lfu = BasicLFUCache {Processor,AbstractScope} (1024 )
593594 for (spec, task) in queue. seen_tasks[task_order]
594595 # Populate all task dependencies
595596 populate_task_info! (state, spec, task)
@@ -723,9 +724,20 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
723724 end
724725 @assert our_proc in all_procs
725726 our_space = only (memory_spaces (our_proc))
726- our_procs = filter (proc-> proc in all_procs, collect (processors (our_space)))
727- task_scope = @something (spec. options. scope, AnyScope ())
728- our_scope = constrain (UnionScope (map (ExactScope, our_procs)... ), task_scope)
727+
728+ # Find the scope for this task (and its copies)
729+ task_scope = @something (spec. options. compute_scope, spec. options. scope, DefaultScope ())
730+ if task_scope == scope
731+ # Optimize for the common case, cache the proc=>scope mapping
732+ our_scope = get! (proc_to_scope_lfu, our_proc) do
733+ our_procs = filter (proc-> proc in all_procs, collect (processors (our_space)))
734+ return constrain (UnionScope (map (ExactScope, our_procs)... ), scope)
735+ end
736+ else
737+ # Use the provided scope and constrain it to the available processors
738+ our_procs = filter (proc-> proc in all_procs, collect (processors (our_space)))
739+ our_scope = constrain (UnionScope (map (ExactScope, our_procs)... ), task_scope)
740+ end
729741 if our_scope isa InvalidScope
730742 throw (Sch. SchedulingException (" Scopes are not compatible: $(our_scope. x) , $(our_scope. y) " ))
731743 end
@@ -769,10 +781,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
769781 generate_slot! (state, data_space, arg)
770782 end
771783 copy_to_scope = our_scope
772- copy_to_syncdeps = Set {Any } ()
784+ copy_to_syncdeps = Set {ThunkSyncdep } ()
773785 get_write_deps! (state, ainfo, task, write_num, copy_to_syncdeps)
774786 @dagdebug nothing :spawn_datadeps " ($(repr (value (f))) )[$idx ][$dep_mod ] $(length (copy_to_syncdeps)) syncdeps"
775- copy_to = Dagger. @spawn scope= copy_to_scope syncdeps= copy_to_syncdeps meta= true Dagger. move! (dep_mod, our_space, data_space, arg_remote, arg_local)
787+ copy_to = Dagger. @spawn scope= copy_to_scope exec_scope = copy_to_scope syncdeps= copy_to_syncdeps meta= true Dagger. move! (dep_mod, our_space, data_space, arg_remote, arg_local)
776788 add_writer! (state, ainfo, copy_to, write_num)
777789
778790 astate. data_locality[ainfo] = our_space
@@ -790,10 +802,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
790802 generate_slot! (state, data_space, arg)
791803 end
792804 copy_to_scope = our_scope
793- copy_to_syncdeps = Set {Any } ()
805+ copy_to_syncdeps = Set {ThunkSyncdep } ()
794806 get_write_deps! (state, arg, task, write_num, copy_to_syncdeps)
795807 @dagdebug nothing :spawn_datadeps " ($(repr (value (f))) )[$idx ] $(length (copy_to_syncdeps)) syncdeps"
796- copy_to = Dagger. @spawn scope= copy_to_scope syncdeps= copy_to_syncdeps Dagger. move! (identity, our_space, data_space, arg_remote, arg_local)
808+ copy_to = Dagger. @spawn scope= copy_to_scope exec_scope = copy_to_scope syncdeps= copy_to_syncdeps Dagger. move! (identity, our_space, data_space, arg_remote, arg_local)
797809 add_writer! (state, arg, copy_to, write_num)
798810
799811 astate. data_locality[arg] = our_space
@@ -820,7 +832,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
820832
821833 # Calculate this task's syncdeps
822834 if spec. options. syncdeps === nothing
823- spec. options. syncdeps = Set {Any } ()
835+ spec. options. syncdeps = Set {ThunkSyncdep } ()
824836 end
825837 syncdeps = spec. options. syncdeps
826838 for (idx, (_, arg)) in enumerate (task_args)
@@ -853,6 +865,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
853865
854866 # Launch user's task
855867 spec. options. scope = our_scope
868+ spec. options. exec_scope = our_scope
856869 enqueue! (upper_queue, spec=> task)
857870
858871 # Update read/write tracking for arguments
@@ -947,10 +960,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
947960 @assert arg_remote != = arg_local
948961 data_local_proc = first (processors (data_local_space))
949962 copy_from_scope = UnionScope (map (ExactScope, collect (processors (data_local_space)))... )
950- copy_from_syncdeps = Set ()
963+ copy_from_syncdeps = Set {ThunkSyncdep} ()
951964 get_write_deps! (state, ainfo, nothing , write_num, copy_from_syncdeps)
952965 @dagdebug nothing :spawn_datadeps " $(length (copy_from_syncdeps)) syncdeps"
953- copy_from = Dagger. @spawn scope= copy_from_scope syncdeps= copy_from_syncdeps meta= true Dagger. move! (dep_mod, data_local_space, data_remote_space, arg_local, arg_remote)
966+ copy_from = Dagger. @spawn scope= copy_from_scope exec_scope = copy_from_scope syncdeps= copy_from_syncdeps meta= true Dagger. move! (dep_mod, data_local_space, data_remote_space, arg_local, arg_remote)
954967 else
955968 @dagdebug nothing :spawn_datadeps " [$dep_mod ] Skipped copy-from (local): $data_remote_space "
956969 end
@@ -980,10 +993,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
980993 @assert arg_remote != = arg_local
981994 data_local_proc = first (processors (data_local_space))
982995 copy_from_scope = ExactScope (data_local_proc)
983- copy_from_syncdeps = Set ()
996+ copy_from_syncdeps = Set {ThunkSyncdep} ()
984997 get_write_deps! (state, arg, nothing , write_num, copy_from_syncdeps)
985998 @dagdebug nothing :spawn_datadeps " $(length (copy_from_syncdeps)) syncdeps"
986- copy_from = Dagger. @spawn scope= copy_from_scope syncdeps= copy_from_syncdeps meta= true Dagger. move! (identity, data_local_space, data_remote_space, arg_local, arg_remote)
999+ copy_from = Dagger. @spawn scope= copy_from_scope exec_scope = copy_from_scope syncdeps= copy_from_syncdeps meta= true Dagger. move! (identity, data_local_space, data_remote_space, arg_local, arg_remote)
9871000 else
9881001 @dagdebug nothing :spawn_datadeps " Skipped copy-from (local): $data_remote_space "
9891002 end
0 commit comments