Skip to content

Commit 884d971

Browse files
committed
Eagerly free Thunk cached results when unneeded
Uses the new MemPool destructor functionality to eagerly clean up Thunk cached results once no more EagerThunk handles exist for the Thunk. Will either free the cache immediately, or attempt to do so upon finishing each Thunk.
1 parent 694dd49 commit 884d971

File tree

5 files changed

+103
-24
lines changed

5 files changed

+103
-24
lines changed

Manifest.toml

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
julia_version = "1.8.5"
44
manifest_format = "2.0"
5-
project_hash = "5333a6c200b6e6add81c46547527f66ddc0dc16c"
5+
project_hash = "8da7911e4788068aaea8c0ef8589d674bce0fb39"
66

77
[[deps.Artifacts]]
88
uuid = "56f22d72-fd6d-98f1-02f0-08ddc0907c33"
@@ -12,9 +12,9 @@ uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
1212

1313
[[deps.ChainRulesCore]]
1414
deps = ["Compat", "LinearAlgebra", "SparseArrays"]
15-
git-tree-sha1 = "b66b8f8e3db5d7835fb8cbe2589ffd1cd456e491"
15+
git-tree-sha1 = "2118cb2765f8197b08e5958cdd17c165427425ee"
1616
uuid = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4"
17-
version = "1.17.0"
17+
version = "1.19.0"
1818

1919
[[deps.ChangesOfVariables]]
2020
deps = ["InverseFunctions", "LinearAlgebra", "Test"]
@@ -24,9 +24,9 @@ version = "0.1.8"
2424

2525
[[deps.Compat]]
2626
deps = ["Dates", "LinearAlgebra", "UUIDs"]
27-
git-tree-sha1 = "8a62af3e248a8c4bad6b32cbbe663ae02275e32c"
27+
git-tree-sha1 = "886826d76ea9e72b35fcd000e535588f7b60f21d"
2828
uuid = "34da2185-b29b-5c13-b0c7-acf172513d20"
29-
version = "4.10.0"
29+
version = "4.10.1"
3030

3131
[[deps.CompilerSupportLibraries_jll]]
3232
deps = ["Artifacts", "Libdl"]
@@ -100,19 +100,19 @@ uuid = "56ddb016-857b-54e1-b83d-db4d58db5568"
100100

101101
[[deps.MacroTools]]
102102
deps = ["Markdown", "Random"]
103-
git-tree-sha1 = "9ee1618cbf5240e6d4e0371d6f24065083f60c48"
103+
git-tree-sha1 = "b211c553c199c111d998ecdaf7623d1b89b69f93"
104104
uuid = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
105-
version = "0.5.11"
105+
version = "0.5.12"
106106

107107
[[deps.Markdown]]
108108
deps = ["Base64"]
109109
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"
110110

111111
[[deps.MemPool]]
112-
deps = ["DataStructures", "Distributed", "Mmap", "Random", "Serialization", "Sockets"]
113-
git-tree-sha1 = "b9c1a032c3c1310a857c061ce487c632eaa1faa4"
112+
deps = ["DataStructures", "Distributed", "Mmap", "Random", "ScopedValues", "Serialization", "Sockets"]
113+
git-tree-sha1 = "60dd4ac427d39e0b3f15b193845324523ee71c03"
114114
uuid = "f9f48841-c794-520a-933b-121f7ba6ed94"
115-
version = "0.4.4"
115+
version = "0.4.6"
116116

117117
[[deps.Missings]]
118118
deps = ["DataAPI"]
@@ -133,9 +133,9 @@ uuid = "4536629a-c528-5b80-bd46-f80d51c5b363"
133133
version = "0.3.20+0"
134134

135135
[[deps.OrderedCollections]]
136-
git-tree-sha1 = "2e73fe17cac3c62ad1aebe70d44c963c3cfdc3e3"
136+
git-tree-sha1 = "dfdf5519f235516220579f949664f1bf44e741c5"
137137
uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
138-
version = "1.6.2"
138+
version = "1.6.3"
139139

140140
[[deps.PrecompileTools]]
141141
deps = ["Preferences"]
@@ -173,9 +173,9 @@ version = "0.7.0"
173173

174174
[[deps.ScopedValues]]
175175
deps = ["HashArrayMappedTries", "Logging"]
176-
git-tree-sha1 = "e3b5e4ccb1702db2ae9ac2a660d4b6b2a8595742"
176+
git-tree-sha1 = "c27d546a4749c81f70d1fabd604da6aa5054e3d2"
177177
uuid = "7e506255-f358-4e82-b7e4-beb19740aa63"
178-
version = "1.1.0"
178+
version = "1.2.0"
179179

180180
[[deps.Serialization]]
181181
uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
@@ -189,9 +189,9 @@ uuid = "6462fe0b-24de-5631-8697-dd941f90decc"
189189

190190
[[deps.SortingAlgorithms]]
191191
deps = ["DataStructures"]
192-
git-tree-sha1 = "c60ec5c62180f27efea3ba2908480f8055e17cee"
192+
git-tree-sha1 = "66e0a8e672a0bdfca2c3f5937efb8538b9ddc085"
193193
uuid = "a2af1166-a08f-5f64-846c-94a0d3cef48c"
194-
version = "1.1.1"
194+
version = "1.2.1"
195195

196196
[[deps.SparseArrays]]
197197
deps = ["LinearAlgebra", "Random"]

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2424
[compat]
2525
DataStructures = "0.18"
2626
MacroTools = "0.5"
27-
MemPool = "0.4.5"
27+
MemPool = "0.4.6"
2828
PrecompileTools = "1.2"
2929
Requires = "1"
3030
ScopedValues = "1.1"

src/precompile.jl

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,35 @@
33
add_processor_callback!("__cpu_thread_1__") do
44
ThreadProc(1, 1)
55
end
6-
t1 = @spawn 1+1
6+
# FIXME: t1 = @spawn 1+1
7+
t1 = spawn(+, 1, 1)
8+
fetch(t1)
79
t2 = spawn(+, 1, t1)
810
fetch(t2)
911

10-
# Shutdown scheduler and clean up
11-
spawn() do
12-
Sch.halt!(sch_handle())
12+
# Clean up refs
13+
t1 = nothing; t2 = nothing
14+
state = Sch.EAGER_STATE[]
15+
for i in 1:5
16+
length(state.thunk_dict) == 1 && break
17+
GC.gc()
18+
yield()
1319
end
20+
@assert length(state.thunk_dict) == 1
21+
22+
# Halt scheduler
23+
notify(state.halt)
24+
put!(state.chan, (1, nothing, nothing, (Sch.SchedulerHaltedException(), nothing)))
25+
state = nothing
26+
27+
# Wait for halt
1428
while Sch.EAGER_INIT[]
1529
sleep(0.1)
1630
end
31+
32+
# Final clean-up
1733
Sch.EAGER_CONTEXT[] = nothing
18-
GC.gc()
19-
yield()
34+
GC.gc(); yield()
2035
lock(Sch.ERRORMONITOR_TRACKED) do tracked
2136
if all(t->istaskdone(t) || istaskfailed(t), map(last, tracked))
2237
empty!(tracked)
@@ -26,6 +41,8 @@
2641
@warn "Waiting on $name"
2742
if t.state == :runnable
2843
Base.throwto(t, InterruptException())
44+
else
45+
wait(t)
2946
end
3047
end
3148
end

src/sch/Sch.jl

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ Fields:
7272
- `lock::ReentrantLock` - Lock around operations which modify the state
7373
- `futures::Dict{Thunk, Vector{ThunkFuture}}` - Futures registered for waiting on the result of a thunk.
7474
- `errored::WeakKeyDict{Thunk,Bool}` - Indicates if a thunk's result is an error.
75+
- `thunks_to_delete::Set{Thunk}` - The list of `Thunk`s ready to be deleted upon completion.
7576
- `chan::RemoteChannel{Channel{Any}}` - Channel for receiving completed thunks.
7677
"""
7778
struct ComputeState
@@ -98,6 +99,7 @@ struct ComputeState
9899
lock::ReentrantLock
99100
futures::Dict{Thunk, Vector{ThunkFuture}}
100101
errored::WeakKeyDict{Thunk,Bool}
102+
thunks_to_delete::Set{Thunk}
101103
chan::RemoteChannel{Channel{Any}}
102104
end
103105

@@ -127,6 +129,7 @@ function start_state(deps::Dict, node_order, chan)
127129
ReentrantLock(),
128130
Dict{Thunk, Vector{ThunkFuture}}(),
129131
WeakKeyDict{Thunk,Bool}(),
132+
Set{Thunk}(),
130133
chan)
131134

132135
for k in sort(collect(keys(deps)), by=node_order)
@@ -590,6 +593,8 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
590593
timespan_start(ctx, :finish, thunk_id, (;thunk_id))
591594
finish_task!(ctx, state, node, thunk_failed)
592595
timespan_finish(ctx, :finish, thunk_id, (;thunk_id))
596+
597+
delete_unused_tasks!(state)
593598
end
594599

595600
safepoint(state)
@@ -931,6 +936,39 @@ function finish_task!(ctx, state, node, thunk_failed)
931936
evict_all_chunks!(ctx, to_evict)
932937
end
933938

939+
function delete_unused_tasks!(state)
940+
to_delete = Thunk[]
941+
for thunk in state.thunks_to_delete
942+
if task_unused(state, thunk)
943+
# Finished and nobody waiting on us, we can be deleted
944+
push!(to_delete, thunk)
945+
end
946+
end
947+
for thunk in to_delete
948+
# Delete all cached data
949+
task_delete!(state, thunk)
950+
951+
pop!(state.thunks_to_delete, thunk)
952+
end
953+
end
954+
function delete_unused_task!(state, thunk)
955+
if task_unused(state, thunk)
956+
# Will not be accessed further, delete all cached data
957+
task_delete!(state, thunk)
958+
return true
959+
else
960+
return false
961+
end
962+
end
963+
task_unused(state, thunk) =
964+
haskey(state.cache, thunk) && !haskey(state.waiting_data, thunk)
965+
function task_delete!(state, thunk)
966+
delete!(state.cache, thunk)
967+
delete!(state.errored, thunk)
968+
delete!(state.valid, thunk)
969+
delete!(state.thunk_dict, thunk.id)
970+
end
971+
934972
function evict_all_chunks!(ctx, to_evict)
935973
if !isempty(to_evict)
936974
@sync for w in map(p->p.pid, procs_to_use(ctx))

src/submission.jl

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
8989
thunk = Thunk(f, args...; options...)
9090

9191
# Create a `DRef` to `thunk` so that the caller can preserve it
92-
thunk_ref = poolset(thunk; size=64, device=MemPool.CPURAMDevice())
92+
thunk_ref = poolset(thunk; size=64, device=MemPool.CPURAMDevice(),
93+
destructor=UnrefThunkByUser(thunk))
9394
thunk_id = Sch.ThunkID(thunk.id, thunk_ref)
9495

9596
# Attach `thunk` within the scheduler
@@ -122,6 +123,29 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
122123
return thunk_id
123124
end
124125
end
126+
struct UnrefThunkByUser
127+
thunk::Thunk
128+
end
129+
function (unref::UnrefThunkByUser)()
130+
Sch.errormonitor_tracked("unref thunk $(unref.thunk.id)", Threads.@spawn begin
131+
# This thunk is no longer referenced by the user, mark it as ready to be
132+
# cleaned up as eagerly as possible (or do so now)
133+
thunk = unref.thunk
134+
state = Sch.EAGER_STATE[]
135+
if state === nothing
136+
return
137+
end
138+
139+
@lock state.lock begin
140+
if !Sch.delete_unused_task!(state, thunk)
141+
# Register for deletion upon thunk completion
142+
push!(state.thunks_to_delete, thunk)
143+
end
144+
# TODO: On success, walk down to children, as a fast-path
145+
end
146+
end)
147+
end
148+
125149

126150
# Local -> Remote
127151
function eager_submit!(ntasks, uid, future, finalizer_ref, f, args, options)

0 commit comments

Comments
 (0)