@@ -19,16 +19,15 @@ function tid_to_uid(thunk_id)
1919end
2020function Base. put! (store:: StreamStore{T,B} , value) where {T,B}
2121 thunk_id = STREAM_THUNK_ID[]
22- uid = tid_to_uid (thunk_id)
2322 @lock store. lock begin
2423 if ! isopen (store)
25- @dagdebug thunk_id :stream " [ $uid ] closed!"
24+ @dagdebug thunk_id :stream " closed!"
2625 throw (InvalidStateException (" Stream is closed" , :closed ))
2726 end
28- @dagdebug thunk_id :stream " [ $uid ] adding $value "
27+ @dagdebug thunk_id :stream " adding $value "
2928 for buffer in values (store. buffers)
3029 while isfull (buffer)
31- @dagdebug thunk_id :stream " [ $uid ] buffer full, waiting"
30+ @dagdebug thunk_id :stream " buffer full, waiting"
3231 wait (store. lock)
3332 end
3433 put! (buffer, value)
@@ -38,16 +37,15 @@ function Base.put!(store::StreamStore{T,B}, value) where {T,B}
3837end
3938function Base. take! (store:: StreamStore , id:: UInt )
4039 thunk_id = STREAM_THUNK_ID[]
41- uid = tid_to_uid (thunk_id)
4240 @lock store. lock begin
4341 buffer = store. buffers[id]
4442 while isempty (buffer) && isopen (store, id)
45- @dagdebug thunk_id :stream " [ $uid ] no elements, not taking"
43+ @dagdebug thunk_id :stream " no elements, not taking"
4644 wait (store. lock)
4745 end
48- @dagdebug thunk_id :stream " [ $uid ] wait finished"
46+ @dagdebug thunk_id :stream " wait finished"
4947 if ! isopen (store, id)
50- @dagdebug thunk_id :stream " [ $uid ] closed!"
48+ @dagdebug thunk_id :stream " closed!"
5149 throw (InvalidStateException (" Stream is closed" , :closed ))
5250 end
5351 unlock (store. lock)
@@ -56,7 +54,7 @@ function Base.take!(store::StreamStore, id::UInt)
5654 finally
5755 lock (store. lock)
5856 end
59- @dagdebug thunk_id :stream " [ $uid ] value accepted"
57+ @dagdebug thunk_id :stream " value accepted"
6058 notify (store. lock)
6159 return value
6260 end
@@ -129,46 +127,53 @@ function Base.take!(stream::Stream{T,B}, id::UInt) where {T,B}
129127 return take! (stream. input_buffer)
130128end
131129function Base. isopen (stream:: Stream , id:: UInt ):: Bool
132- return remotecall_fetch (stream. store_ref. handle. owner, stream . store_ref . handle ) do ref
133- return isopen (MemPool . poolget (ref) :: StreamStore , id)
130+ return MemPool . access_ref (stream. store_ref. handle, id ) do store, id
131+ return isopen (store :: StreamStore , id)
134132 end
135133end
136134function Base. close (stream:: Stream )
137- remotecall_wait (stream. store_ref. handle. owner, stream. store_ref. handle) do ref
138- close (MemPool. poolget (ref):: StreamStore )
135+ MemPool. access_ref (stream. store_ref. handle) do store
136+ close (store:: StreamStore )
137+ return
139138 end
139+ return
140140end
141141function add_waiters! (stream:: Stream , waiters:: Vector{Int} )
142- remotecall_wait (stream. store_ref. handle. owner, stream. store_ref. handle) do ref
143- add_waiters! (MemPool. poolget (ref):: StreamStore , waiters)
142+ MemPool. access_ref (stream. store_ref. handle, waiters) do store, waiters
143+ add_waiters! (store:: StreamStore , waiters)
144+ return
144145 end
146+ return
145147end
146148add_waiters! (stream:: Stream , waiter:: Integer ) =
147149 add_waiters! (stream:: Stream , Int[waiter])
148150function remove_waiters! (stream:: Stream , waiters:: Vector{Int} )
149- remotecall_wait (stream. store_ref. handle. owner, stream. store_ref. handle) do ref
150- remove_waiters! (MemPool. poolget (ref):: StreamStore , waiters)
151+ MemPool. access_ref (stream. store_ref. handle, waiters) do store, waiters
152+ remove_waiters! (store:: StreamStore , waiters)
153+ return
151154 end
155+ return
152156end
153157remove_waiters! (stream:: Stream , waiter:: Integer ) =
154158 remove_waiters! (stream:: Stream , Int[waiter])
155159
156160function migrate_stream! (stream:: Stream , w:: Integer = myid ())
157- if ! isdefined (MemPool, :migrate! )
158- @warn " MemPool migration support not enabled!\n Performance may be degraded" maxlog= 1
159- return
160- end
161-
162161 # Perform migration of the StreamStore
163162 # MemPool will block access to the new ref until the migration completes
163+ # FIXME : Do this with MemPool.access_ref, in case stream was already migrated
164164 if stream. store_ref. handle. owner != w
165- # Take lock to prevent any further modifications
166- # N.B. Serialization automatically unlocks
167- remotecall_wait (stream. store_ref. handle. owner, stream. store_ref. handle) do ref
168- lock ((MemPool. poolget (ref):: StreamStore ). lock)
165+ new_store_ref = MemPool. migrate! (stream. store_ref. handle, w; pre_migration= store-> begin
166+ # Lock store to prevent any further modifications
167+ # N.B. Serialization automatically unlocks the migrated copy
168+ lock ((store:: StreamStore ). lock)
169+ end , post_migration= store-> begin
170+ # Unlock the store
171+ # FIXME : Indicate to all waiters that this store is dead
172+ unlock ((store:: StreamStore ). lock)
173+ end )
174+ if w == myid ()
175+ stream. store = MemPool. access_ref (identity, new_store_ref; local_only= true )
169176 end
170-
171- MemPool. migrate! (stream. store_ref. handle, w)
172177 end
173178end
174179
@@ -272,6 +277,7 @@ function (sf::StreamingFunction)(args...; kwargs...)
272277 # Migrate our output stream to this worker
273278 if sf. stream isa Stream
274279 migrate_stream! (sf. stream)
280+ @dagdebug thunk_id :stream " Migration complete"
275281 end
276282
277283 try
@@ -299,13 +305,13 @@ function (sf::StreamingFunction)(args...; kwargs...)
299305 end
300306 end
301307 for stream in streams
302- @dagdebug thunk_id :stream " [ $uid ] dropping waiter"
308+ @dagdebug thunk_id :stream " dropping waiter"
303309 remove_waiters! (stream, uid)
304- @dagdebug thunk_id :stream " [ $uid ] dropped waiter"
310+ @dagdebug thunk_id :stream " dropped waiter"
305311 end
306312
307313 # Ensure downstream tasks also terminate
308- @dagdebug thunk_id :stream " [ $uid ] closed stream"
314+ @dagdebug thunk_id :stream " closed stream"
309315 close (sf. stream)
310316 end
311317end
0 commit comments