Skip to content

Commit 080849f

Browse files
committed
Sch: Fix WeakChunk usage
Construct WeakChunk within scheduler Add anti-serialization method to WeakChunk Add Chunk type to Chunk hashing method Track DRef owner and ID in WeakChunk struct Preserve old arguments in scheduler submission Check for matching Chunk in state.waiting_data
1 parent 9246a77 commit 080849f

File tree

3 files changed

+32
-8
lines changed

3 files changed

+32
-8
lines changed

src/chunks.jl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ affinity(c::Chunk) = affinity(c.handle)
6565
is_task_or_chunk(c::Chunk) = true
6666

6767
Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
68-
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x)
68+
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, hash(Chunk, x))
6969

7070
collect_remote(chunk::Chunk) =
7171
move(chunk.processor, OSProc(), poolget(chunk.handle))
@@ -281,16 +281,22 @@ function savechunk(data, dir, f)
281281
end
282282

283283
struct WeakChunk
284+
wid::Int
285+
id::Int
284286
x::WeakRef
287+
function WeakChunk(c::Chunk)
288+
return new(c.handle.owner, c.handle.id, WeakRef(c))
289+
end
285290
end
286-
WeakChunk(c::Chunk) = WeakChunk(WeakRef(c))
287291
unwrap_weak(c::WeakChunk) = c.x.value
288292
function unwrap_weak_checked(c::WeakChunk)
289-
c = unwrap_weak(c)
290-
@assert c !== nothing
291-
return c
293+
cw = unwrap_weak(c)
294+
@assert cw !== nothing "WeakChunk expired: ($(c.wid), $(c.id))"
295+
return cw
292296
end
293297
is_task_or_chunk(c::WeakChunk) = true
298+
Serialization.serialize(io::AbstractSerializer, wc::WeakChunk) =
299+
error("Cannot serialize a WeakChunk")
294300

295301
Base.@deprecate_binding AbstractPart Union{Chunk, Thunk}
296302
Base.@deprecate_binding Part Chunk

src/sch/util.jl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,18 @@ function reschedule_syncdeps!(state, thunk, seen=Set{Thunk}())
107107
if haskey(state.cache, thunk) || (thunk in state.ready) || (thunk in state.running)
108108
continue
109109
end
110+
for (_,input) in thunk.inputs
111+
if input isa WeakChunk
112+
input = unwrap_weak_checked(input)
113+
end
114+
if input isa Chunk
115+
# N.B. Different Chunks with the same DRef handle will hash to the same slot,
116+
# so we just pick an equivalent Chunk as our upstream
117+
if !haskey(state.waiting_data, input)
118+
push!(get!(()->Set{Thunk}(), state.waiting_data, input), thunk)
119+
end
120+
end
121+
end
110122
w = get!(()->Set{Thunk}(), state.waiting, thunk)
111123
for input in thunk.syncdeps
112124
input = unwrap_weak_checked(input)

src/submission.jl

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
2727
timespan_start(ctx, :add_thunk, tid, 0)
2828

2929
# Lookup EagerThunk/ThunkID -> Thunk
30+
old_args = copy(args)
3031
args::Vector{Any}
3132
syncdeps = if haskey(options, :syncdeps)
3233
collect(options.syncdeps)
@@ -47,6 +48,13 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
4748
elseif arg isa Sch.ThunkID
4849
arg_tid = arg.id
4950
state.thunk_dict[arg_tid]
51+
elseif arg isa Chunk
52+
# N.B. Different Chunks with the same DRef handle will hash to the same slot,
53+
# so we just pick an equivalent Chunk as our upstream
54+
if haskey(state.waiting_data, arg)
55+
arg = only(filter(o->o isa Chunk && o.handle == arg.handle, keys(state.waiting_data)))::Chunk
56+
end
57+
WeakChunk(arg)
5058
else
5159
arg
5260
end
@@ -76,7 +84,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
7684
options = merge(options, (;syncdeps))
7785
end
7886

79-
GC.@preserve args begin
87+
GC.@preserve old_args args begin
8088
# Create the `Thunk`
8189
thunk = Thunk(f, args...; options...)
8290

@@ -146,8 +154,6 @@ function eager_process_elem_submission_to_local(id_map, x)
146154
@assert !isa(x, Thunk) "Cannot use `Thunk`s in `@spawn`/`spawn`"
147155
if x isa Dagger.EagerThunk && haskey(id_map, x.uid)
148156
return Sch.ThunkID(id_map[x.uid], x.thunk_ref)
149-
elseif x isa Dagger.Chunk
150-
return WeakChunk(x)
151157
else
152158
return x
153159
end

0 commit comments

Comments
 (0)