@@ -256,14 +256,27 @@ function del_clients(pairs::Vector)
256256 end
257257end
258258
259- const any_gc_flag = Condition ()
259+ # The task below is coalescing the `flush_gc_msgs` call
260+ # across multiple producers, see `send_del_client`,
261+ # and `send_add_client`.
262+ # XXX : Is this worth the additional complexity?
263+ # `flush_gc_msgs` has to iterate over all connected workers.
264+ const any_gc_flag = Threads. Condition ()
260265function start_gc_msgs_task ()
261- errormonitor (@async while true
262- wait (any_gc_flag)
263- flush_gc_msgs ()
264- end )
266+ errormonitor (
267+ Threads. @spawn begin
268+ while true
269+ lock (any_gc_flag) do
270+ # this might miss events
271+ wait (any_gc_flag)
272+ end
273+ flush_gc_msgs () # handles throws internally
274+ end
275+ end
276+ )
265277end
266278
279+ # Function can be called within a finalizer
267280function send_del_client (rr)
268281 if rr. where == myid ()
269282 del_client (rr)
@@ -281,11 +294,27 @@ function send_del_client_no_lock(rr)
281294 end
282295end
283296
297+ function publish_del_msg! (w:: Worker , msg)
298+ lock (w. msg_lock) do
299+ push! (w. del_msgs, msg)
300+ @atomic w. gcflag = true
301+ end
302+ lock (any_gc_flag) do
303+ notify (any_gc_flag)
304+ end
305+ end
306+
284307function process_worker (rr)
285308 w = worker_from_id (rr. where):: Worker
286- push! (w. del_msgs, (remoteref_id (rr), myid ()))
287- w. gcflag = true
288- notify (any_gc_flag)
309+ msg = (remoteref_id (rr), myid ())
310+
311+ # Needs to aquire a lock on the del_msg queue
312+ T = Threads. @spawn begin
313+ publish_del_msg! ($ w, $ msg)
314+ end
315+ Base. errormonitor (T)
316+
317+ return
289318end
290319
291320function add_client (id, client)
@@ -310,9 +339,13 @@ function send_add_client(rr::AbstractRemoteRef, i)
310339 # to the processor that owns the remote ref. it will add_client
311340 # itself inside deserialize().
312341 w = worker_from_id (rr. where)
313- push! (w. add_msgs, (remoteref_id (rr), i))
314- w. gcflag = true
315- notify (any_gc_flag)
342+ lock (w. msg_lock) do
343+ push! (w. add_msgs, (remoteref_id (rr), i))
344+ @atomic w. gcflag = true
345+ end
346+ lock (any_gc_flag) do
347+ notify (any_gc_flag)
348+ end
316349 end
317350end
318351
0 commit comments