@@ -342,7 +342,7 @@ function init_proc(state, p, log_sink)
342342 lock (WORKER_MONITOR_LOCK) do
343343 wid = p. pid
344344 if ! haskey (WORKER_MONITOR_TASKS, wid)
345- t = @async begin
345+ t = Threads . @spawn begin
346346 try
347347 # Wait until this connection is terminated
348348 remotecall_fetch (sleep, wid, typemax (UInt64))
@@ -505,7 +505,7 @@ function scheduler_init(ctx, state::ComputeState, d::Thunk, options, deps)
505505
506506 # Initialize workers
507507 @sync for p in procs_to_use (ctx)
508- @async begin
508+ Threads . @spawn begin
509509 try
510510 init_proc (state, p, ctx. log_sink)
511511 catch err
@@ -521,7 +521,7 @@ function scheduler_init(ctx, state::ComputeState, d::Thunk, options, deps)
521521 end
522522
523523 # Listen for new workers
524- @async begin
524+ Threads . @spawn begin
525525 try
526526 monitor_procs_changed! (ctx, state)
527527 catch err
@@ -632,7 +632,7 @@ function scheduler_exit(ctx, state::ComputeState, options)
632632 @dagdebug nothing :global " Tearing down scheduler" uid= state. uid
633633
634634 @sync for p in procs_to_use (ctx)
635- @async cleanup_proc (state, p, ctx. log_sink)
635+ Threads . @spawn cleanup_proc (state, p, ctx. log_sink)
636636 end
637637
638638 lock (state. lock) do
988988function evict_all_chunks! (ctx, to_evict)
989989 if ! isempty (to_evict)
990990 @sync for w in map (p-> p. pid, procs_to_use (ctx))
991- @async remote_do (evict_chunks!, w, ctx. log_sink, to_evict)
991+ Threads . @spawn remote_do (evict_chunks!, w, ctx. log_sink, to_evict)
992992 end
993993 end
994994end
@@ -1075,8 +1075,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
10751075 # know which task failed.
10761076 tasks = Task[]
10771077 for ts in to_send
1078- # TODO : errormonitor
1079- @async begin
1078+ task = Threads. @spawn begin
10801079 timespan_start (ctx, :fire , (;worker= gproc. pid), nothing )
10811080 try
10821081 remotecall_wait (do_tasks, gproc. pid, proc, state. chan, [ts]);
@@ -1523,7 +1522,7 @@ function do_task(to_proc, task_desc)
15231522 (data, ids)
15241523 end
15251524 fetch_tasks = map (Iterators. zip (_data,_ids)) do (x, id)
1526- @async begin
1525+ Threads . @spawn begin
15271526 timespan_start (ctx, :move , (;thunk_id, id, processor= to_proc), (;f, data= x))
15281527 #= FIXME : This isn't valid if x is written to
15291528 x = if x isa Chunk
0 commit comments