Commit 72d7afa

Add File and tofile helpers
1 parent 59fae18 commit 72d7afa

File tree

4 files changed: +197 -2 lines changed


docs/src/index.md

Lines changed: 85 additions & 2 deletions
@@ -110,7 +110,30 @@ or by specifying the `worker` argument to `@mutable`:
 A = Dagger.@mutable worker=2 rand(1000, 1000)
 ```

-### Parallel reduction
+### Operate on distributed data
+
+Often we want to work with more than one piece of data; the common case of
+wanting one piece of data per worker is easy to do by using `@shard`:
+
+```julia
+X = Dagger.@shard myid()
+```
+
+This will execute `myid()` independently on every worker in your Julia
+cluster, and place references to each within a `Shard` object called `X`. We
+can then use `X` in task spawning, but we'll only get the result of
+`myid()` that corresponds to the worker that the task is running on:
+
+```julia
+for w in workers()
+    @show fetch(Dagger.@spawn scope=Dagger.scope(worker=w) identity(X))
+end
+```
+
+The above should print the result of `myid()` for each worker in `workers()`, as
+`identity(X)` receives only the value of `X` specific to that worker.
+
+### Reducing over distributed data

 Reductions are often parallelized by reducing a set of partitions on each
 worker, and then reducing those intermediate reductions on a single worker.
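The two-stage reduction pattern described above can be sketched in plain Julia, without Dagger: each "worker" reduces its own partition, then the partial results are combined once. The partition count and data here are illustrative assumptions, not taken from the docs.

```julia
# A plain-Julia sketch of the two-stage reduction pattern: reduce each
# partition independently, then reduce the intermediate results once.
partitions = [rand(1:10, 100) for _ in 1:4]  # one partition per "worker"

# Stage 1: one independent reduction per partition (these can run in parallel)
partials = map(sum, partitions)

# Stage 2: reduce the intermediate results on a single worker
total = sum(partials)

@assert total == sum(reduce(vcat, partitions))
```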
@@ -131,14 +154,74 @@ points to a set of histogram bins on each worker. When we `@spawn hist!`,
 Dagger passes in the random array and bins for only the specific worker that
 the task is run on; i.e. a call to `hist!` that runs on worker 2 will get a
 different `A` and `temp_bins` from a call to `hist!` on worker 3. All of the
-calls to `hist!` may run in parallel
+calls to `hist!` may run in parallel.

 By using `map` on `temp_bins`, we then make a copy of each worker's bins that
 we can safely return back to our current worker, and sum them together to get
 our total histogram.

 -----

+## Quickstart: File IO
+
+Dagger has support for loading and saving files that integrates seamlessly with
+its task system, in the form of `Dagger.File` and `Dagger.tofile`.
+
+!!! warning
+    These functions are not yet fully tested, so please make sure to take backups of any files that you load with them.
+
+### Loading files from disk
+
+In order to load one or more files from disk, Dagger provides the `File`
+function, which creates a lazy reference to a file:
+
+```julia
+f = Dagger.File("myfile.jls")
+```
+
+`f` is now a lazy reference to `"myfile.jls"`, and its contents can be loaded
+automatically by just passing the object to a task:
+
+```julia
+wait(Dagger.@spawn println(f))
+# Prints the loaded contents of the file
+```
+
+By default, `File` assumes that the file uses Julia's Serialization format;
+this can be easily changed to assume Arrow format, for example:
+
+```julia
+using Arrow
+f = Dagger.File("myfile.arrow"; serialize=Arrow.write, deserialize=Arrow.Table)
+```
+
+### Writing data to disk
+
+Saving data to disk is as easy as loading it; `tofile` provides this capability
+in a similar manner to `File`:
+
+```julia
+A = rand(1000)
+f = Dagger.tofile(A, "mydata.jls")
+```
+
+Like `File`, `f` can still be used to reference the file's data in tasks. It is
+likely most useful to use `tofile` at the end of a task to save results:
+
+```julia
+function make_data()
+    A = rand(1000)
+    return Dagger.tofile(A, "mydata.jls")
+end
+fetch(Dagger.@spawn make_data())
+# Data was also written to "mydata.jls"
+```
+
+`tofile` takes the same keyword arguments as `File`, allowing the format of
+data on disk to be specified as desired.
+
+-----
+
 ## Quickstart: Distributed Arrays

 Dagger's `DArray` type represents a distributed array, where a single large
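The `serialize`/`deserialize` contract that the new `File` and `tofile` docs describe can be illustrated without Dagger: with `use_io=true` (the default), both callables take an `IO` as their first argument. This round trip is a minimal sketch using only the Serialization stdlib; the path is illustrative.

```julia
# Sketch of the use_io=true callable contract: serialize(io, data) on save,
# deserialize(io) on load. Uses only the Serialization stdlib.
using Serialization

path = tempname() * ".jls"
A = rand(1000)

open(io -> serialize(io, A), path, "w")  # roughly what `tofile` does at save time
B = open(deserialize, path)              # roughly what `File` does at load time

@assert A == B
rm(path)
```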

src/file-io.jl

Lines changed: 78 additions & 0 deletions
@@ -1,5 +1,83 @@
 export save, load
 using SparseArrays
+import MemPool: GenericFileDevice, FileRef
+
+struct File
+    path::String
+    chunk::Chunk
+end
+
+"""
+    File(path::AbstractString;
+         serialize::Base.Callable, deserialize::Base.Callable,
+         use_io::Bool, mmap::Bool) -> Dagger.File
+
+References data in the file at `path`, using the deserialization function `deserialize`.
+`use_io` specifies whether `deserialize` takes an `IO` (the default) or takes a
+`String` path. `mmap` is experimental, and specifies whether `deserialize` can
+use MemPool's `MMWrap` memory mapping functionality (default is `false`).
+
+The file at `path` is not yet loaded when the call to `File` returns; passing
+it to a task will cause reading to occur, and the result will be passed to the
+task.
+"""
+function File(path::AbstractString;
+              serialize::Base.Callable=serialize,
+              deserialize::Base.Callable=deserialize,
+              use_io::Bool=true,
+              mmap::Bool=false)
+    if !isfile(path)
+        # FIXME: Once MemPool better propagates errors, this check won't be necessary
+        throw(ArgumentError("`Dagger.File` expected file to exist at \"$path\""))
+    end
+    if isabspath(path)
+        dir, file = dirname(path), basename(path)
+    else
+        dir, file = pwd(), basename(path)
+    end
+    Tdevice = GenericFileDevice{serialize, deserialize, use_io, mmap}
+    device = Tdevice(dir)
+    leaf_tag = MemPool.Tag(Tdevice=>file)
+    chunk = Dagger.tochunk(FileRef(path), OSProc(), ProcessScope();
+                           restore=true, retain=true, device, leaf_tag)
+    return File(path, chunk)
+end
+
+"""
+    tofile(data, path::AbstractString;
+           serialize::Base.Callable, deserialize::Base.Callable,
+           use_io::Bool, mmap::Bool) -> Dagger.File
+
+Writes `data` to disk at `path`, using the serialization function `serialize`.
+`use_io` specifies whether `serialize` takes an `IO` (the default) or takes a
+`String` path. `mmap` is experimental, and specifies whether `serialize` can
+use MemPool's `MMWrap` memory mapping functionality (default is `false`).
+
+The returned `File` object can be passed to tasks as an argument, or returned
+from tasks as a result.
+"""
+function tofile(@nospecialize(data), path::AbstractString;
+                serialize::Base.Callable=serialize,
+                deserialize::Base.Callable=deserialize,
+                use_io::Bool=true,
+                mmap::Bool=false)
+    if isabspath(path)
+        dir, file = dirname(path), basename(path)
+    else
+        dir, file = pwd(), basename(path)
+    end
+    Tdevice = GenericFileDevice{serialize, deserialize, use_io, mmap}
+    device = Tdevice(dir)
+    chunk = Dagger.tochunk(data, OSProc(), ProcessScope();
+                           retain=true, device, tag=file)
+    return File(path, chunk)
+end
+Base.fetch(file::File) = fetch(file.chunk)
+Base.show(io::IO, file::File) = print(io, "Dagger.File(path=\"$(file.path)\")")
+
+function move(from_proc::Processor, to_proc::Processor, file::File)
+    return move(from_proc, to_proc, file.chunk)
+end

 """
     FileReader
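The `File` struct added above pairs a path with a lazily-loaded chunk. The core idea can be sketched without MemPool or Dagger: record a path plus a deserializer, and only touch disk on `fetch`. `LazyFile` is an illustrative name for this sketch, not part of Dagger's API.

```julia
# Minimal lazy-file-reference sketch: construction does no IO; `fetch`
# opens the file and runs the stored deserializer.
using Serialization

struct LazyFile
    path::String
    deserialize::Base.Callable
end
# Default to the Serialization stdlib, mirroring `Dagger.File`'s default
LazyFile(path::AbstractString) = LazyFile(String(path), deserialize)

Base.fetch(f::LazyFile) = open(f.deserialize, f.path)

path = tempname()
serialize(path, [1, 2, 3])
f = LazyFile(path)            # no IO yet; loading happens on fetch
@assert fetch(f) == [1, 2, 3]
rm(path)
```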

test/file-io.jl

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+using Serialization
+
+@testset "File IO" begin
+    @testset "File" begin
+        data = [1,2,3]
+        data_path = joinpath(tempdir(), "jl_" * join(rand('a':'z', 8)) * ".jls")
+        atexit() do
+            @assert isfile(data_path)
+            rm(data_path)
+        end
+        serialize(data_path, data)
+
+        data_c = Dagger.File(data_path)::Dagger.File
+        @test fetch(data_c) == data
+        @test fetch(Dagger.@spawn identity(data_c)) == data
+
+        @test isfile(data_path)
+    end
+    @testset "tofile" begin
+        data = [4,5,6]
+        data_path = joinpath(tempdir(), "jl_" * join(rand('a':'z', 8)) * ".jls")
+        atexit() do
+            @assert isfile(data_path)
+            rm(data_path)
+        end
+
+        data_c = Dagger.tofile(data, data_path)::Dagger.File
+        @test fetch(data_c) == data
+        @test fetch(Dagger.@spawn identity(data_c)) == data
+
+        @test isfile(data_path)
+    end
+end
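The testsets above build unique temporary paths by hand; `Base.tempname` provides the same uniqueness guarantee. A sketch of the serialize-then-reload round trip the "File" testset performs, using only the Serialization stdlib (no Dagger):

```julia
# Round trip mirroring the "File" testset: write data with the
# Serialization stdlib, confirm the file exists, and read it back.
using Serialization

data = [1, 2, 3]
data_path = tempname() * ".jls"
serialize(data_path, data)

@assert isfile(data_path)
@assert deserialize(data_path) == data
rm(data_path)
```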

test/runtests.jl

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ include("domain.jl")
 include("array.jl")
 include("cache.jl")
 include("diskcaching.jl")
+include("file-io.jl")

 try # TODO: Fault tolerance is sometimes unreliable
     #include("fault-tolerance.jl")
