Skip to content

Commit eb9c651

Browse files
committed
logging/Sch/GraphVizExt: Various :graphviz improvements
1 parent a6feb8d commit eb9c651

File tree

6 files changed

+61
-39
lines changed

6 files changed

+61
-39
lines changed

ext/GraphVizExt.jl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,19 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
6565
tid_to_vertex[tid] = nv(g)
6666
push!(task_names, taskname)
6767
for dep in deps
68+
haskey(tid_to_vertex, dep) || continue
6869
add_edge!(g, tid_to_vertex[dep], nv(g))
6970
end
70-
if haskey(logs[w], :taskargs)
71-
id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector}
72-
append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args)
73-
end
7471
elseif category == :compute && kind == :start
7572
id::NamedTuple
7673
tid = id.thunk_id
7774
proc = id.processor
7875
tid_to_proc[tid] = proc
7976
elseif category == :move && kind == :finish
77+
if haskey(logs[w], :taskargs)
78+
id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector}
79+
append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args)
80+
end
8081
if haskey(logs[w], :taskargmoves)
8182
move_info = logs[w][:taskargmoves][idx]
8283
move_info === nothing && continue
@@ -88,7 +89,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
8889
id::NamedTuple
8990
objid = id.objectid
9091
name = id.name
91-
arg_names[objid] = name
92+
arg_names[objid] = String(name)
9293
end
9394
end
9495
end
@@ -163,8 +164,12 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
163164
end
164165

165166
# Add argument moves
167+
seen_moves = Set{Tuple{UInt,UInt}}()
166168
for (tid, moves) in arg_moves
167169
for (pos, (pre_objid, post_objid)) in moves
170+
pre_objid == post_objid && continue
171+
(pre_objid, post_objid) in seen_moves && continue
172+
push!(seen_moves, (pre_objid, post_objid))
168173
move_str = "a$pre_objid -> a$post_objid [label=\"move\"]\n"
169174
str *= move_str
170175
end
@@ -205,6 +210,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
205210
end
206211
end
207212
for (tid, args) in task_args
213+
haskey(tid_to_vertex, tid) || continue
208214
id = tid_to_vertex[tid]
209215
id in con_vs || continue
210216
for (pos, arg) in args

src/datadeps.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ function generate_slot!(state::DataDepsState, dest_space, data)
413413
w = only(unique(map(get_parent, collect(processors(dest_space))))).pid
414414
ctx = Sch.eager_context()
415415
id = rand(Int)
416-
timespan_start(ctx, :move, (;thunk_id=0, id, processor=to_proc), (;f=nothing, data))
416+
timespan_start(ctx, :move, (;thunk_id=0, id, position=0, processor=to_proc), (;f=nothing, data))
417417
dest_space_args[data] = remotecall_fetch(w, from_proc, to_proc, data) do from_proc, to_proc, data
418418
data_converted = move(from_proc, to_proc, data)
419419
data_chunk = tochunk(data_converted, to_proc)
@@ -422,7 +422,7 @@ function generate_slot!(state::DataDepsState, dest_space, data)
422422
@assert orig_space != memory_space(data_chunk) "space preserved! $orig_space != $(memory_space(data_chunk)) ($(typeof(data)) vs. $(typeof(data_chunk))), spaces ($orig_space -> $dest_space)"
423423
return data_chunk
424424
end
425-
timespan_finish(ctx, :move, (;thunk_id=0, id, processor=to_proc), (;f=nothing, data=dest_space_args[data]))
425+
timespan_finish(ctx, :move, (;thunk_id=0, id, position=0, processor=to_proc), (;f=nothing, data=dest_space_args[data]))
426426
end
427427
return dest_space_args[data]
428428
end

src/sch/Sch.jl

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,13 +1049,21 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
10491049

10501050
ids = Int[0]
10511051
data = Any[thunk.f]
1052-
positions = Union{Symbol,Nothing}[]
1052+
positions = Union{Symbol,Int}[0]
1053+
arg_ctr = 1
10531054
for (idx, pos_x) in enumerate(thunk.inputs)
10541055
pos, x = pos_x
10551056
x = unwrap_weak_checked(x)
10561057
push!(ids, istask(x) ? x.id : -idx)
10571058
push!(data, istask(x) ? state.cache[x] : x)
1058-
push!(positions, pos)
1059+
if pos !== nothing
1060+
# Keyword arg
1061+
push!(positions, pos)
1062+
else
1063+
# Positional arg
1064+
push!(positions, arg_ctr)
1065+
arg_ctr += 1
1066+
end
10591067
end
10601068
toptions = thunk.options !== nothing ? thunk.options : ThunkOptions()
10611069
options = merge(ctx.options, toptions)
@@ -1537,14 +1545,14 @@ function do_task(to_proc, task_desc)
15371545
# Initiate data transfers for function and arguments
15381546
transfer_time = Threads.Atomic{UInt64}(0)
15391547
transfer_size = Threads.Atomic{UInt64}(0)
1540-
_data, _ids = if meta
1541-
(Any[first(data)], Int[first(ids)]) # always fetch function
1548+
_data, _ids, _positions = if meta
1549+
(Any[first(data)], Int[first(ids)], Union{Symbol,Int}[first(positions)]) # always fetch function
15421550
else
1543-
(data, ids)
1551+
(data, ids, positions)
15441552
end
1545-
fetch_tasks = map(Iterators.zip(_data,_ids)) do (x, id)
1553+
fetch_tasks = map(Iterators.zip(_data, _ids, _positions)) do (x, id, position)
15461554
Threads.@spawn begin
1547-
timespan_start(ctx, :move, (;thunk_id, id, processor=to_proc), (;f, data=x))
1555+
timespan_start(ctx, :move, (;thunk_id, id, position, processor=to_proc), (;f, data=x))
15481556
#= FIXME: This isn't valid if x is written to
15491557
x = if x isa Chunk
15501558
value = lock(TASK_SYNC) do
@@ -1587,11 +1595,13 @@ function do_task(to_proc, task_desc)
15871595
end
15881596
else
15891597
=#
1590-
x = @invokelatest move(to_proc, x)
1598+
new_x = @invokelatest move(to_proc, x)
15911599
#end
1592-
@dagdebug thunk_id :move "Moved argument $id to $to_proc: $(typeof(x))"
1593-
timespan_finish(ctx, :move, (;thunk_id, id, processor=to_proc), (;f, data=x); tasks=[Base.current_task()])
1594-
return x
1600+
if new_x !== x
1601+
@dagdebug thunk_id :move "Moved argument $position to $to_proc: $(typeof(x)) -> $(typeof(new_x))"
1602+
end
1603+
timespan_finish(ctx, :move, (;thunk_id, id, position, processor=to_proc), (;f, data=new_x); tasks=[Base.current_task()])
1604+
return new_x
15951605
end
15961606
end
15971607
fetched = Any[]
@@ -1607,8 +1617,8 @@ function do_task(to_proc, task_desc)
16071617
fetched_args = Any[]
16081618
fetched_kwargs = Pair{Symbol,Any}[]
16091619
for (idx, x) in enumerate(fetched)
1610-
pos = positions[idx]
1611-
if pos === nothing
1620+
pos = positions[idx+1]
1621+
if pos isa Int
16121622
push!(fetched_args, x)
16131623
else
16141624
push!(fetched_kwargs, pos => x)

src/submission.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
3838
end::Union{Vector{Any},Nothing}
3939
lock(Sch.EAGER_ID_MAP) do id_map
4040
for (idx, (pos, arg)) in enumerate(args)
41+
# FIXME: Switch to Union{Symbol,Int} to preserve positional information
4142
pos::Union{Symbol,Nothing}
4243
newarg = if arg isa DTask
4344
arg_uid = arg.uid

src/thunk.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,8 @@ function replace_broadcast(fn::Symbol)
362362
end
363363
return fn
364364
end
365+
# For TaskNames logging event
366+
Base.nameof(::ExpandedBroadcast{F}) where F = Symbol('.', nameof(F))
365367

366368
to_namedtuple(;kwargs...) = (;kwargs...)
367369

src/utils/logging-events.jl

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -117,21 +117,20 @@ end
117117
Records the raw (mutable) arguments of each submitted task.
118118
"""
119119
struct TaskArguments end
120-
function (::TaskArguments)(ev::Event{:start})
121-
if ev.category == :add_thunk
120+
(::TaskArguments)(ev::Event{:start}) = nothing
121+
function (ta::TaskArguments)(ev::Event{:finish})
122+
if ev.category == :move
122123
args = Pair{Union{Symbol,Int},UInt}[]
123-
for (idx, (pos, arg)) in enumerate(ev.timeline.args)
124-
pos_idx = pos === nothing ? idx : pos
125-
arg = Dagger.unwrap_weak_checked(arg)
126-
if ismutable(arg)
127-
push!(args, pos_idx => objectid(arg))
128-
end
124+
thunk_id = ev.id.thunk_id::Int
125+
pos = ev.id.position::Union{Symbol,Int}
126+
arg = ev.timeline.data
127+
if ismutable(arg)
128+
push!(args, pos => objectid(arg))
129129
end
130-
return ev.id.thunk_id => args
130+
return thunk_id => args
131131
end
132132
return
133133
end
134-
(ta::TaskArguments)(ev::Event{:finish}) = nothing
135134

136135
"""
137136
TaskArgumentMoves
@@ -148,8 +147,10 @@ function (ta::TaskArgumentMoves)(ev::Event{:start})
148147
if ev.category == :move
149148
data = ev.timeline.data
150149
if ismutable(data)
151-
d = get!(Dict{Union{Int,Symbol},UInt}, ta.pre_move_args, ev.id.thunk_id)
152-
d[ev.id.id] = objectid(data)
150+
thunk_id = ev.id.thunk_id::Int
151+
position = ev.id.position::Union{Symbol,Int}
152+
d = get!(Dict{Union{Int,Symbol},UInt}, ta.pre_move_args, thunk_id)
153+
d[position] = objectid(data)
153154
end
154155
end
155156
return
@@ -158,16 +159,18 @@ function (ta::TaskArgumentMoves)(ev::Event{:finish})
158159
if ev.category == :move
159160
post_data = ev.timeline.data
160161
if ismutable(post_data)
161-
if haskey(ta.pre_move_args, ev.id.thunk_id)
162-
d = ta.pre_move_args[ev.id.thunk_id]
163-
if haskey(d, ev.id.id)
164-
pre_data = d[ev.id.id]
165-
return ev.id.thunk_id, ev.id.id, pre_data, objectid(post_data)
162+
thunk_id = ev.id.thunk_id::Int
163+
position = ev.id.position::Union{Symbol,Int}
164+
if haskey(ta.pre_move_args, thunk_id)
165+
d = ta.pre_move_args[thunk_id]
166+
if haskey(d, position)
167+
pre_data = d[position]
168+
return thunk_id, position, pre_data, objectid(post_data)
166169
else
167-
@warn "No TID $(ev.id.thunk_id), ID $(ev.id.id)"
170+
@warn "No TID $(thunk_id), Position $(position)"
168171
end
169172
else
170-
@warn "No TID $(ev.id.thunk_id)"
173+
@warn "No TID $(thunk_id)"
171174
end
172175
end
173176
end

0 commit comments

Comments
 (0)