Skip to content

Commit 2d6179b

Browse files
committed
Sch: Improve behavior on internal error
1 parent 0d00958 commit 2d6179b

File tree

3 files changed

+22
-12
lines changed

3 files changed

+22
-12
lines changed

src/sch/Sch.jl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,12 +575,23 @@ end
575575
function scheduler_exit(ctx, state::ComputeState, options)
576576
@dagdebug nothing :global "Tearing down scheduler" uid=state.uid
577577

578-
close(state.chan)
579-
notify(state.halt)
580578
@sync for p in procs_to_use(ctx)
581579
@async cleanup_proc(state, p, ctx.log_sink)
582580
end
583581

582+
lock(state.lock) do
583+
close(state.chan)
584+
notify(state.halt)
585+
586+
# Notify any waiting tasks
587+
for (_, futures) in state.futures
588+
for future in futures
589+
put!(future, SchedulingException("Scheduler exited"); error=true)
590+
end
591+
end
592+
empty!(state.futures)
593+
end
594+
584595
# Let the context procs handler clean itself up
585596
lock(ctx.proc_notify) do
586597
notify(ctx.proc_notify)

src/sch/eager.jl

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
const EAGER_INIT = Ref{Bool}(false)
2-
const EAGER_THUNK_CHAN = Channel(typemax(Int))
2+
const EAGER_THUNK_CHAN = Ref{Channel{Any}}()
33
const EAGER_FORCE_KILL = Ref{Bool}(false)
44
const EAGER_ID_MAP = Dict{UInt64,Int}()
55
const EAGER_CONTEXT = Ref{Union{Context,Nothing}}(nothing)
@@ -15,15 +15,11 @@ end
1515
function init_eager()
1616
EAGER_INIT[] && return
1717
EAGER_INIT[] = true
18+
EAGER_THUNK_CHAN[] = Channel(typemax(Int))
1819
ctx = eager_context()
1920
@async try
2021
sopts = SchedulerOptions(;allow_errors=true)
21-
scope = Dagger.ExactScope(Dagger.ThreadProc(1, 1))
22-
atexit() do
23-
EAGER_FORCE_KILL[] = true
24-
close(EAGER_THUNK_CHAN)
25-
end
26-
opts = Dagger.Options((;scope,
22+
opts = Dagger.Options((;scope=Dagger.ExactScope(Dagger.ThreadProc(1, 1)),
2723
occupancy=Dict(Dagger.ThreadProc=>0)))
2824
Dagger.compute(ctx, Dagger.delayed(eager_thunk, opts)();
2925
options=sopts)
@@ -37,6 +33,8 @@ function init_eager()
3733
write(stderr, iob)
3834
finally
3935
EAGER_INIT[] = false
36+
EAGER_FORCE_KILL[] = true
37+
close(EAGER_THUNK_CHAN[])
4038
end
4139
end
4240

@@ -92,9 +90,10 @@ function eager_thunk()
9290
nothing
9391
end
9492
tls = Dagger.get_tls()
95-
while isopen(EAGER_THUNK_CHAN)
93+
chan = EAGER_THUNK_CHAN[]
94+
while isopen(chan)
9695
try
97-
added_future, future, uid, ref, f, args, opts = take!(EAGER_THUNK_CHAN)
96+
added_future, future, uid, ref, f, args, opts = take!(chan)
9897
# preserve inputs until they enter the scheduler
9998
tid = GC.@preserve args begin
10099
_args = map(args) do pos_x

src/thunk.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ function _spawn(f, options::Options, args...)
313313
finalizer_ref = poolset(EagerThunkFinalizer(uid); device=MemPool.CPURAMDevice())
314314
added_future = Future()
315315
propagates = keys(options.options)
316-
put!(Dagger.Sch.EAGER_THUNK_CHAN, (added_future, future, uid, finalizer_ref, f, (args...,), (;propagates, options.options...,)))
316+
put!(Dagger.Sch.EAGER_THUNK_CHAN[], (added_future, future, uid, finalizer_ref, f, (args...,), (;propagates, options.options...,)))
317317
thunk_ref = fetch(added_future)
318318
return (uid, future, finalizer_ref, thunk_ref)
319319
end

0 commit comments

Comments
 (0)