File tree Expand file tree Collapse file tree 1 file changed +17
-0
lines changed Expand file tree Collapse file tree 1 file changed +17
-0
lines changed Original file line number Diff line number Diff line change 138138remove_waiters! (stream:: Stream , waiter:: Integer ) =
139139 remove_waiters! (stream:: Stream , Int[waiter])
140140
141+ function migrate_stream! (stream:: Stream , w:: Integer = myid ())
142+ # Take lock to prevent any further modifications
143+ # N.B. Serialization automatically unlocks
144+ remotecall_wait (stream. ref. handle. owner, stream. ref. handle) do ref
145+ lock ((MemPool. poolget (ref):: StreamStore ). lock)
146+ end
147+
148+ # Perform migration of the StreamStore
149+ # MemPool will block access to the new ref until the migration completes
150+ if stream. ref. handle. owner != w
151+ MemPool. migrate! (stream. ref. handle, w)
152+ end
153+ end
154+
141155struct NullStream end
142156Base. put! (ns:: NullStream , x) = nothing
143157Base. take! (ns:: NullStream ) = throw (ConcurrencyViolationError (" Cannot `take!` from a `NullStream`" ))
@@ -245,6 +259,9 @@ function (sf::StreamingFunction)(args...; kwargs...)
245259 end
246260 end
247261 end
262+ if sf. stream isa Stream
263+ migrate_stream! (sf. stream)
264+ end
248265 try
249266 kwarg_names = map (name-> Val {name} (), map (first, (kwargs... ,)))
250267 kwarg_values = map (last, (kwargs... ,))
You can’t perform that action at this time.
0 commit comments