@@ -289,6 +289,20 @@ struct StreamingFunction{F, S}
289
289
new {F, S} (f, stream, max_evals)
290
290
end
291
291
292
+ struct DestPostMigration
293
+ thunk_id:: Int
294
+ cancel_token:: CancelToken
295
+ f
296
+ DestPostMigration (thunk_id, tls, f) = new (thunk_id, tls. cancel_token, f)
297
+ end
298
+ function (dpm:: DestPostMigration )(store, unsent)
299
+ STREAM_THUNK_ID[] = dpm. thunk_id
300
+ @assert ! in_task ()
301
+ tls = DTaskTLS (OSProc (), typemax (UInt64), nothing , [], dpm. cancel_token)
302
+ set_tls! (tls)
303
+ return dpm. f (store, unsent)
304
+ end
305
+
292
306
function migrate_stream! (stream:: Stream , w:: Integer = myid ())
293
307
# Perform migration of the StreamStore
294
308
# MemPool will block access to the new ref until the migration completes
@@ -318,11 +332,8 @@ function migrate_stream!(stream::Stream, w::Integer=myid())
318
332
empty! (store. output_buffers)
319
333
return (unsent_inputs, unsent_outputs)
320
334
end ,
321
- dest_post_migration= (store, unsent)-> begin
335
+ dest_post_migration= DestPostMigration (thunk_id, tls, (store, unsent)-> begin
322
336
# Initialize the StreamStore on the destination with the unsent inputs/outputs.
323
- STREAM_THUNK_ID[] = thunk_id
324
- @assert ! in_task ()
325
- set_tls! (tls)
326
337
# get_tls().cancel_token = MemPool.access_ref(identity, remote_cancel_token; local_only=true)
327
338
unsent_inputs, unsent_outputs = unsent
328
339
for (input_uid, inputs) in unsent_inputs
@@ -342,7 +353,7 @@ function migrate_stream!(stream::Stream, w::Integer=myid())
342
353
# Reset the state of this new store
343
354
store. open = true
344
355
store. migrating = false
345
- end ,
356
+ end ) ,
346
357
post_migration= store-> begin
347
358
# Indicate that this store has migrated
348
359
store. migrating = true
0 commit comments