Skip to content

Commit 57cbaf0

Browse files
committed
Sch: Add at-dagdebug helper macro
1 parent bcf3f32 commit 57cbaf0

File tree

3 files changed

+58
-7
lines changed

3 files changed

+58
-7
lines changed

src/sch/Sch.jl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,8 @@ function scheduler_init(ctx, state::ComputeState, d::Thunk, options, deps)
467467
end
468468

469469
function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
470+
@dagdebug nothing :global "Initializing scheduler" uid=state.uid
471+
470472
safepoint(state)
471473

472474
# Loop while we still have thunks to execute
@@ -480,12 +482,14 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
480482

481483
isempty(state.running) && continue
482484
timespan_start(ctx, :take, 0, 0)
485+
@dagdebug nothing :take "Waiting for results"
483486
chan_value = take!(state.chan) # get result of completed thunk
484487
timespan_finish(ctx, :take, 0, 0)
485488
if chan_value isa RescheduleSignal
486489
continue
487490
end
488491
pid, proc, thunk_id, (res, metadata) = chan_value
492+
@dagdebug thunk_id :take "Got finished task"
489493
gproc = OSProc(pid)
490494
safepoint(state)
491495
lock(state.lock) do
@@ -559,6 +563,8 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
559563
return value, errored
560564
end
561565
function scheduler_exit(ctx, state::ComputeState, options)
566+
@dagdebug nothing :global "Tearing down scheduler" uid=state.uid
567+
562568
close(state.chan)
563569
notify(state.halt)
564570
@sync for p in procs_to_use(ctx)
@@ -697,6 +703,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
697703
# FIXME: est_time_util = est_time_util isa MaxUtilization ? cap : est_time_util
698704
push!(get!(()->Vector{Tuple{Thunk,<:Any,<:Any}}(), to_fire, (gproc, proc)), (task, est_time_util, est_alloc_util))
699705
state.worker_time_pressure[gproc.pid][proc] += est_time_util
706+
@dagdebug task :schedule "Scheduling to $gproc -> $proc"
700707
@goto pop_task
701708
end
702709
end
@@ -1066,6 +1073,8 @@ function do_task(to_proc, comm)
10661073
end
10671074
timespan_finish(ctx, :storage_wait, thunk_id, (;f, to_proc, device=typeof(to_storage)))
10681075

1076+
@dagdebug thunk_id :execute "Moving data"
1077+
10691078
# Initiate data transfers for function and arguments
10701079
transfer_time = Threads.Atomic{UInt64}(0)
10711080
transfer_size = Threads.Atomic{UInt64}(0)
@@ -1146,6 +1155,8 @@ function do_task(to_proc, comm)
11461155
# FIXME
11471156
#gcnum_start = Base.gc_num()
11481157

1158+
@dagdebug thunk_id :execute "Executing"
1159+
11491160
result_meta = try
11501161
# Set TLS variables
11511162
Dagger.set_tls!((
@@ -1180,6 +1191,7 @@ function do_task(to_proc, comm)
11801191
bt = catch_backtrace()
11811192
RemoteException(myid(), CapturedException(ex, bt))
11821193
end
1194+
11831195
threadtime = cputhreadtime() - threadtime_start
11841196
# FIXME: This is not a realistic measure of max. required memory
11851197
#gc_allocd = min(max(UInt64(Base.gc_num().allocd) - UInt64(gcnum_start.allocd), UInt64(0)), UInt64(1024^4))
@@ -1189,6 +1201,9 @@ function do_task(to_proc, comm)
11891201
pop!(TASKS_RUNNING, thunk_id)
11901202
notify(TASK_SYNC)
11911203
end
1204+
1205+
@dagdebug thunk_id :execute "Returning"
1206+
11921207
# TODO: debug_storage("Releasing $to_storage_name")
11931208
metadata = (
11941209
time_pressure=real_time_util[],

src/sch/dynamic.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,11 @@ function _add_thunk!(ctx, state, task, tid, (f, args, kwargs, future, ref))
207207
thunk_id = ThunkID(thunk.id, thunk_ref)
208208
state.thunk_dict[thunk.id] = WeakThunk(thunk)
209209
reschedule_inputs!(state, thunk)
210+
@dagdebug thunk :submit "Added to scheduler"
210211
if future !== nothing
211212
# Ensure we attach a future before the thunk is scheduled
212213
_register_future!(ctx, state, task, tid, (future, thunk_id, false))
214+
@dagdebug thunk :submit "Registered future"
213215
end
214216
if ref !== nothing
215217
# Preserve the `EagerThunkFinalizer` through `thunk`

src/sch/util.jl

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,37 @@
1+
const DAGDEBUG_CATEGORIES = Symbol[:global, :submit, :schedule, :scope,
2+
:take, :execute]
3+
macro dagdebug(thunk, category, msg, args...)
4+
cat_sym = category.value
5+
@gensym id
6+
debug_ex_id = :(@debug "[$($id)] ($($(repr(cat_sym)))) $($msg)" _module=Dagger _file=$(string(__source__.file)) _line=$(__source__.line))
7+
append!(debug_ex_id.args, args)
8+
debug_ex_noid = :(@debug "($($(repr(cat_sym)))) $($msg)" _module=Dagger _file=$(string(__source__.file)) _line=$(__source__.line))
9+
append!(debug_ex_noid.args, args)
10+
esc(quote
11+
let $id = -1
12+
if $thunk isa Integer
13+
$id = Int($thunk)
14+
elseif $thunk isa Thunk
15+
$id = $thunk.id
16+
elseif $thunk === nothing
17+
$id = 0
18+
else
19+
@warn "Unsupported thunk argument to @dagdebug: $(typeof($thunk))"
20+
$id = -1
21+
end
22+
if $id > 0
23+
if $(QuoteNode(cat_sym)) in $DAGDEBUG_CATEGORIES
24+
$debug_ex_id
25+
end
26+
elseif $id == 0
27+
if $(QuoteNode(cat_sym)) in $DAGDEBUG_CATEGORIES
28+
$debug_ex_noid
29+
end
30+
end
31+
end
32+
end)
33+
end
34+
135
"""
236
unwrap_nested_exception(err::Exception) -> Bool
337
@@ -267,13 +301,13 @@ function can_use_proc(task, gproc, proc, opts, scope)
267301
@warn "The `proclist` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
268302
if opts.proclist isa Function
269303
if !Base.invokelatest(opts.proclist, proc)
270-
@debug "[$(task.id)] Rejected $proc: proclist(proc) == false"
304+
@dagdebug task :scope "Rejected $proc: proclist(proc) == false"
271305
return false, scope
272306
end
273307
scope = constrain(scope, Dagger.ExactScope(proc))
274308
elseif opts.proclist isa Vector
275309
if !(typeof(proc) in opts.proclist)
276-
@debug "[$(task.id)] Rejected $proc: !(typeof(proc) in proclist)"
310+
@dagdebug task :scope "Rejected $proc: !(typeof(proc) in proclist)"
277311
return false, scope
278312
end
279313
scope = constrain(scope,
@@ -282,7 +316,7 @@ function can_use_proc(task, gproc, proc, opts, scope)
282316
throw(SchedulingException("proclist must be a Function, Vector, or nothing"))
283317
end
284318
if scope isa Dagger.InvalidScope
285-
@debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
319+
@dagdebug task :scope "Rejected $proc: Not contained in task scope ($scope)"
286320
return false, scope
287321
end
288322
end
@@ -291,26 +325,26 @@ function can_use_proc(task, gproc, proc, opts, scope)
291325
if opts.single !== nothing
292326
@warn "The `single` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
293327
if gproc.pid != opts.single
294-
@debug "[$(task.id)] Rejected $proc: gproc.pid ($(gproc.pid)) != single ($(opts.single))"
328+
@dagdebug task :scope "Rejected $proc: gproc.pid ($(gproc.pid)) != single ($(opts.single))"
295329
return false, scope
296330
end
297331
scope = constrain(scope, Dagger.ProcessScope(opts.single))
298332
if scope isa Dagger.InvalidScope
299-
@debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
333+
@dagdebug task :scope "Rejected $proc: Not contained in task scope ($scope)"
300334
return false, scope
301335
end
302336
end
303337

304338
# Check against scope
305339
proc_scope = Dagger.ExactScope(proc)
306340
if constrain(scope, proc_scope) isa Dagger.InvalidScope
307-
@debug "[$(task.id)] Rejected $proc: Not contained in task scope ($scope)"
341+
@dagdebug task :scope "Rejected $proc: Not contained in task scope ($scope)"
308342
return false, scope
309343
end
310344

311345
@label accept
312346

313-
@debug "[$(task.id)] Accepted $proc"
347+
@dagdebug task :scope "Accepted $proc"
314348
return true, scope
315349
end
316350

0 commit comments

Comments
 (0)