Skip to content

Commit 59fae18

Browse files
committed
Sch: Add support for MemPool tags and retain
1 parent f64df52 commit 59fae18

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2424
ContextVariablesX = "0.1"
2525
DataStructures = "0.18"
2626
MacroTools = "0.5"
27-
MemPool = "0.4.3"
27+
MemPool = "0.4.4"
2828
Requires = "1"
2929
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33, 0.34"
3030
TimespanLogging = "0.1"

src/sch/Sch.jl

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,16 @@ error will be displayed.
219219
when constructing `Chunk`s (such as when constructing the return value). The
220220
device must support `MemPool.CPURAMResource`. When `nothing`, uses
221221
`MemPool.GLOBAL_DEVICE[]`.
222+
- `storage_root_tag::Any=nothing`: If not `nothing`,
223+
specifies the MemPool storage leaf tag to associate with the thunk's result.
224+
This tag can be used by MemPool's storage devices to manipulate their behavior,
225+
such as the file name used to store data on disk."
226+
- `storage_leaf_tag::MemPool.Tag,Nothing}=nothing`: If not `nothing`,
227+
specifies the MemPool storage leaf tag to associate with the thunk's result.
228+
This tag can be used by MemPool's storage devices to manipulate their behavior,
229+
such as the file name used to store data on disk."
230+
- `storage_retain::Bool=false`: The value of `retain` to pass to
231+
`MemPool.poolset` when constructing the result `Chunk`.
222232
"""
223233
Base.@kwdef struct ThunkOptions
224234
single::Union{Int,Nothing} = nothing
@@ -230,6 +240,9 @@ Base.@kwdef struct ThunkOptions
230240
checkpoint = nothing
231241
restore = nothing
232242
storage::Union{Chunk,Nothing} = nothing
243+
storage_root_tag = nothing
244+
storage_leaf_tag::Union{MemPool.Tag,Nothing} = nothing
245+
storage_retain::Bool = false
233246
end
234247

235248
"""
@@ -249,7 +262,10 @@ function Base.merge(sopts::SchedulerOptions, topts::ThunkOptions)
249262
allow_errors,
250263
topts.checkpoint,
251264
topts.restore,
252-
topts.storage)
265+
topts.storage,
266+
topts.storage_root_tag,
267+
topts.storage_leaf_tag,
268+
topts.storage_retain)
253269
end
254270
Base.merge(sopts::SchedulerOptions, ::Nothing) =
255271
ThunkOptions(sopts.single,
@@ -283,6 +299,9 @@ function populate_defaults(opts::ThunkOptions, Tf, Targs)
283299
maybe_default(:checkpoint),
284300
maybe_default(:restore),
285301
maybe_default(:storage),
302+
maybe_default(:storage_root_tag),
303+
maybe_default(:storage_leaf_tag),
304+
maybe_default(:storage_retain),
286305
)
287306
end
288307

@@ -530,9 +549,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
530549
node = unwrap_weak_checked(state.thunk_dict[thunk_id])
531550
if metadata !== nothing
532551
state.worker_time_pressure[pid][proc] = metadata.time_pressure
533-
to_storage = node.options.storage
534-
state.worker_storage_pressure[pid][to_storage] = metadata.storage_pressure
535-
state.worker_storage_capacity[pid][to_storage] = metadata.storage_capacity
552+
#to_storage = fetch(node.options.storage)
553+
#state.worker_storage_pressure[pid][to_storage] = metadata.storage_pressure
554+
#state.worker_storage_capacity[pid][to_storage] = metadata.storage_capacity
536555
state.worker_loadavg[pid] = metadata.loadavg
537556
sig = signature(node, state)
538557
state.signature_time_cost[sig] = (metadata.threadtime + get(state.signature_time_cost, sig, 0)) ÷ 2
@@ -1546,7 +1565,10 @@ function do_task(to_proc, task_desc)
15461565

15471566
# Construct result
15481567
# TODO: We should cache this locally
1549-
send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache)
1568+
send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache,
1569+
tag=options.storage_root_tag,
1570+
leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()),
1571+
retain=options.storage_retain)
15501572
catch ex
15511573
bt = catch_backtrace()
15521574
RemoteException(myid(), CapturedException(ex, bt))

0 commit comments

Comments
 (0)