@@ -8,9 +8,7 @@ mutable struct StreamStore{T,B}
88 new {T,B} (zeros (Int, 0 ), Dict {Int,B} (), buffer_amount,
99 true , Threads. Condition ())
1010end
11- tid () = Dagger. Sch. sch_handle (). thunk_id. id
12- function uid ()
13- thunk_id = tid ()
11+ function tid_to_uid (thunk_id)
1412 lock (Sch. EAGER_ID_MAP) do id_map
1513 for (uid, otid) in id_map
1614 if thunk_id == otid
@@ -20,15 +18,17 @@ function uid()
2018 end
2119end
2220function Base. put! (store:: StreamStore{T,B} , value) where {T,B}
21+ thunk_id = STREAM_THUNK_ID[]
22+ uid = tid_to_uid (thunk_id)
2323 @lock store. lock begin
2424 if ! isopen (store)
25- @dagdebug nothing :stream_put " [$( uid ()) ] closed!"
25+ @dagdebug thunk_id :stream " [$uid ] closed!"
2626 throw (InvalidStateException (" Stream is closed" , :closed ))
2727 end
28- @dagdebug nothing :stream_put " [$( uid ()) ] adding $value "
28+ @dagdebug thunk_id :stream " [$uid ] adding $value "
2929 for buffer in values (store. buffers)
3030 while isfull (buffer)
31- @dagdebug nothing :stream_put " [$( uid ()) ] buffer full, waiting"
31+ @dagdebug thunk_id :stream " [$uid ] buffer full, waiting"
3232 wait (store. lock)
3333 end
3434 put! (buffer, value)
@@ -37,15 +37,17 @@ function Base.put!(store::StreamStore{T,B}, value) where {T,B}
3737 end
3838end
3939function Base. take! (store:: StreamStore , id:: UInt )
40+ thunk_id = STREAM_THUNK_ID[]
41+ uid = tid_to_uid (thunk_id)
4042 @lock store. lock begin
4143 buffer = store. buffers[id]
4244 while isempty (buffer) && isopen (store, id)
43- @dagdebug nothing :stream_take " [$( uid ()) ] no elements, not taking"
45+ @dagdebug thunk_id :stream " [$uid ] no elements, not taking"
4446 wait (store. lock)
4547 end
46- @dagdebug nothing :stream_take " [$( uid ()) ] wait finished"
48+ @dagdebug thunk_id :stream " [$uid ] wait finished"
4749 if ! isopen (store, id)
48- @dagdebug nothing :stream_take " [$( uid ()) ] closed!"
50+ @dagdebug thunk_id :stream " [$uid ] closed!"
4951 throw (InvalidStateException (" Stream is closed" , :closed ))
5052 end
5153 unlock (store. lock)
@@ -54,7 +56,7 @@ function Base.take!(store::StreamStore, id::UInt)
5456 finally
5557 lock (store. lock)
5658 end
57- @dagdebug nothing :stream_take " [$( uid ()) ] value accepted"
59+ @dagdebug thunk_id :stream " [$uid ] value accepted"
5860 notify (store. lock)
5961 return value
6062 end
@@ -244,6 +246,8 @@ function cancel_stream!(t::EagerThunk)
244246 end
245247end
246248
249+ const STREAM_THUNK_ID = TaskLocalValue {Int} (()-> 0 )
250+
247251struct StreamingFunction{F, S}
248252 f:: F
249253 stream:: S
@@ -252,7 +256,9 @@ chunktype(sf::StreamingFunction{F}) where F = F
252256function (sf:: StreamingFunction )(args... ; kwargs... )
253257 @nospecialize sf args kwargs
254258 result = nothing
255- thunk_id = tid ()
259+ thunk_id = Sch. sch_handle (). thunk_id. id
260+ STREAM_THUNK_ID[] = thunk_id
261+ # FIXME : Remove when scheduler is distributed
256262 uid = remotecall_fetch (1 , thunk_id) do thunk_id
257263 lock (Sch. EAGER_ID_MAP) do id_map
258264 for (uid, otid) in id_map
@@ -293,13 +299,13 @@ function (sf::StreamingFunction)(args...; kwargs...)
293299 end
294300 end
295301 for stream in streams
296- @dagdebug nothing :stream_close " [$uid ] dropping waiter"
302+ @dagdebug thunk_id :stream " [$uid ] dropping waiter"
297303 remove_waiters! (stream, uid)
298- @dagdebug nothing :stream_close " [$uid ] dropped waiter"
304+ @dagdebug thunk_id :stream " [$uid ] dropped waiter"
299305 end
300306
301307 # Ensure downstream tasks also terminate
302- @dagdebug nothing :stream_close " [$uid ] closed stream"
308+ @dagdebug thunk_id :stream " [$uid ] closed stream"
303309 close (sf. stream)
304310 end
305311end
0 commit comments