@@ -58,6 +58,7 @@ Fields:
5858- `running_on::Dict{Thunk,OSProc}` - Map from `Thunk` to the OS process executing it
5959- `thunk_dict::Dict{Int, WeakThunk}` - Maps from thunk IDs to a `Thunk`
6060- `node_order::Any` - Function that returns the order of a thunk
61+ - `equiv_chunks::WeakKeyDict{DRef,Chunk}` - Cache mapping from `DRef` to a `Chunk` which contains it
6162- `worker_time_pressure::Dict{Int,Dict{Processor,UInt64}}` - Maps from worker ID to processor pressure
6263- `worker_storage_pressure::Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}` - Maps from worker ID to storage resource pressure
6364- `worker_storage_capacity::Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}` - Maps from worker ID to storage resource capacity
@@ -84,6 +85,7 @@ struct ComputeState
8485 running_on:: Dict{Thunk,OSProc}
8586 thunk_dict:: Dict{Int, WeakThunk}
8687 node_order:: Any
88+ equiv_chunks:: WeakKeyDict{DRef,Chunk}
8789 worker_time_pressure:: Dict{Int,Dict{Processor,UInt64}}
8890 worker_storage_pressure:: Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}
8991 worker_storage_capacity:: Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}
@@ -113,6 +115,7 @@ function start_state(deps::Dict, node_order, chan)
113115 Dict{Thunk,OSProc}(),
114116 Dict{Int, WeakThunk}(),
115117 node_order,
118+ WeakKeyDict{DRef,Chunk}(),
116119 Dict{Int,Dict{Processor,UInt64}}(),
117120 Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}(),
118121 Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}(),
@@ -428,6 +431,11 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options::SchedulerOpt
428431 state. transfer_rate[] = (state. transfer_rate[] + metadata. transfer_rate) ÷ 2
429432 end
430433 end
434+ if res isa Chunk
435+ if ! haskey(state. equiv_chunks, res)
436+ state. equiv_chunks[res. handle:: DRef ] = res
437+ end
438+ end
431439 store_result!(state, node, res; error= thunk_failed)
432440 if node. options != = nothing && node. options. checkpoint != = nothing
433441 try
0 commit comments