Commit 5bbb9d1

Merge pull request #419 from JuliaParallel/jps/storage-tags
Add File and tofile helpers
2 parents f64df52 + 72d7afa, commit 5bbb9d1

File tree

6 files changed: +225 −8 lines changed

Project.toml

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
 ContextVariablesX = "0.1"
 DataStructures = "0.18"
 MacroTools = "0.5"
-MemPool = "0.4.3"
+MemPool = "0.4.4"
 Requires = "1"
 StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33, 0.34"
 TimespanLogging = "0.1"

docs/src/index.md

Lines changed: 85 additions & 2 deletions

@@ -110,7 +110,30 @@ or by specifying the `worker` argument to `@mutable`:
 A = Dagger.@mutable worker=2 rand(1000, 1000)
 ```
 
-### Parallel reduction
+### Operate on distributed data
+
+Often we want to work with more than one piece of data; the common case of
+wanting one piece of data per worker is easy to do by using `@shard`:
+
+```julia
+X = Dagger.@shard myid()
+```
+
+This will execute `myid()` independently on every worker in your Julia
+cluster, and place references to each within a `Shard` object called `X`. We
+can then use `X` in task spawning, but we'll only get the result of
+`myid()` that corresponds to the worker that the task is running on:
+
+```julia
+for w in workers()
+    @show fetch(Dagger.@spawn scope=Dagger.scope(worker=w) identity(X))
+end
+```
+
+The above should print the result of `myid()` for each worker in `workers()`, as
+`identity(X)` receives only the value of `X` specific to that worker.
+
+### Reducing over distributed data
 
 Reductions are often parallelized by reducing a set of partitions on each
 worker, and then reducing those intermediate reductions on a single worker.
@@ -131,14 +154,74 @@ points to a set of histogram bins on each worker. When we `@spawn hist!`,
 Dagger passes in the random array and bins for only the specific worker that
 the task is run on; i.e. a call to `hist!` that runs on worker 2 will get a
 different `A` and `temp_bins` from a call to `hist!` on worker 3. All of the
-calls to `hist!` may run in parallel
+calls to `hist!` may run in parallel.
 
 By using `map` on `temp_bins`, we then make a copy of each worker's bins that
 we can safely return back to our current worker, and sum them together to get
 our total histogram.
 
 -----
 
+## Quickstart: File IO
+
+Dagger has support for loading and saving files that integrates seamlessly with
+its task system, in the form of `Dagger.File` and `Dagger.tofile`.
+
+!!! warning
+    These functions are not yet fully tested, so please make sure to take
+    backups of any files that you load with them.
+
+### Loading files from disk
+
+In order to load one or more files from disk, Dagger provides the `File`
+function, which creates a lazy reference to a file:
+
+```julia
+f = Dagger.File("myfile.jls")
+```
+
+`f` is now a lazy reference to `"myfile.jls"`, and its contents can be loaded
+automatically by just passing the object to a task:
+
+```julia
+wait(Dagger.@spawn println(f))
+# Prints the loaded contents of the file
+```
+
+By default, `File` assumes that the file uses Julia's Serialization format;
+this can be easily changed to assume Arrow format, for example:
+
+```julia
+using Arrow
+f = Dagger.File("myfile.arrow"; serialize=Arrow.write, deserialize=Arrow.Table)
+```
+
+### Writing data to disk
+
+Saving data to disk is as easy as loading it; `tofile` provides this capability
+in a similar manner to `File`:
+
+```julia
+A = rand(1000)
+f = Dagger.tofile(A, "mydata.jls")
+```
+
+Like `File`, `f` can still be used to reference the file's data in tasks. It is
+likely most useful to use `tofile` at the end of a task to save results:
+
+```julia
+function make_data()
+    A = rand(1000)
+    return Dagger.tofile(A, "mydata.jls")
+end
+fetch(Dagger.@spawn make_data())
+# Data was also written to "mydata.jls"
+```
+
+`tofile` takes the same keyword arguments as `File`, allowing the format of
+data on disk to be specified as desired.
+
+-----
+
 ## Quickstart: Distributed Arrays
 
 Dagger's `DArray` type represents a distributed array, where a single large
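The docs above note that `File` and `tofile` default to Julia's Serialization format. A minimal stdlib-only sketch of that underlying round trip (no Dagger involved; `path` and `A` are illustrative names):

```julia
# Sketch of the Serialization round trip that `Dagger.File`/`Dagger.tofile`
# default to, using only the Serialization stdlib.
using Serialization

path = joinpath(mktempdir(), "mydata.jls")
A = collect(1:1000)

# `tofile`-style write: the serializer takes an IO (the `use_io=true` default)
open(io -> serialize(io, A), path, "w")

# `File`-style read, done eagerly here rather than lazily in a task
B = open(deserialize, path, "r")
@assert B == A
```

This is what `Dagger.File(path)` defers until a task actually consumes the file.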

src/file-io.jl

Lines changed: 78 additions & 0 deletions

@@ -1,5 +1,83 @@
 export save, load
 using SparseArrays
+import MemPool: GenericFileDevice, FileRef
+
+struct File
+    path::String
+    chunk::Chunk
+end
+
+"""
+    File(path::AbstractString;
+         serialize::Base.Callable, deserialize::Base.Callable,
+         use_io::Bool, mmap::Bool) -> Dagger.File
+
+References data in the file at `path`, using the deserialization function `deserialize`.
+`use_io` specifies whether `deserialize` takes an `IO` (the default) or takes a
+`String` path. `mmap` is experimental, and specifies whether `deserialize` can
+use MemPool's `MMWrap` memory mapping functionality (default is `false`).
+
+The file at `path` is not yet loaded when the call to `File` returns; passing
+it to a task will cause reading to occur, and the result will be passed to the
+task.
+"""
+function File(path::AbstractString;
+              serialize::Base.Callable=serialize,
+              deserialize::Base.Callable=deserialize,
+              use_io::Bool=true,
+              mmap::Bool=false)
+    if !isfile(path)
+        # FIXME: Once MemPool better propagates errors, this check won't be necessary
+        throw(ArgumentError("`Dagger.File` expected file to exist at \"$path\""))
+    end
+    if isabspath(path)
+        dir, file = dirname(path), basename(path)
+    else
+        dir, file = pwd(), basename(path)
+    end
+    Tdevice = GenericFileDevice{serialize, deserialize, use_io, mmap}
+    device = Tdevice(dir)
+    leaf_tag = MemPool.Tag(Tdevice=>file)
+    chunk = Dagger.tochunk(FileRef(path), OSProc(), ProcessScope();
+                           restore=true, retain=true, device, leaf_tag)
+    return File(path, chunk)
+end
+
+"""
+    tofile(data, path::AbstractString;
+           serialize::Base.Callable, deserialize::Base.Callable,
+           use_io::Bool, mmap::Bool) -> Dagger.File
+
+Writes `data` to disk at `path`, using the serialization function `serialize`.
+`use_io` specifies whether `serialize` takes an `IO` (the default) or takes a
+`String` path. `mmap` is experimental, and specifies whether `serialize` can
+use MemPool's `MMWrap` memory mapping functionality (default is `false`).
+
+The returned `File` object can be passed to tasks as an argument, or returned
+from tasks as a result.
+"""
+function tofile(@nospecialize(data), path::AbstractString;
+                serialize::Base.Callable=serialize,
+                deserialize::Base.Callable=deserialize,
+                use_io::Bool=true,
+                mmap::Bool=false)
+    if isabspath(path)
+        dir, file = dirname(path), basename(path)
+    else
+        dir, file = pwd(), basename(path)
+    end
+    Tdevice = GenericFileDevice{serialize, deserialize, use_io, mmap}
+    device = Tdevice(dir)
+    chunk = Dagger.tochunk(data, OSProc(), ProcessScope();
+                           retain=true, device, tag=file)
+    return File(path, chunk)
+end
+Base.fetch(file::File) = fetch(file.chunk)
+Base.show(io::IO, file::File) = print(io, "Dagger.File(path=\"$(file.path)\")")
+
+function move(from_proc::Processor, to_proc::Processor, file::File)
+    return move(from_proc, to_proc, file.chunk)
+end
 
 """
     FileReader
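The `serialize`/`deserialize` keywords above accept any callable pair that agrees on a format. A hedged stdlib-only sketch of such a pair using `DelimitedFiles` (the `Dagger.File` call at the end is commented out and assumes the API added in this commit):

```julia
# A (de)serializer pair of the shape `File`/`tofile` expect: both take an IO
# (matching the `use_io=true` default). DelimitedFiles is a stdlib.
using DelimitedFiles

path = joinpath(mktempdir(), "matrix.csv")
M = [1 2; 3 4]

open(io -> writedlm(io, M, ','), path, "w")   # the role of `serialize`
M2 = open(io -> readdlm(io, ',', Int), path)  # the role of `deserialize`
@assert M2 == M

# Illustrative only, assuming this commit's API:
# f = Dagger.File(path; serialize=(io, x)->writedlm(io, x, ','),
#                       deserialize=io->readdlm(io, ',', Int))
```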

src/sch/Sch.jl

Lines changed: 27 additions & 5 deletions

@@ -219,6 +219,16 @@ error will be displayed.
   when constructing `Chunk`s (such as when constructing the return value). The
   device must support `MemPool.CPURAMResource`. When `nothing`, uses
   `MemPool.GLOBAL_DEVICE[]`.
+- `storage_root_tag::Any=nothing`: If not `nothing`,
+  specifies the MemPool storage root tag to associate with the thunk's result.
+  This tag can be used by MemPool's storage devices to manipulate their behavior,
+  such as the file name used to store data on disk.
+- `storage_leaf_tag::Union{MemPool.Tag,Nothing}=nothing`: If not `nothing`,
+  specifies the MemPool storage leaf tag to associate with the thunk's result.
+  This tag can be used by MemPool's storage devices to manipulate their behavior,
+  such as the file name used to store data on disk.
+- `storage_retain::Bool=false`: The value of `retain` to pass to
+  `MemPool.poolset` when constructing the result `Chunk`.
 """
 Base.@kwdef struct ThunkOptions
     single::Union{Int,Nothing} = nothing
@@ -230,6 +240,9 @@ Base.@kwdef struct ThunkOptions
     checkpoint = nothing
     restore = nothing
     storage::Union{Chunk,Nothing} = nothing
+    storage_root_tag = nothing
+    storage_leaf_tag::Union{MemPool.Tag,Nothing} = nothing
+    storage_retain::Bool = false
 end
 
 """
@@ -249,7 +262,10 @@ function Base.merge(sopts::SchedulerOptions, topts::ThunkOptions)
                  allow_errors,
                  topts.checkpoint,
                  topts.restore,
-                 topts.storage)
+                 topts.storage,
+                 topts.storage_root_tag,
+                 topts.storage_leaf_tag,
+                 topts.storage_retain)
 end
 Base.merge(sopts::SchedulerOptions, ::Nothing) =
     ThunkOptions(sopts.single,
@@ -283,6 +299,9 @@ function populate_defaults(opts::ThunkOptions, Tf, Targs)
         maybe_default(:checkpoint),
         maybe_default(:restore),
         maybe_default(:storage),
+        maybe_default(:storage_root_tag),
+        maybe_default(:storage_leaf_tag),
+        maybe_default(:storage_retain),
     )
 end
 
@@ -530,9 +549,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
     node = unwrap_weak_checked(state.thunk_dict[thunk_id])
     if metadata !== nothing
         state.worker_time_pressure[pid][proc] = metadata.time_pressure
-        to_storage = node.options.storage
-        state.worker_storage_pressure[pid][to_storage] = metadata.storage_pressure
-        state.worker_storage_capacity[pid][to_storage] = metadata.storage_capacity
+        #to_storage = fetch(node.options.storage)
+        #state.worker_storage_pressure[pid][to_storage] = metadata.storage_pressure
+        #state.worker_storage_capacity[pid][to_storage] = metadata.storage_capacity
         state.worker_loadavg[pid] = metadata.loadavg
         sig = signature(node, state)
         state.signature_time_cost[sig] = (metadata.threadtime + get(state.signature_time_cost, sig, 0)) ÷ 2
@@ -1546,7 +1565,10 @@ function do_task(to_proc, task_desc)
 
     # Construct result
     # TODO: We should cache this locally
-    send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache)
+    send_result || meta ? res : tochunk(res, to_proc; device, persist, cache=persist ? true : cache,
+                                        tag=options.storage_root_tag,
+                                        leaf_tag=something(options.storage_leaf_tag, MemPool.Tag()),
+                                        retain=options.storage_retain)
     catch ex
         bt = catch_backtrace()
         RemoteException(myid(), CapturedException(ex, bt))
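The new options lean on two stdlib mechanisms: `Base.@kwdef` supplies the `nothing`/`false` defaults in `ThunkOptions`, and `something` substitutes a fresh tag when the option is unset, as in `leaf_tag=something(options.storage_leaf_tag, MemPool.Tag())`. A stdlib-only sketch with a hypothetical `StorageOpts` stand-in (a `Symbol` replaces `MemPool.Tag` here):

```julia
# Hypothetical mirror of the new ThunkOptions fields; `Symbol` stands in for
# `MemPool.Tag`, which is not available outside MemPool.
Base.@kwdef struct StorageOpts
    storage_root_tag::Any = nothing
    storage_leaf_tag::Union{Symbol,Nothing} = nothing
    storage_retain::Bool = false
end

opts = StorageOpts()                 # all defaults
@assert opts.storage_retain == false

default_tag = :fresh_tag             # stands in for `MemPool.Tag()`
# `something` returns its first non-`nothing` argument:
@assert something(opts.storage_leaf_tag, default_tag) == :fresh_tag
@assert something(StorageOpts(storage_leaf_tag=:user).storage_leaf_tag,
                  default_tag) == :user
```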

test/file-io.jl

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+using Serialization
+
+@testset "File IO" begin
+    @testset "File" begin
+        data = [1,2,3]
+        data_path = joinpath(tempdir(), "jl_" * join(rand('a':'z', 8)) * ".jls")
+        atexit() do
+            @assert isfile(data_path)
+            rm(data_path)
+        end
+        serialize(data_path, data)
+
+        data_c = Dagger.File(data_path)::Dagger.File
+        @test fetch(data_c) == data
+        @test fetch(Dagger.@spawn identity(data_c)) == data
+
+        @test isfile(data_path)
+    end
+    @testset "tofile" begin
+        data = [4,5,6]
+        data_path = joinpath(tempdir(), "jl_" * join(rand('a':'z', 8)) * ".jls")
+        atexit() do
+            @assert isfile(data_path)
+            rm(data_path)
+        end
+
+        data_c = Dagger.tofile(data, data_path)::Dagger.File
+        @test fetch(data_c) == data
+        @test fetch(Dagger.@spawn identity(data_c)) == data
+
+        @test isfile(data_path)
+    end
+end
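The tests build a unique temporary path by hand from `tempdir()` plus eight random letters. A quick stdlib-only look at that pattern next to `tempname()`, which is the usual shorthand for a fresh, not-yet-existing path:

```julia
# The test's hand-rolled unique path vs. the stdlib `tempname()` equivalent.
path_manual = joinpath(tempdir(), "jl_" * join(rand('a':'z', 8)) * ".jls")
path_auto   = tempname() * ".jls"

# Neither path exists yet; the tests rely on this before calling `serialize`
# or `Dagger.tofile` to create the file.
@assert !isfile(path_manual)
@assert !isfile(path_auto)
@assert endswith(path_manual, ".jls") && endswith(path_auto, ".jls")
```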

test/runtests.jl

Lines changed: 1 addition & 0 deletions

@@ -38,6 +38,7 @@ include("domain.jl")
 include("array.jl")
 include("cache.jl")
 include("diskcaching.jl")
+include("file-io.jl")
 
 try # TODO: Fault tolerance is sometimes unreliable
     #include("fault-tolerance.jl")
