@@ -339,30 +339,39 @@ function init_proc(state, p, log_sink)
339339
340340 state. worker_loadavg[p. pid] = (0.0 , 0.0 , 0.0 )
341341 end
342- lock (WORKER_MONITOR_LOCK) do
343- wid = p. pid
344- if ! haskey (WORKER_MONITOR_TASKS, wid)
345- t = @async begin
346- try
347- # Wait until this connection is terminated
348- remotecall_fetch (sleep, wid, typemax (UInt64))
349- catch err
350- if err isa ProcessExitedException
342+ if p. pid != 1
343+ lock (WORKER_MONITOR_LOCK) do
344+ wid = p. pid
345+ if ! haskey (WORKER_MONITOR_TASKS, wid)
346+ t = @async begin
347+ try
348+ # Wait until this connection is terminated
349+ remotecall_fetch (sleep, wid, typemax (UInt64))
350+ catch err
351+ # TODO : Report other kinds of errors? IOError, etc.
352+ # if !(err isa ProcessExitedException)
353+ # end
354+ finally
351355 lock (WORKER_MONITOR_LOCK) do
352356 d = WORKER_MONITOR_CHANS[wid]
353357 for uid in keys (d)
354- put! (d[uid], (wid, OSProc (wid), nothing , (ProcessExitedException (wid), nothing )))
358+ try
359+ put! (d[uid], (wid, OSProc (wid), nothing , (ProcessExitedException (wid), nothing )))
360+ catch
361+ end
355362 end
356363 empty! (d)
357364 delete! (WORKER_MONITOR_CHANS, wid)
365+ delete! (WORKER_MONITOR_TASKS, wid)
358366 end
359367 end
360368 end
369+ errormonitor_tracked (t)
370+ WORKER_MONITOR_TASKS[wid] = t
371+ WORKER_MONITOR_CHANS[wid] = Dict {UInt64,RemoteChannel} ()
361372 end
362- WORKER_MONITOR_TASKS[wid] = t
363- WORKER_MONITOR_CHANS[wid] = Dict {UInt64,RemoteChannel} ()
373+ WORKER_MONITOR_CHANS[wid][state. uid] = state. chan
364374 end
365- WORKER_MONITOR_CHANS[wid][state. uid] = state. chan
366375 end
367376
368377 # Setup worker-to-scheduler channels
@@ -379,18 +388,26 @@ function init_proc(state, p, log_sink)
379388end
380389function _cleanup_proc (uid, log_sink)
381390 empty! (CHUNK_CACHE) # FIXME : Should be keyed on uid!
391+ proc_states (uid) do states
392+ for (proc, state) in states
393+ istate = state. state
394+ istate. done[] = true
395+ notify (istate. reschedule)
396+ end
397+ empty! (states)
398+ end
382399end
383400function cleanup_proc (state, p, log_sink)
384401 ctx = Context (Int[]; log_sink)
385- timespan_start (ctx, :cleanup_proc , p. pid, 0 )
402+ wid = p. pid
403+ timespan_start (ctx, :cleanup_proc , wid, 0 )
386404 lock (WORKER_MONITOR_LOCK) do
387- wid = p. pid
388405 if haskey (WORKER_MONITOR_CHANS, wid)
389406 delete! (WORKER_MONITOR_CHANS[wid], state. uid)
390- remote_do (_cleanup_proc, wid, state. uid, log_sink)
391407 end
392408 end
393- timespan_finish (ctx, :cleanup_proc , p. pid, 0 )
409+ remote_do (_cleanup_proc, wid, state. uid, log_sink)
410+ timespan_finish (ctx, :cleanup_proc , wid, 0 )
394411end
395412
396413" Process-local condition variable (and lock) indicating task completion."
@@ -1096,6 +1113,7 @@ struct ProcessorInternalState
10961113 tasks:: Dict{Int,Task}
10971114 proc_occupancy:: Base.RefValue{UInt32}
10981115 time_pressure:: Base.RefValue{UInt64}
1116+ done:: Base.RefValue{Bool}
10991117end
11001118struct ProcessorState
11011119 state:: ProcessorInternalState
@@ -1144,6 +1162,9 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
11441162 reset (istate. reschedule)
11451163 end
11461164 timespan_finish (ctx, :proc_run_wait , to_proc, nothing )
1165+ if istate. done[]
1166+ return
1167+ end
11471168 end
11481169
11491170 # Fetch a new task to execute
@@ -1270,7 +1291,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12701291 else
12711292 t. sticky = false
12721293 end
1273- tasks[thunk_id] = errormonitor (schedule (t))
1294+ tasks[thunk_id] = errormonitor_tracked (schedule (t))
12741295 proc_occupancy[] += task_occupancy
12751296 time_pressure[] += time_util
12761297 end
@@ -1283,7 +1304,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12831304 else
12841305 proc_run_task. sticky = false
12851306 end
1286- return errormonitor (schedule (proc_run_task))
1307+ return errormonitor_tracked (schedule (proc_run_task))
12871308end
12881309
12891310"""
@@ -1307,7 +1328,8 @@ function do_tasks(to_proc, return_queue, tasks)
13071328 istate = ProcessorInternalState (ctx, to_proc,
13081329 queue_locked, reschedule,
13091330 Dict {Int,Task} (),
1310- Ref (UInt32 (0 )), Ref (UInt64 (0 )))
1331+ Ref (UInt32 (0 )), Ref (UInt64 (0 )),
1332+ Ref (false ))
13111333 runner = start_processor_runner! (istate, uid, return_queue)
13121334 @static if VERSION < v " 1.9"
13131335 reschedule. waiter = runner
0 commit comments