@@ -289,6 +289,20 @@ struct StreamingFunction{F, S}
289289 new {F, S} (f, stream, max_evals)
290290end
291291
292+ struct DestPostMigration
293+ thunk_id:: Int
294+ cancel_token:: CancelToken
295+ f
296+ DestPostMigration (thunk_id, tls, f) = new (thunk_id, tls. cancel_token, f)
297+ end
298+ function (dpm:: DestPostMigration )(store, unsent)
299+ STREAM_THUNK_ID[] = dpm. thunk_id
300+ @assert ! in_task ()
301+ tls = DTaskTLS (OSProc (), typemax (UInt64), nothing , [], dpm. cancel_token)
302+ set_tls! (tls)
303+ return dpm. f (store, unsent)
304+ end
305+
292306function migrate_stream! (stream:: Stream , w:: Integer = myid ())
293307 # Perform migration of the StreamStore
294308 # MemPool will block access to the new ref until the migration completes
@@ -318,11 +332,8 @@ function migrate_stream!(stream::Stream, w::Integer=myid())
318332 empty! (store. output_buffers)
319333 return (unsent_inputs, unsent_outputs)
320334 end ,
321- dest_post_migration= (store, unsent)-> begin
335+ dest_post_migration= DestPostMigration (thunk_id, tls, (store, unsent)-> begin
322336 # Initialize the StreamStore on the destination with the unsent inputs/outputs.
323- STREAM_THUNK_ID[] = thunk_id
324- @assert ! in_task ()
325- set_tls! (tls)
326337 # get_tls().cancel_token = MemPool.access_ref(identity, remote_cancel_token; local_only=true)
327338 unsent_inputs, unsent_outputs = unsent
328339 for (input_uid, inputs) in unsent_inputs
@@ -342,7 +353,7 @@ function migrate_stream!(stream::Stream, w::Integer=myid())
342353 # Reset the state of this new store
343354 store. open = true
344355 store. migrating = false
345- end ,
356+ end ) ,
346357 post_migration= store-> begin
347358 # Indicate that this store has migrated
348359 store. migrating = true
0 commit comments