Skip to content

Commit 65bb3fd

Browse files
committed
Add more Sch debug statements
And some fixes
1 parent 7e30787 commit 65bb3fd

File tree

6 files changed

+32 -8 lines changed

src/sch/Sch.jl

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -555,10 +555,12 @@ end
555555
@goto fire_tasks
556556
end
557557
task = popfirst!(state.ready)
558+
@dagdebug task :schedule "Scheduling task"
558559
@maybelog ctx timespan_start(ctx, :schedule, (;uid=state.uid, thunk_id=task.id), (;thunk_id=task.id))
559560
if has_result(state, task)
560561
if haskey(state.errored, task)
561562
# An error was eagerly propagated to this task
563+
@dagdebug task :schedule "Task received upstream error, finishing"
562564
finish_failed!(state, task)
563565
else
564566
# This shouldn't have happened
@@ -663,9 +665,11 @@ end
663665
end
664666
end
665667
end
668+
666669
ex = SchedulingException("No processors available, try widening scope")
667670
store_result!(state, task, ex; error=true)
668671
set_failed!(state, task)
672+
@dagdebug task :schedule "No processors available, skipping"
669673
sorted_procs_cleanup()
670674
costs_cleanup()
671675
@goto pop_task
@@ -732,6 +736,7 @@ function remove_dead_proc!(ctx, state, proc, options)
732736
end
733737

734738
function finish_task!(ctx, state, node, thunk_failed)
739+
@dagdebug node :finish "Finishing with $(thunk_failed ? "error" : "result")"
735740
pop!(state.running, node)
736741
delete!(state.running_on, node)
737742
if thunk_failed
@@ -1224,7 +1229,7 @@ function (dts::DoTaskSpec)()
12241229
if unwrap_nested_exception(err) isa InvalidStateException || !isopen(return_queue)
12251230
@dagdebug tid :execute "Return queue is closed, failing to put result" chan=return_queue exception=(err, catch_backtrace())
12261231
else
1227-
rethrow(err)
1232+
rethrow()
12281233
end
12291234
end
12301235
return

src/sch/util.jl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ end
9393
function fill_registered_futures!(state, thunk, failed)
9494
if haskey(state.futures, thunk)
9595
# Notify any listening thunks
96+
@dagdebug thunk :finish "Notifying $(length(state.futures[thunk])) futures"
9697
for future in state.futures[thunk]
9798
put!(future, load_result(state, thunk); error=failed)
9899
end
@@ -135,14 +136,20 @@ end
135136

136137
"Schedules any dependents that may be ready to execute."
137138
function schedule_dependents!(state, thunk, failed)
139+
@dagdebug thunk :finish "Checking dependents"
138140
if !haskey(state.waiting_data, thunk) || isempty(state.waiting_data[thunk])
139141
return
140142
end
143+
ctr = 0
141144
for dep in state.waiting_data[thunk]
145+
@dagdebug dep :schedule "Checking dependent"
142146
dep_isready = false
143147
if haskey(state.waiting, dep)
144148
set = state.waiting[dep]
145149
thunk in set && pop!(set, thunk)
150+
if length(set) > 0
151+
@dagdebug dep :schedule "Dependent has $(length(set)) upstreams"
152+
end
146153
dep_isready = isempty(set)
147154
if dep_isready
148155
delete!(state.waiting, dep)
@@ -151,11 +158,17 @@ function schedule_dependents!(state, thunk, failed)
151158
dep_isready = true
152159
end
153160
if dep_isready
161+
ctr += 1
154162
if !failed
155163
push!(state.ready, dep)
164+
@dagdebug dep :schedule "Dependent is now ready"
165+
else
166+
set_failed!(state, thunk, dep)
167+
@dagdebug dep :schedule "Dependent has transitively failed"
156168
end
157169
end
158170
end
171+
@dagdebug thunk :finish "Marked $ctr dependents as $(failed ? "failed" : "ready")"
159172
end
160173

161174
"""
@@ -226,6 +239,8 @@ const RESCHEDULE_SYNCDEPS_SEEN_CACHE = TaskLocalValue{ReusableCache{Set{Thunk},N
226239
"Marks `thunk` and all dependent thunks as failed."
227240
function set_failed!(state, origin, thunk=origin)
228241
@assert islocked(state.lock)
242+
has_result(state, thunk) && return
243+
@dagdebug thunk :finish "Setting as failed"
229244
filter!(x -> x !== thunk, state.ready)
230245
# N.B. If origin === thunk, we assume that the caller has already set the error
231246
if origin !== thunk
@@ -554,6 +569,7 @@ end
554569
end
555570
end
556571

572+
# Estimate total cost for executing this task on each candidate processor
557573
for proc in procs
558574
gproc = get_parent(proc)
559575
chunks_filt = Iterators.filter(c->get_parent(processor(c)) != gproc, chunks)

src/submission.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ const UID_TO_TID_CACHE = TaskLocalValue{ReusableCache{Dict{UInt64,Int},Nothing}}
185185
#=FIXME:REALLOC=#
186186
Sch.reschedule_syncdeps!(state, thunk)
187187
old_fargs_cleanup() # reschedule_syncdeps! preserves all referenced tasks/chunks
188-
@dagdebug thunk :submit "Added to scheduler"
188+
n_upstreams = haskey(state.waiting, thunk) ? length(state.waiting[thunk]) : 0
189+
@dagdebug thunk :submit "Added to scheduler with $n_upstreams upstreams"
189190
if future !== nothing
190191
# Ensure we attach a future before the thunk is scheduled
191192
Sch._register_future!(ctx, state, task, tid, (future, thunk_id, false))

src/utils/dagdebug.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ function istask end
22
function task_id end
33

44
const DAGDEBUG_CATEGORIES = Symbol[:global, :submit, :schedule, :scope,
5-
:take, :execute, :move, :processor, :cancel,
6-
:stream]
5+
:take, :execute, :move, :processor, :finish,
6+
:cancel, :stream]
77
macro dagdebug(thunk, category, msg, args...)
88
cat_sym = category.value
99
@gensym id

src/utils/scopes.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ compatible with the given scope.
2626
"""
2727
compatible_processors(scope::AbstractScope=get_compute_scope(), ctx::Context=Sch.eager_context()) =
2828
compatible_processors(scope, procs(ctx))
29-
function compatible_processors(scope::AbstractScope, procs::Vector{Processor})
29+
function compatible_processors(scope::AbstractScope, procs::Vector{<:Processor})
3030
compat_procs = Set{Processor}()
3131
for gproc in procs
3232
# Fast-path in case entire process is incompatible

test/util.jl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ end
1414
replace_obj!(ex::Symbol, obj) = Expr(:(.), obj, QuoteNode(ex))
1515
replace_obj!(ex, obj) = ex
1616
function _test_throws_unwrap(terr, ex; to_match=[])
17-
@gensym oerr rerr
17+
@gensym oerr rerr bt
1818
match_expr = Expr(:block)
1919
for m in to_match
2020
if m.head == :(=)
@@ -35,9 +35,11 @@ function _test_throws_unwrap(terr, ex; to_match=[])
3535
end
3636
end
3737
quote
38+
$bt = nothing
3839
$oerr, $rerr = try
3940
nothing, $(esc(ex))
4041
catch err
42+
$bt = catch_backtrace()
4143
(err, Dagger.Sch.unwrap_nested_exception(err))
4244
end
4345
if $terr isa Tuple
@@ -48,7 +50,7 @@ function _test_throws_unwrap(terr, ex; to_match=[])
4850
else
4951
println("Full error:")
5052
Base.showerror(stdout, $oerr)
51-
Base.show_backtrace(stdout, backtrace())
53+
Base.show_backtrace(stdout, $bt)
5254
end
5355
else
5456
@test $rerr isa $terr
@@ -57,7 +59,7 @@ function _test_throws_unwrap(terr, ex; to_match=[])
5759
else
5860
println("Full error:")
5961
Base.showerror(stdout, $oerr)
60-
Base.show_backtrace(stdout, backtrace())
62+
Base.show_backtrace(stdout, $bt)
6163
end
6264
end
6365
end

0 commit comments

Comments
 (0)