Skip to content

Commit a64b7ff

Browse files
authored
Merge pull request #361 from JuliaParallel/kr/enable_caching
`enable_disk_caching` function for handling MemPool storage setup
2 parents eee71ec + bc9808a commit a64b7ff

File tree

8 files changed

+144
-16
lines changed

8 files changed

+144
-16
lines changed

.buildkite/pipeline.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ steps:
7676
version: "1.7"
7777
- JuliaCI/julia-coverage#v1:
7878
codecov: true
79-
command: "julia --project -e 'using Pkg; Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.test(\"TimespanLogging\")'"
79+
command: "julia --project -e 'using Pkg; Pkg.instantiate(); Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.test(\"TimespanLogging\")'"
8080
- label: Julia 1.7 - DaggerWebDash
8181
timeout_in_minutes: 60
8282
<<: *test

Manifest.toml

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ uuid = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4"
1717
version = "1.16.0"
1818

1919
[[deps.ChangesOfVariables]]
20-
deps = ["LinearAlgebra", "Test"]
21-
git-tree-sha1 = "f84967c4497e0e1955f9a582c232b02847c5f589"
20+
deps = ["InverseFunctions", "LinearAlgebra", "Test"]
21+
git-tree-sha1 = "2fba81a302a7be671aefe194f0525ef231104e7f"
2222
uuid = "9e997f8a-9a97-42d5-a9f1-ce6bfc15e2c0"
23-
version = "0.1.7"
23+
version = "0.1.8"
2424

2525
[[deps.ColorTypes]]
2626
deps = ["FixedPointNumbers", "Random"]
@@ -36,9 +36,9 @@ version = "0.12.10"
3636

3737
[[deps.Compat]]
3838
deps = ["Dates", "LinearAlgebra", "UUIDs"]
39-
git-tree-sha1 = "7a60c856b9fa189eb34f5f8a6f6b5529b7942957"
39+
git-tree-sha1 = "4e88377ae7ebeaf29a047aa1ee40826e0b708a5d"
4040
uuid = "34da2185-b29b-5c13-b0c7-acf172513d20"
41-
version = "4.6.1"
41+
version = "4.7.0"
4242

4343
[[deps.CompilerSupportLibraries_jll]]
4444
deps = ["Artifacts", "Libdl"]
@@ -57,9 +57,9 @@ version = "1.15.0"
5757

5858
[[deps.DataStructures]]
5959
deps = ["Compat", "InteractiveUtils", "OrderedCollections"]
60-
git-tree-sha1 = "d1fff3a548102f48987a52a2e0d114fa97d730f0"
60+
git-tree-sha1 = "cf25ccb972fec4e4817764d01c82386ae94f77b4"
6161
uuid = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
62-
version = "0.18.13"
62+
version = "0.18.14"
6363

6464
[[deps.Dates]]
6565
deps = ["Printf"]
@@ -87,9 +87,9 @@ uuid = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
8787

8888
[[deps.InverseFunctions]]
8989
deps = ["Test"]
90-
git-tree-sha1 = "6667aadd1cdee2c6cd068128b3d226ebc4fb0c67"
90+
git-tree-sha1 = "eabe3125edba5c9c10b60a160b1779a000dc8b29"
9191
uuid = "3587e190-3f89-42d0-90ee-14403ec27112"
92-
version = "0.1.9"
92+
version = "0.1.11"
9393

9494
[[deps.IrrationalConstants]]
9595
git-tree-sha1 = "630b497eafcc20001bba38a4651b327dcfc491d2"
@@ -128,7 +128,9 @@ uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"
128128

129129
[[deps.MemPool]]
130130
deps = ["DataStructures", "Distributed", "Mmap", "Random", "Serialization", "Sockets"]
131-
git-tree-sha1 = "303f55cede4947d0acb325ac0bd7595fbcad6b99"
131+
git-tree-sha1 = "8862fe03a7edf2faa9cf4d0584d453a05377bf9c"
132+
repo-rev = "master"
133+
repo-url = "https://github.com/JuliaData/MemPool.jl.git"
132134
uuid = "f9f48841-c794-520a-933b-121f7ba6ed94"
133135
version = "0.4.2"
134136

@@ -149,9 +151,9 @@ deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"]
149151
uuid = "4536629a-c528-5b80-bd46-f80d51c5b363"
150152

151153
[[deps.OrderedCollections]]
152-
git-tree-sha1 = "d321bf2de576bf25ec4d3e4360faca399afca282"
154+
git-tree-sha1 = "2e73fe17cac3c62ad1aebe70d44c963c3cfdc3e3"
153155
uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
154-
version = "1.6.0"
156+
version = "1.6.2"
155157

156158
[[deps.Printf]]
157159
deps = ["Unicode"]
@@ -191,9 +193,9 @@ uuid = "6462fe0b-24de-5631-8697-dd941f90decc"
191193

192194
[[deps.SortingAlgorithms]]
193195
deps = ["DataStructures"]
194-
git-tree-sha1 = "a4ada03f999bd01b3a25dcaa30b2d929fe537e00"
196+
git-tree-sha1 = "c60ec5c62180f27efea3ba2908480f8055e17cee"
195197
uuid = "a2af1166-a08f-5f64-846c-94a0d3cef48c"
196-
version = "1.1.0"
198+
version = "1.1.1"
197199

198200
[[deps.SparseArrays]]
199201
deps = ["LinearAlgebra", "Random"]

Project.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ julia = "1.7"
3434

3535
[extras]
3636
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
37+
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
3738

3839
[targets]
39-
test = ["Test"]
40+
test = ["Test", "Pkg"]

src/Dagger.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ include("compute.jl")
3636
include("utils/clock.jl")
3737
include("utils/system_uuid.jl")
3838
include("utils/locked-object.jl")
39+
include("utils/caching.jl")
3940
include("sch/Sch.jl"); using .Sch
4041

4142
# Array computations

src/utils/caching.jl

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""
2+
mem_limit(total::UInt, percentage_limit::Int, nprocs::Int)
3+
4+
Returns the per process mem limit in MiB based on the provided `total` memory,
5+
`percentage_limit` (1-100) and the number of processes in the current setup.
6+
"""
7+
function mem_limit(total::UInt, percentage_limit::Int, nprocs::Int)
8+
return total * percentage_limit / 100 ÷ nprocs
9+
end
10+
11+
"""
12+
enable_disk_caching!(ram_percentage_limit::Int=30, disk_limit_mb::Int=16*2^10, processes::Vector{Int}=procs())
13+
14+
Sets up disk caching on all processes available in the environment according to the provided
15+
limits. The user should provide the percentage, which will decide what's the memory limit on
16+
each participating machine (differentiated by hostname). The disk limit is set strictly per
17+
process and doesn't include any hostname related logic.
18+
"""
19+
function enable_disk_caching!(
20+
ram_percentage_limit::Int=30,
21+
disk_limit_mb::Int=16*2^10,
22+
processes::Vector{Int}=procs(),
23+
)
24+
if !(0 < ram_percentage_limit <= 100)
25+
throw(ArgumentError("Ram limit values must be in (1, 100] range"))
26+
end
27+
28+
total_mem = @static VERSION >= v"1.8-" ? Sys.total_physical_memory : Sys.total_memory
29+
process_info = [
30+
id => remotecall(id, total_mem) do total_mem
31+
return (; total_memory=total_mem(), hostname=gethostname())
32+
end for id in processes
33+
]
34+
35+
machines = Dict{NamedTuple,Vector{Int}}()
36+
for (id, info) in process_info
37+
key = fetch(info)
38+
if key isa NamedTuple
39+
machines[key] = push!(get(machines, key, Int[]), id)
40+
else
41+
@error("Error querying hardware information on process id = $id", ex = key)
42+
end
43+
end
44+
45+
mem_limits = Dict{Int,Int}()
46+
for (info, ids) in machines
47+
for id in ids
48+
mem_limits[id] = mem_limit(info.total_memory, ram_percentage_limit, length(ids))
49+
end
50+
end
51+
52+
return enable_disk_caching!(mem_limits, disk_limit_mb, processes)
53+
end
54+
55+
56+
"""
57+
enable_disk_caching!(mem_limits::Dict{Int,Int}, disk_limit_mb::Int=16*2^10, processes::Vector{Int}=procs())
58+
59+
Sets up disk caching on participating processes.
60+
This is a low level method for applying the `mem_limits` directly onto the processes.
61+
This skips the process discovery stage and the limit calculation.
62+
"""
63+
function enable_disk_caching!(
64+
mem_limits::Dict{Int,Int},
65+
disk_limit_mb::Int=16*2^10,
66+
processes::Vector{Int}=procs(),
67+
)
68+
results = [
69+
remotecall(id) do
70+
!isdefined(Main, :Dagger) && Main.eval(:(using Dagger))
71+
Dagger.MemPool.setup_global_device!(
72+
Dagger.MemPool.DiskCacheConfig(;
73+
toggle=true, membound=mem_limits[id], diskbound=disk_limit_mb * 2^20
74+
),
75+
)
76+
nothing
77+
end for id in processes
78+
]
79+
any_error = false
80+
for (i, id) in enumerate(processes)
81+
r = fetch(results[i])
82+
any_error |= r !== nothing
83+
if r !== nothing
84+
@error("Error setting up disk caching on process id = $id", ex = r)
85+
end
86+
end
87+
88+
return if any_error
89+
@error("Disk cache setup failed")
90+
false
91+
else
92+
true
93+
end
94+
end

test/cache_setup_test.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Distributed
2+
@everywhere using Pkg
3+
@everywhere Pkg.activate("..")
4+
using Dagger
5+
@assert Dagger.enable_disk_caching!()
6+
total_mem = @static VERSION >= v"1.8-" ? Sys.total_physical_memory : Sys.total_memory
7+
expected_mem_limit = Dagger.mem_limit(total_mem(), 30, length(procs()))
8+
applied_mem_limits = fetch.(remotecall.(() -> Dagger.MemPool.GLOBAL_DEVICE[].mem_limit, procs()))
9+
@assert all(applied_mem_limits .== expected_mem_limit)

test/diskcaching.jl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
@testset "Disk caching setup on multiple processes (single machine)" begin
2+
runs = []
3+
for p in 0:3
4+
j = if p == 0
5+
Cmd(`julia --startup-file=no`)
6+
else
7+
Cmd(`julia --startup-file=no -p $p`)
8+
end
9+
withenv("JULIA_MEMPOOL_EXPERIMENTAL_FANCY_ALLOCATOR"=>nothing,
10+
"JULIA_MEMPOOL_EXPERIMENTAL_MEMORY_BOUND"=>nothing,
11+
"JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE"=>nothing,
12+
"JULIA_MEMPOOL_EXPERIMENTAL_DISK_BOUND"=>nothing,
13+
"JULIA_MEMPOOL_EXPERIMENTAL_ALLOCATOR_KIND"=>nothing) do
14+
push!(runs, run(`$j cache_setup_test.jl`; wait=true))
15+
end
16+
end
17+
wait.(runs)
18+
@test all(getproperty.(runs, :exitcode) .== 0)
19+
end

test/runtests.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ include("task-queues.jl")
3737
include("domain.jl")
3838
include("array.jl")
3939
include("cache.jl")
40+
include("diskcaching.jl")
41+
4042
try # TODO: Fault tolerance is sometimes unreliable
4143
#include("fault-tolerance.jl")
4244
catch

0 commit comments

Comments
 (0)