@@ -64,7 +64,9 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
6464 push! (thunk_ids, tid)
6565 uid_to_tid[payload. uid[i]] = tid. id
6666 end
67- put! (state. chan, Sch. RescheduleSignal ())
67+ @lock state. lock begin
68+ put! (state. chan, Sch. RescheduleSignal ())
69+ end
6870 return thunk_ids
6971 end
7072 payload:: PayloadOne
@@ -94,20 +96,26 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
9496 else
9597 uid_to_tid[arg_uid]
9698 end
97- @inbounds fargs[idx] = Argument (arg. pos, state. thunk_dict[arg_tid])
99+ @lock state. lock begin
100+ @inbounds fargs[idx] = Argument (arg. pos, state. thunk_dict[arg_tid])
101+ end
98102 elseif valuetype (arg) <: Sch.ThunkID
99103 arg_tid = (value (arg):: Sch.ThunkID ). id
100- @inbounds fargs[idx] = Argument (arg. pos, state. thunk_dict[arg_tid])
104+ @lock state. lock begin
105+ @inbounds fargs[idx] = Argument (arg. pos, state. thunk_dict[arg_tid])
106+ end
101107 elseif valuetype (arg) <: Chunk
102108 # N.B. Different Chunks with the same DRef handle will hash to the same slot,
103109 # so we just pick an equivalent Chunk as our upstream
104110 chunk = value (arg):: Chunk
105111 function find_equivalent_chunk (state, chunk:: C ) where {C<: Chunk }
106- if haskey (state. equiv_chunks, chunk. handle)
107- return state. equiv_chunks[chunk. handle]:: C
108- else
109- state. equiv_chunks[chunk. handle] = chunk
110- return chunk
112+ @lock state. lock begin
113+ if haskey (state. equiv_chunks, chunk. handle)
114+ return state. equiv_chunks[chunk. handle]:: C
115+ else
116+ state. equiv_chunks[chunk. handle] = chunk
117+ return chunk
118+ end
111119 end
112120 end
113121 chunk = find_equivalent_chunk (state, chunk)
@@ -124,10 +132,14 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
124132 else
125133 uid_to_tid[dep. uid]
126134 end
127- @inbounds syncdeps_vec[idx] = state. thunk_dict[tid]
135+ @lock state. lock begin
136+ @inbounds syncdeps_vec[idx] = state. thunk_dict[tid]
137+ end
128138 elseif dep isa Sch. ThunkID
129139 tid = dep. id
130- @inbounds syncdeps_vec[idx] = state. thunk_dict[tid]
140+ @lock state. lock begin
141+ @inbounds syncdeps_vec[idx] = state. thunk_dict[tid]
142+ end
131143 end
132144 end
133145 end
@@ -164,27 +176,29 @@ function eager_submit_internal!(ctx, state, task, tid, payload::AnyPayload; uid_
164176 #= FIXME :UNIQUE=#
165177 thunk_id = Sch. ThunkID (thunk. id, thunk_ref)
166178
167- # Attach `thunk` within the scheduler
168- state. thunk_dict[thunk. id] = WeakThunk (thunk)
169- #= FIXME :REALLOC=#
170- Sch. reschedule_syncdeps! (state, thunk)
171- empty! (old_fargs) # reschedule_syncdeps! preserves all referenced tasks/chunks
172- @dagdebug thunk :submit " Added to scheduler"
173- if future != = nothing
174- # Ensure we attach a future before the thunk is scheduled
175- Sch. _register_future! (ctx, state, task, tid, (future, thunk_id, false ))
176- @dagdebug thunk :submit " Registered future"
177- end
178- state. valid[thunk] = nothing
179+ @lock state. lock begin
180+ # Attach `thunk` within the scheduler
181+ state. thunk_dict[thunk. id] = WeakThunk (thunk)
182+ #= FIXME :REALLOC=#
183+ Sch. reschedule_syncdeps! (state, thunk)
184+ empty! (old_fargs) # reschedule_syncdeps! preserves all referenced tasks/chunks
185+ @dagdebug thunk :submit " Added to scheduler"
186+ if future != = nothing
187+ # Ensure we attach a future before the thunk is scheduled
188+ Sch. _register_future! (ctx, state, task, tid, (future, thunk_id, false ))
189+ @dagdebug thunk :submit " Registered future"
190+ end
191+ state. valid[thunk] = nothing
179192
180- # Register Eager UID -> Sch TID
181- lock (Sch. EAGER_ID_MAP) do id_map
182- id_map[uid] = thunk. id
183- end
193+ # Register Eager UID -> Sch TID
194+ lock (Sch. EAGER_ID_MAP) do id_map
195+ id_map[uid] = thunk. id
196+ end
184197
185- # Tell the scheduler that it has new tasks to schedule
186- if reschedule
187- put! (state. chan, Sch. RescheduleSignal ())
198+ # Tell the scheduler that it has new tasks to schedule
199+ if reschedule
200+ put! (state. chan, Sch. RescheduleSignal ())
201+ end
188202 end
189203
190204 @maybelog ctx timespan_finish (ctx, :add_thunk , (;thunk_id= id), (;f= fargs[1 ], args= fargs[2 : end ], options, uid))
@@ -236,13 +250,11 @@ function eager_submit!(payload::AnyPayload)
236250 elseif myid () != 1
237251 return remotecall_fetch (1 , payload) do payload
238252 Sch. init_eager ()
239- state = Dagger. Sch. EAGER_STATE[]
240- @lock state. lock eager_submit_internal! (payload)
253+ eager_submit_internal! (payload)
241254 end
242255 else
243256 Sch. init_eager ()
244- state = Dagger. Sch. EAGER_STATE[]
245- return @lock state. lock eager_submit_internal! (payload)
257+ return eager_submit_internal! (payload)
246258 end
247259end
248260
0 commit comments