Skip to content

Commit 0f07a91

Browse files
committed
Task names and GraphVizExt improvements
at-spawn: Add `name` option for custom task names logging: Allow `DTask` in `logs_annotate!` logging: Allow `DArray` in `logs_annotate!` (labels array chunks) logging: Add task result, DTask-to-Thunk, UID-to-TID logging events logging: Add `all_task_deps` option to `enable_logging!` for convenience GraphVizExt: Properly label tasks and objects GraphVizExt: Show box for task and oval for object GraphVizExt: Improve connectivity checks GraphVizExt/PlotsExt: Change default colors to Tab20 PlotsExt: Add processor and scheduler visualization modes PlotsExt: Remove unused plotting implementation PlotsExt: Ensure X axis is globally aligned
1 parent 841c067 commit 0f07a91

File tree

8 files changed

+474
-141
lines changed

8 files changed

+474
-141
lines changed

ext/GraphVizExt.jl

Lines changed: 247 additions & 78 deletions
Large diffs are not rendered by default.

ext/PlotsExt.jl

Lines changed: 102 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,20 @@ import Dagger.TimespanLogging: Timespan
1515
_name_to_color(name::AbstractString, colors) =
1616
colors[mod1(hash(name), length(colors))]
1717
_name_to_color(name::AbstractString, ::Nothing) = "black"
18-
_default_colors = ["red", "orange", "green", "blue", "purple", "pink", "silver"]
18+
tab20_colors = [
19+
"#1f77b4", "#aec7e8", "#ff7f0e", "#ffbb78",
20+
"#2ca02c", "#98df8a", "#d62728", "#ff9896",
21+
"#9467bd", "#c5b0d5", "#8c564b", "#c49c94",
22+
"#e377c2", "#f7b6d2", "#7f7f7f", "#c7c7c7",
23+
"#bcbd22", "#dbdb8d", "#17becf", "#9edae5"
24+
]
25+
_default_colors = tab20_colors
1926

20-
function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_color, color_by=:fn)
27+
function logs_to_df(logs::Dict, ::Val{:execution}; colors=_default_colors, name_to_color=_name_to_color, color_by=:fn)
2128
# Generate function names
2229
fn_names = Dict{Int, String}()
30+
dtask_names = Dict{UInt, String}()
31+
uid_to_tid = Dict{UInt, Int}()
2332
for w in keys(logs)
2433
for idx in 1:length(logs[w][:core])
2534
category = logs[w][:core][idx].category::Symbol
@@ -32,10 +41,33 @@ function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_c
3241
@warn "Task names missing from logs"
3342
fn_names[tid] = "unknown"
3443
end
44+
if haskey(logs[w], :taskuidtotid)
45+
uid_tid = logs[w][:taskuidtotid][idx]
46+
if uid_tid !== nothing
47+
uid, tid = uid_tid::Pair{UInt,Int}
48+
uid_to_tid[uid] = tid
49+
end
50+
end
51+
elseif category == :data_annotation && kind == :start
52+
id = logs[w][:id][idx]::NamedTuple
53+
name = String(id.name)
54+
obj = id.objectid::Dagger.LoggedMutableObject
55+
objid = obj.objid
56+
if obj.kind == :task
57+
dtask_names[objid] = name
58+
end
3559
end
3660
end
3761
end
3862

63+
# Process DTasks-to-Thunk mappings
64+
for (uid, tid) in uid_to_tid
65+
# Patch in custom name
66+
if haskey(dtask_names, uid)
67+
fn_names[tid] = dtask_names[uid]
68+
end
69+
end
70+
3971
# FIXME: Color eltype
4072
df = DataFrame(proc=Processor[], proc_name=String[], fn_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[], color=Any[])
4173
Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx
@@ -59,43 +91,42 @@ function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_c
5991
end
6092
return df
6193
end
62-
63-
# Implementation by Przemyslaw Szufel
64-
function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt_ps})
65-
df = logs_to_df(logs)
66-
67-
proc_names = sort!(unique(df.proc_name))
68-
proc_idx = Dict{String,Int}()
69-
for name in proc_names
70-
proc_idx[name] = findfirst(==(name), proc_names)
71-
end
72-
proc_idxs = map(name->proc_idx[name], proc_names)
73-
tvals = zeros(UInt64, length(proc_names))
74-
plt = bar(orientation=:h, yticks=(1:length(proc_names), proc_names), linewidth=0,yflip=true,color=:green,legend=nothing)
75-
xlabel!(plt, "Time in seconds")
76-
dfc = deepcopy(df)
77-
while nrow(dfc) > 0
78-
rowslast = DataFrame([g[findmax(g.t_end)[2],:] for g in groupby(dfc, :proc_name)])
79-
tvals .= .0
80-
for i in 1:nrow(rowslast)
81-
tvals[proc_idx[rowslast.proc_name[i]]] = rowslast.t_end[i]
94+
function logs_to_df(logs::Dict, ::Val{:processor}; colors=_default_colors, name_to_color=_name_to_color, kwargs...)
95+
# Collect processor events
96+
# FIXME: Color eltype
97+
df = DataFrame(proc=Processor[], proc_name=String[], category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[])
98+
Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx
99+
category = logs[w][:core][start_idx].category
100+
if category in (:compute, :storage_wait, :storage_safe_scan, :proc_run_fetch, :proc_steal_local)
101+
proc = logs[w][:id][start_idx].processor::Processor
102+
proc_name = Dagger.short_name(proc)
103+
t_start = logs[w][:core][start_idx].timestamp::UInt64
104+
t_end = logs[w][:core][finish_idx].timestamp::UInt64
105+
category_str = string(category)
106+
color = name_to_color(category_str, colors)
107+
push!(df, (;proc, proc_name, category=category_str, t_start, t_end, color))
82108
end
83-
#setindex!.(Ref(tvals), rowslast.t_end, getindex.(proc_idx, rowslast.proc_name))
84-
bar!(plt, tvals[proc_idxs], orientation=:h, linewidth=0.5,yflip=true,color=:green)
85-
tvals .= .0
86-
for i in 1:nrow(rowslast)
87-
tvals[proc_idx[rowslast.proc_name[i]]] = rowslast.t_start[i]
109+
end
110+
return df
111+
end
112+
function logs_to_df(logs::Dict, ::Val{:scheduler}; colors=_default_colors, name_to_color=_name_to_color, kwargs...)
113+
# Collect scheduler events
114+
# FIXME: Color eltype
115+
df = DataFrame(category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[])
116+
Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx
117+
category = logs[w][:core][start_idx].category
118+
if category in (:scheduler_init, :scheduler_exit, :init_proc, :remove_proc, :add_thunk, :handle_fault, :schedule, :fire, :enqueue, :take, :finish)
119+
t_start = logs[w][:core][start_idx].timestamp::UInt64
120+
t_end = logs[w][:core][finish_idx].timestamp::UInt64
121+
category_str = string(category)
122+
color = name_to_color(category_str, colors)
123+
push!(df, (;category=category_str, t_start, t_end, color))
88124
end
89-
#setindex!.(Ref(tvals), rowslast.t_start, proc_idx[rowslast.proc_name])
90-
bar!(plt, tvals[proc_idxs], orientation=:h, linewidth=0.5,linecolor=:white,yflip=true,color=:white)
91-
annotate!.(Ref(plt),(rowslast.t_start .+ rowslast.t_end) ./ 2, findfirst.( .==(rowslast.proc_name), Ref(proc_names)), text.(string.(rowslast.tid),9,rotation=90 ))
92-
dfc = dfc[ .! (dfc.tid .∈ Ref(rowslast.tid) ), : ]
93125
end
94-
# FIXME: theoretic_optimal = simulate_polling(df)[1] + minimum(df.t_start)
95-
theoretic_optimal = minimum(df.t_start)
96-
plot!(plt, [theoretic_optimal,theoretic_optimal], [0, length(proc_names)+1],width=3,color=:black,style=:dot)
97-
return plt
126+
return df
98127
end
128+
logs_to_df(logs::Dict, ::Val{target}; kwargs...) where target =
129+
throw(ArgumentError("Invalid target: $(repr(target))"))
99130

100131
# Implementation adapted from:
101132
# https://discourse.julialang.org/t/how-to-make-a-gantt-plot-with-plots-jl/95165/7
@@ -105,30 +136,54 @@ end
105136
Render a Gantt chart of task execution in `logs` using Plots. `kwargs` are passed to `plot` directly.
106137
"""
107138
function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt};
139+
target=:execution,
108140
colors=_default_colors, name_to_color=_name_to_color,
109141
color_by=:fn, kwargs...)
110-
df = logs_to_df(logs; colors, name_to_color, color_by)
142+
df = logs_to_df(logs, Val{target}(); colors, name_to_color, color_by)
143+
y_elem = if target == :execution || target == :processor
144+
:proc_name
145+
elseif target == :scheduler
146+
:category
147+
end
148+
ylabel = if target == :execution || target == :processor
149+
"Processor"
150+
elseif target == :scheduler
151+
"Category"
152+
end
153+
sort!(df, y_elem)
154+
155+
global_t_start, global_t_end = extrema(logs[1][:core][idx].timestamp for idx in 1:length(logs[1][:core]))
111156

112157
rect(w, h, x, y) = Shape(x .+ [0,w,w,0], y .+ [0,0,h,h])
113158

114-
t_init = minimum(df.t_start)
115-
t_start = (df.t_start .- t_init) ./ 1e9
116-
t_end = (df.t_end .- t_init) ./ 1e9
159+
t_start = (df.t_start .- global_t_start) ./ 1e9
160+
t_end = (df.t_end .- global_t_start) ./ 1e9
117161
duration = t_end .- t_start
118-
u = unique(df.proc_name)
162+
u = unique(getproperty(df, y_elem))
119163
dy = Dict(u .=> 1:length(u))
120-
r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, df.proc_name)]
121-
labels = permutedims(df.fn_name)
122-
# Deduplicate labels
123-
for idx in 1:length(labels)
124-
if findfirst(other_idx->labels[other_idx]==labels[idx], 1:length(labels)) < idx
125-
labels[idx] = ""
164+
r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, getproperty(df, y_elem))]
165+
if target == :execution
166+
labels = permutedims(df.fn_name)
167+
elseif target == :processor
168+
labels = permutedims(df.category)
169+
elseif target == :scheduler
170+
labels = nothing
171+
end
172+
173+
if labels !== nothing
174+
# Deduplicate labels
175+
for idx in 1:length(labels)
176+
if findfirst(other_idx->labels[other_idx]==labels[idx], 1:length(labels)) < idx
177+
labels[idx] = ""
178+
end
126179
end
127180
end
128181

129182
return plot(r; color=permutedims(df.color), labels,
130183
yticks=(1.5:(nrow(df) + 0.5), u),
131-
xlabel="Time (seconds)", ylabel="Processor",
184+
xlabel="Time (seconds)", ylabel,
185+
xlim=(0.0, (global_t_end - global_t_start) / 1e9),
186+
legendalpha=0,
132187
kwargs...)
133188
end
134189

src/array/darray.jl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,13 @@ function Base.:(==)(x::AbstractArray{T,N}, y::ArrayOp{S,N}) where {T,S,N}
524524
return collect(x) == y
525525
end
526526

527+
function logs_annotate!(ctx::Context, A::DArray, name::Union{String,Symbol})
528+
for (idx, chunk) in enumerate(A.chunks)
529+
sd = A.subdomains[idx]
530+
Dagger.logs_annotate!(ctx, chunk, name*'['*join(sd.indexes, ',')*']')
531+
end
532+
end
533+
527534
# TODO: Allow `f` to return proc
528535
mapchunk(f, chunk) = tochunk(f(poolget(chunk.handle)))
529536
function mapchunks(f, d::DArray{T,N,F}) where {T,N,F}

src/sch/Sch.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -604,9 +604,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
604604
end
605605
end
606606

607-
timespan_start(ctx, :finish, (;thunk_id), (;thunk_id))
607+
timespan_start(ctx, :finish, (;thunk_id), (;thunk_id, result=res))
608608
finish_task!(ctx, state, node, thunk_failed)
609-
timespan_finish(ctx, :finish, (;thunk_id), (;thunk_id))
609+
timespan_finish(ctx, :finish, (;thunk_id), (;thunk_id, result=res))
610610

611611
delete_unused_tasks!(state)
612612
end
@@ -1684,7 +1684,7 @@ function do_task(to_proc, task_desc)
16841684
threadtime = cputhreadtime() - threadtime_start
16851685
# FIXME: This is not a realistic measure of max. required memory
16861686
#gc_allocd = min(max(UInt64(Base.gc_num().allocd) - UInt64(gcnum_start.allocd), UInt64(0)), UInt64(1024^4))
1687-
timespan_finish(ctx, :compute, (;thunk_id, processor=to_proc), (;f))
1687+
timespan_finish(ctx, :compute, (;thunk_id, processor=to_proc), (;f, result=result_meta))
16881688
lock(TASK_SYNC) do
16891689
real_time_util[] -= est_time_util
16901690
pop!(TASKS_RUNNING, thunk_id)

src/submission.jl

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
2626

2727
id = next_id()
2828

29-
timespan_start(ctx, :add_thunk, (;thunk_id=id), (;f, args, options))
29+
timespan_start(ctx, :add_thunk, (;thunk_id=id), (;f, args, options, uid))
3030

3131
# Lookup DTask/ThunkID -> Thunk
3232
old_args = copy(args)
@@ -129,7 +129,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
129129
put!(state.chan, Sch.RescheduleSignal())
130130
end
131131

132-
timespan_finish(ctx, :add_thunk, (;thunk_id=id), (;f, args, options))
132+
timespan_finish(ctx, :add_thunk, (;thunk_id=id), (;f, args, options, uid))
133133

134134
return thunk_id
135135
end
@@ -224,10 +224,13 @@ function eager_spawn(spec::DTaskSpec)
224224
future = ThunkFuture()
225225
finalizer_ref = poolset(DTaskFinalizer(uid); device=MemPool.CPURAMDevice())
226226

227-
# Return unlaunched DTask
227+
# Create unlaunched DTask
228228
return DTask(uid, future, finalizer_ref)
229229
end
230230
function eager_launch!((spec, task)::Pair{DTaskSpec,DTask})
231+
# Assign a name, if specified
232+
eager_assign_name!(spec, task)
233+
231234
# Lookup DTask -> ThunkID
232235
local args, options
233236
lock(Sch.EAGER_ID_MAP) do id_map
@@ -244,6 +247,11 @@ end
244247
function eager_launch!(specs::Vector{Pair{DTaskSpec,DTask}})
245248
ntasks = length(specs)
246249

250+
# Assign a name, if specified
251+
for (spec, task) in specs
252+
eager_assign_name!(spec, task)
253+
end
254+
247255
uids = [task.uid for (_, task) in specs]
248256
futures = [task.future for (_, task) in specs]
249257
finalizer_refs = [task.finalizer_ref for (_, task) in specs]
@@ -263,3 +271,11 @@ function eager_launch!(specs::Vector{Pair{DTaskSpec,DTask}})
263271
task.thunk_ref = thunk_ids[i].ref
264272
end
265273
end
274+
275+
function eager_assign_name!(spec::DTaskSpec, task::DTask)
276+
# Assign a name, if specified
277+
if haskey(spec.options, :name)
278+
Dagger.logs_annotate!(task, spec.options.name)
279+
spec.options = (;filter(x -> x[1] != :name, Base.pairs(spec.options))...)
280+
end
281+
end

0 commit comments

Comments
 (0)