Skip to content

Commit c32db49

Browse files
committed
fixup! Add streaming API
1 parent 776aa9e commit c32db49

File tree

2 files changed

+223
-99
lines changed

2 files changed

+223
-99
lines changed

src/eager_thunk.jl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,17 @@ function Base.fetch(t::EagerThunk; raw=false)
5555
if !isdefined(t, :thunk_ref)
5656
throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `EagerThunk`"))
5757
end
58-
return fetch(t.future; raw)
58+
stream = task_to_stream(t.uid)
59+
if stream !== nothing
60+
add_waiters!(stream, [0])
61+
end
62+
try
63+
return fetch(t.future; raw)
64+
finally
65+
if stream !== nothing
66+
remove_waiters!(stream, [0])
67+
end
68+
end
5969
end
6070
function Base.show(io::IO, t::EagerThunk)
6171
status = if isdefined(t, :thunk_ref)

0 commit comments

Comments
 (0)