Skip to content

Commit 9246a77

Browse files
committed
submission: Take lock on workers
1 parent 0d525cc commit 9246a77

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

src/submission.jl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# Remote
22
function eager_submit_internal!(@nospecialize(payload))
3-
Sch.init_eager()
4-
53
ctx = Dagger.Sch.eager_context()
64
state = Dagger.Sch.EAGER_STATE[]
75
task = current_task()
@@ -12,8 +10,6 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
1210
@nospecialize payload
1311
ntasks, uid, future, ref, f, args, options, reschedule = payload
1412

15-
Sch.init_eager()
16-
1713
if uid isa Vector
1814
thunk_ids = Sch.ThunkID[]
1915
for i in 1:ntasks
@@ -125,7 +121,14 @@ function eager_submit!(ntasks, uid, future, finalizer_ref, f, args, options)
125121
h = Dagger.sch_handle()
126122
return exec!(eager_submit_internal!, h, ntasks, uid, future, finalizer_ref, f, args, options, true)
127123
elseif myid() != 1
128-
return remotecall_fetch(eager_submit_internal!, 1, (ntasks, uid, future, finalizer_ref, f, args, options, true))
124+
return remotecall_fetch(1, (ntasks, uid, future, finalizer_ref, f, args, options, true)) do payload
125+
@nospecialize payload
126+
Sch.init_eager()
127+
state = Dagger.Sch.EAGER_STATE[]
128+
lock(state.lock) do
129+
eager_submit_internal!(payload)
130+
end
131+
end
129132
else
130133
Sch.init_eager()
131134
state = Dagger.Sch.EAGER_STATE[]

0 commit comments

Comments
 (0)