@@ -318,7 +318,7 @@ const WORKER_MONITOR_TASKS = Dict{Int,Task}()
318318const WORKER_MONITOR_CHANS = Dict {Int,Dict{UInt64,RemoteChannel}} ()
319319function init_proc (state, p, log_sink)
320320 ctx = Context (Int[]; log_sink)
321- timespan_start (ctx, :init_proc , (;worker= p. pid), nothing )
321+ timespan_start (ctx, :init_proc , (;uid = state . uid, worker= p. pid), nothing )
322322 # Initialize pressure and capacity
323323 gproc = OSProc (p. pid)
324324 lock (state. lock) do
@@ -383,7 +383,7 @@ function init_proc(state, p, log_sink)
383383 # Setup dynamic listener
384384 dynamic_listener! (ctx, state, p. pid)
385385
386- timespan_finish (ctx, :init_proc , (;worker= p. pid), nothing )
386+ timespan_finish (ctx, :init_proc , (;uid = state . uid, worker= p. pid), nothing )
387387end
388388function _cleanup_proc (uid, log_sink)
389389 empty! (CHUNK_CACHE) # FIXME : Should be keyed on uid!
399399function cleanup_proc (state, p, log_sink)
400400 ctx = Context (Int[]; log_sink)
401401 wid = p. pid
402- timespan_start (ctx, :cleanup_proc , (;worker= wid), nothing )
402+ timespan_start (ctx, :cleanup_proc , (;uid = state . uid, worker= wid), nothing )
403403 lock (WORKER_MONITOR_LOCK) do
404404 if haskey (WORKER_MONITOR_CHANS, wid)
405405 delete! (WORKER_MONITOR_CHANS[wid], state. uid)
@@ -419,7 +419,7 @@ function cleanup_proc(state, p, log_sink)
419419 end
420420 end
421421
422- timespan_finish (ctx, :cleanup_proc , (;worker= wid), nothing )
422+ timespan_finish (ctx, :cleanup_proc , (;uid = state . uid, worker= wid), nothing )
423423end
424424
425425" Process-local condition variable (and lock) indicating task completion."
@@ -467,24 +467,24 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
467467
468468 master = OSProc (myid ())
469469
470- timespan_start (ctx, :scheduler_init , nothing , master)
470+ timespan_start (ctx, :scheduler_init , (;uid = state . uid) , master)
471471 try
472472 scheduler_init (ctx, state, d, options, deps)
473473 finally
474- timespan_finish (ctx, :scheduler_init , nothing , master)
474+ timespan_finish (ctx, :scheduler_init , (;uid = state . uid) , master)
475475 end
476476
477477 value, errored = try
478478 scheduler_run (ctx, state, d, options)
479479 finally
480480 # Always try to tear down the scheduler
481- timespan_start (ctx, :scheduler_exit , nothing , master)
481+ timespan_start (ctx, :scheduler_exit , (;uid = state . uid) , master)
482482 try
483483 scheduler_exit (ctx, state, options)
484484 catch err
485485 @error " Error when tearing down scheduler" exception= (err,catch_backtrace ())
486486 finally
487- timespan_finish (ctx, :scheduler_exit , nothing , master)
487+ timespan_finish (ctx, :scheduler_exit , (;uid = state . uid) , master)
488488 end
489489 end
490490
@@ -545,10 +545,10 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
545545 check_integrity (ctx)
546546
547547 isempty (state. running) && continue
548- timespan_start (ctx, :take , nothing , nothing )
548+ timespan_start (ctx, :take , (;uid = state . uid) , nothing )
549549 @dagdebug nothing :take " Waiting for results"
550550 chan_value = take! (state. chan) # get result of completed thunk
551- timespan_finish (ctx, :take , nothing , nothing )
551+ timespan_finish (ctx, :take , (;uid = state . uid) , nothing )
552552 if chan_value isa RescheduleSignal
553553 continue
554554 end
@@ -563,13 +563,13 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
563563 @warn " Worker $(pid) died, rescheduling work"
564564
565565 # Remove dead worker from procs list
566- timespan_start (ctx, :remove_procs , (;worker= pid), nothing )
566+ timespan_start (ctx, :remove_procs , (;uid = state . uid, worker= pid), nothing )
567567 remove_dead_proc! (ctx, state, gproc)
568- timespan_finish (ctx, :remove_procs , (;worker= pid), nothing )
568+ timespan_finish (ctx, :remove_procs , (;uid = state . uid, worker= pid), nothing )
569569
570- timespan_start (ctx, :handle_fault , (;worker= pid), nothing )
570+ timespan_start (ctx, :handle_fault , (;uid = state . uid, worker= pid), nothing )
571571 handle_fault (ctx, state, gproc)
572- timespan_finish (ctx, :handle_fault , (;worker= pid), nothing )
572+ timespan_finish (ctx, :handle_fault , (;uid = state . uid, worker= pid), nothing )
573573 return # effectively `continue`
574574 else
575575 if something (ctx. options. allow_errors, false ) ||
@@ -604,9 +604,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
604604 end
605605 end
606606
607- timespan_start (ctx, :finish , (;thunk_id), (;thunk_id, result= res))
607+ timespan_start (ctx, :finish , (;uid = state . uid, thunk_id), (;thunk_id, result= res))
608608 finish_task! (ctx, state, node, thunk_failed)
609- timespan_finish (ctx, :finish , (;thunk_id), (;thunk_id, result= res))
609+ timespan_finish (ctx, :finish , (;uid = state . uid, thunk_id), (;thunk_id, result= res))
610610
611611 delete_unused_tasks! (state)
612612 end
@@ -691,13 +691,13 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
691691 task = nothing
692692 @label pop_task
693693 if task != = nothing
694- timespan_finish (ctx, :schedule , (;thunk_id= task. id), (;thunk_id= task. id))
694+ timespan_finish (ctx, :schedule , (;uid = state . uid, thunk_id= task. id), (;thunk_id= task. id))
695695 end
696696 if isempty (state. ready)
697697 @goto fire_tasks
698698 end
699699 task = pop! (state. ready)
700- timespan_start (ctx, :schedule , (;thunk_id= task. id), (;thunk_id= task. id))
700+ timespan_start (ctx, :schedule , (;uid = state . uid, thunk_id= task. id), (;thunk_id= task. id))
701701 if haskey (state. cache, task)
702702 if haskey (state. errored, task)
703703 # An error was eagerly propagated to this task
@@ -887,7 +887,7 @@ function monitor_procs_changed!(ctx, state)
887887 wait (ctx. proc_notify)
888888 end
889889
890- timespan_start (ctx, :assign_procs , nothing , nothing )
890+ timespan_start (ctx, :assign_procs , (;uid = state . uid) , nothing )
891891
892892 # Load new set of procs
893893 new_ps = procs_to_use (ctx)
@@ -915,7 +915,7 @@ function monitor_procs_changed!(ctx, state)
915915 end
916916 end
917917
918- timespan_finish (ctx, :assign_procs , nothing , nothing )
918+ timespan_finish (ctx, :assign_procs , (;uid = state . uid) , nothing )
919919 old_ps = new_ps
920920 end
921921end
@@ -1085,16 +1085,17 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
10851085 # know which task failed.
10861086 tasks = Task[]
10871087 for ts in to_send
1088+ # TODO : errormonitor
10881089 task = Threads. @spawn begin
1089- timespan_start (ctx, :fire , (;worker= gproc. pid), nothing )
1090+ timespan_start (ctx, :fire , (;uid = state . uid, worker= gproc. pid), nothing )
10901091 try
10911092 remotecall_wait (do_tasks, gproc. pid, proc, state. chan, [ts]);
10921093 catch err
10931094 bt = catch_backtrace ()
10941095 thunk_id = ts[1 ]
10951096 put! (state. chan, (gproc. pid, proc, thunk_id, (CapturedException (err, bt), nothing )))
10961097 finally
1097- timespan_finish (ctx, :fire , (;worker= gproc. pid), nothing )
1098+ timespan_finish (ctx, :fire , (;uid = state . uid, worker= gproc. pid), nothing )
10981099 end
10991100 end
11001101 end
@@ -1212,6 +1213,7 @@ proc_has_occupancy(proc_occupancy, task_occupancy) =
12121213function start_processor_runner! (istate:: ProcessorInternalState , uid:: UInt64 , return_queue:: RemoteChannel )
12131214 to_proc = istate. proc
12141215 proc_run_task = @task begin
1216+ # FIXME : Context changes aren't noticed over time
12151217 ctx = istate. ctx
12161218 tasks = istate. tasks
12171219 proc_occupancy = istate. proc_occupancy
@@ -1223,20 +1225,20 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12231225 # Wait for new tasks
12241226 if ! work_to_do
12251227 @dagdebug nothing :processor " Waiting for tasks"
1226- timespan_start (ctx, :proc_run_wait , (;worker= wid, processor= to_proc), nothing )
1228+ timespan_start (ctx, :proc_run_wait , (;uid, worker= wid, processor= to_proc), nothing )
12271229 wait (istate. reschedule)
12281230 @static if VERSION >= v " 1.9"
12291231 reset (istate. reschedule)
12301232 end
1231- timespan_finish (ctx, :proc_run_wait , (;worker= wid, processor= to_proc), nothing )
1233+ timespan_finish (ctx, :proc_run_wait , (;uid, worker= wid, processor= to_proc), nothing )
12321234 if istate. done[]
12331235 return
12341236 end
12351237 end
12361238
12371239 # Fetch a new task to execute
12381240 @dagdebug nothing :processor " Trying to dequeue"
1239- timespan_start (ctx, :proc_run_fetch , (;worker= wid, processor= to_proc), nothing )
1241+ timespan_start (ctx, :proc_run_fetch , (;uid, worker= wid, processor= to_proc), nothing )
12401242 work_to_do = false
12411243 task_and_occupancy = lock (istate. queue) do queue
12421244 # Only steal if there are multiple queued tasks, to prevent
@@ -1255,7 +1257,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12551257 return queue_result
12561258 end
12571259 if task_and_occupancy === nothing
1258- timespan_finish (ctx, :proc_run_fetch , (;worker= wid, processor= to_proc), nothing )
1260+ timespan_finish (ctx, :proc_run_fetch , (;uid, worker= wid, processor= to_proc), nothing )
12591261
12601262 @dagdebug nothing :processor " Failed to dequeue"
12611263
@@ -1270,7 +1272,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12701272 @dagdebug nothing :processor " Trying to steal"
12711273
12721274 # Try to steal a task
1273- timespan_start (ctx, :steal_local , (;worker= wid, processor= to_proc), nothing )
1275+ timespan_start (ctx, :proc_steal_local , (;uid, worker= wid, processor= to_proc), nothing )
12741276
12751277 # Try to steal from local queues randomly
12761278 # TODO : Prioritize stealing from busiest processors
@@ -1305,12 +1307,12 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13051307 from_proc = other_istate. proc
13061308 thunk_id = task[1 ]
13071309 @dagdebug thunk_id :processor " Stolen from $from_proc by $to_proc "
1308- timespan_finish (ctx, :steal_local , (;worker= wid, processor= to_proc), (;from_proc, thunk_id))
1310+ timespan_finish (ctx, :proc_steal_local , (;uid, worker= wid, processor= to_proc), (;from_proc, thunk_id))
13091311 # TODO : Keep stealing until we hit full occupancy?
13101312 @goto execute
13111313 end
13121314 end
1313- timespan_finish (ctx, :steal_local , (;worker= wid, processor= to_proc), nothing )
1315+ timespan_finish (ctx, :proc_steal_local , (;uid, worker= wid, processor= to_proc), nothing )
13141316
13151317 # TODO : Try to steal from remote queues
13161318
@@ -1322,7 +1324,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13221324 task = task_spec[]
13231325 thunk_id = task[1 ]
13241326 time_util = task[2 ]
1325- timespan_finish (ctx, :proc_run_fetch , (;worker= wid, processor= to_proc), (;thunk_id, proc_occupancy= proc_occupancy[], task_occupancy))
1327+ timespan_finish (ctx, :proc_run_fetch , (;uid, worker= wid, processor= to_proc), (;thunk_id, proc_occupancy= proc_occupancy[], task_occupancy))
13261328 @dagdebug thunk_id :processor " Dequeued task"
13271329
13281330 # Execute the task and return its result
@@ -1423,7 +1425,7 @@ function do_tasks(to_proc, return_queue, tasks)
14231425 for task in tasks
14241426 thunk_id = task[1 ]
14251427 occupancy = task[4 ]
1426- timespan_start (ctx, :enqueue , (;processor= to_proc, thunk_id), nothing )
1428+ timespan_start (ctx, :enqueue , (;uid, processor= to_proc, thunk_id), nothing )
14271429 should_launch = lock (TASK_SYNC) do
14281430 # Already running; don't try to re-launch
14291431 if ! (thunk_id in TASKS_RUNNING)
@@ -1435,7 +1437,7 @@ function do_tasks(to_proc, return_queue, tasks)
14351437 end
14361438 should_launch || continue
14371439 enqueue! (queue, TaskSpecKey (task), occupancy)
1438- timespan_finish (ctx, :enqueue , (;processor= to_proc, thunk_id), nothing )
1440+ timespan_finish (ctx, :enqueue , (;uid, processor= to_proc, thunk_id), nothing )
14391441 @dagdebug thunk_id :processor " Enqueued task"
14401442 end
14411443 end
0 commit comments