Skip to content

Commit 694dd49

Browse files
committed
errormonitor_tracked: Add task name
1 parent 0c123a8 commit 694dd49

File tree

5 files changed

+19
-14
lines changed

5 files changed

+19
-14
lines changed

src/precompile.jl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
t1 = @spawn 1+1
77
t2 = spawn(+, 1, t1)
88
fetch(t2)
9+
10+
# Shutdown scheduler and clean up
911
spawn() do
1012
Sch.halt!(sch_handle())
1113
end
@@ -16,12 +18,15 @@
1618
GC.gc()
1719
yield()
1820
lock(Sch.ERRORMONITOR_TRACKED) do tracked
19-
if all(t->istaskdone(t) || istaskfailed(t), tracked)
21+
if all(t->istaskdone(t) || istaskfailed(t), map(last, tracked))
2022
empty!(tracked)
2123
return
2224
end
23-
for t in tracked
24-
Base.throwto(t, InterruptException())
25+
for (name, t) in tracked
26+
@warn "Waiting on $name"
27+
if t.state == :runnable
28+
Base.throwto(t, InterruptException())
29+
end
2530
end
2631
end
2732
MemPool.exit_hook()

src/sch/Sch.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ function init_proc(state, p, log_sink)
366366
end
367367
end
368368
end
369-
errormonitor_tracked(t)
369+
errormonitor_tracked("worker monitor $wid", t)
370370
WORKER_MONITOR_TASKS[wid] = t
371371
WORKER_MONITOR_CHANS[wid] = Dict{UInt64,RemoteChannel}()
372372
end
@@ -1290,7 +1290,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12901290
else
12911291
t.sticky = false
12921292
end
1293-
tasks[thunk_id] = errormonitor_tracked(schedule(t))
1293+
tasks[thunk_id] = errormonitor_tracked("thunk $thunk_id", schedule(t))
12941294
proc_occupancy[] += task_occupancy
12951295
time_pressure[] += time_util
12961296
end
@@ -1302,7 +1302,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13021302
else
13031303
proc_run_task.sticky = false
13041304
end
1305-
return errormonitor_tracked(schedule(proc_run_task))
1305+
return errormonitor_tracked("processor $to_proc", schedule(proc_run_task))
13061306
end
13071307

13081308
"""

src/sch/dynamic.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ function dynamic_listener!(ctx, state, wid)
8787
end
8888
end
8989
end
90-
errormonitor_tracked(listener_task)
91-
errormonitor_tracked(@async begin
90+
errormonitor_tracked("dynamic_listener! $wid", listener_task)
91+
errormonitor_tracked("dynamic_listener! (halt+throw) $wid", @async begin
9292
wait(state.halt)
9393
# TODO: Not sure why we need the @async here, but otherwise we
9494
# don't stop all the listener tasks

src/sch/eager.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ function init_eager()
2020
return
2121
end
2222
ctx = eager_context()
23-
errormonitor_tracked(Threads.@spawn try
23+
errormonitor_tracked("eager compute()", Threads.@spawn try
2424
sopts = SchedulerOptions(;allow_errors=true)
2525
opts = Dagger.Options((;scope=Dagger.ExactScope(Dagger.ThreadProc(1, 1)),
2626
occupancy=Dict(Dagger.ThreadProc=>0)))
@@ -101,7 +101,7 @@ function thunk_yield(f)
101101
end
102102

103103
eager_cleanup(t::Dagger.EagerThunkFinalizer) =
104-
errormonitor_tracked(Threads.@spawn eager_cleanup(EAGER_STATE[], t.uid))
104+
errormonitor_tracked("eager_cleanup $(t.uid)", Threads.@spawn eager_cleanup(EAGER_STATE[], t.uid))
105105
function eager_cleanup(state, uid)
106106
tid = nothing
107107
lock(EAGER_ID_MAP) do id_map

src/sch/util.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
"Like `errormonitor`, but tracks how many outstanding tasks are running."
2-
function errormonitor_tracked(t::Task)
2+
function errormonitor_tracked(name::String, t::Task)
33
errormonitor(t)
44
@safe_lock_spin1 ERRORMONITOR_TRACKED tracked begin
5-
push!(tracked, t)
5+
push!(tracked, name => t)
66
end
77
errormonitor(Threads.@spawn begin
88
try
99
wait(t)
1010
finally
1111
lock(ERRORMONITOR_TRACKED) do tracked
12-
idx = findfirst(o->o===t, tracked)
12+
idx = findfirst(o->o[2]===t, tracked)
1313
# N.B. This may be nothing if precompile emptied these
1414
if idx !== nothing
1515
deleteat!(tracked, idx)
@@ -18,7 +18,7 @@ function errormonitor_tracked(t::Task)
1818
end
1919
end)
2020
end
21-
const ERRORMONITOR_TRACKED = LockedObject(Task[])
21+
const ERRORMONITOR_TRACKED = LockedObject(Pair{String,Task}[])
2222

2323
"""
2424
unwrap_nested_exception(err::Exception) -> Bool

0 commit comments

Comments
 (0)