@@ -72,6 +72,7 @@ Fields:
7272- `lock::ReentrantLock` - Lock around operations which modify the state
7373- `futures::Dict{Thunk, Vector{ThunkFuture}}` - Futures registered for waiting on the result of a thunk.
7474- `errored::WeakKeyDict{Thunk,Bool}` - Indicates if a thunk's result is an error.
75+ - `thunks_to_delete::Set{Thunk}` - The list of `Thunk`s ready to be deleted upon completion.
7576- `chan::RemoteChannel{Channel{Any}}` - Channel for receiving completed thunks.
7677"""
7778struct ComputeState
@@ -98,6 +99,7 @@ struct ComputeState
9899 lock:: ReentrantLock
99100 futures:: Dict{Thunk, Vector{ThunkFuture}}
100101 errored:: WeakKeyDict{Thunk,Bool}
102+ thunks_to_delete:: Set{Thunk}
101103 chan:: RemoteChannel{Channel{Any}}
102104end
103105
@@ -127,6 +129,7 @@ function start_state(deps::Dict, node_order, chan)
127129 ReentrantLock (),
128130 Dict {Thunk, Vector{ThunkFuture}} (),
129131 WeakKeyDict {Thunk,Bool} (),
132+ Set {Thunk} (),
130133 chan)
131134
132135 for k in sort (collect (keys (deps)), by= node_order)
@@ -366,7 +369,7 @@ function init_proc(state, p, log_sink)
366369 end
367370 end
368371 end
369- errormonitor_tracked (t)
372+ errormonitor_tracked (" worker monitor $wid " , t)
370373 WORKER_MONITOR_TASKS[wid] = t
371374 WORKER_MONITOR_CHANS[wid] = Dict {UInt64,RemoteChannel} ()
372375 end
@@ -590,6 +593,8 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
590593 timespan_start (ctx, :finish , thunk_id, (;thunk_id))
591594 finish_task! (ctx, state, node, thunk_failed)
592595 timespan_finish (ctx, :finish , thunk_id, (;thunk_id))
596+
597+ delete_unused_tasks! (state)
593598 end
594599
595600 safepoint (state)
@@ -931,6 +936,39 @@ function finish_task!(ctx, state, node, thunk_failed)
931936 evict_all_chunks! (ctx, to_evict)
932937end
933938
939+ function delete_unused_tasks! (state)
940+ to_delete = Thunk[]
941+ for thunk in state. thunks_to_delete
942+ if task_unused (state, thunk)
943+ # Finished and nobody waiting on us, we can be deleted
944+ push! (to_delete, thunk)
945+ end
946+ end
947+ for thunk in to_delete
948+ # Delete all cached data
949+ task_delete! (state, thunk)
950+
951+ pop! (state. thunks_to_delete, thunk)
952+ end
953+ end
954+ function delete_unused_task! (state, thunk)
955+ if task_unused (state, thunk)
956+ # Will not be accessed further, delete all cached data
957+ task_delete! (state, thunk)
958+ return true
959+ else
960+ return false
961+ end
962+ end
963+ task_unused (state, thunk) =
964+ haskey (state. cache, thunk) && ! haskey (state. waiting_data, thunk)
965+ function task_delete! (state, thunk)
966+ delete! (state. cache, thunk)
967+ delete! (state. errored, thunk)
968+ delete! (state. valid, thunk)
969+ delete! (state. thunk_dict, thunk. id)
970+ end
971+
934972function evict_all_chunks! (ctx, to_evict)
935973 if ! isempty (to_evict)
936974 @sync for w in map (p-> p. pid, procs_to_use (ctx))
@@ -1290,7 +1328,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12901328 else
12911329 t. sticky = false
12921330 end
1293- tasks[thunk_id] = errormonitor_tracked (schedule (t))
1331+ tasks[thunk_id] = errormonitor_tracked (" thunk $thunk_id " , schedule (t))
12941332 proc_occupancy[] += task_occupancy
12951333 time_pressure[] += time_util
12961334 end
@@ -1302,7 +1340,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13021340 else
13031341 proc_run_task. sticky = false
13041342 end
1305- return errormonitor_tracked (schedule (proc_run_task))
1343+ return errormonitor_tracked (" processor $to_proc " , schedule (proc_run_task))
13061344end
13071345
13081346"""
0 commit comments