Commit 83fda26

update docs
1 parent 5cd08e2 commit 83fda26

4 files changed

+349
-7
lines changed

docs/make.jl

Lines changed: 2 additions & 0 deletions
@@ -16,6 +16,8 @@ makedocs(;
1616
),
1717
pages=[
1818
"Home" => "index.md",
19+
"User guide" => "dtable.md",
20+
"API" => "api.md",
1921
],
2022
)
2123

docs/src/api.md

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
1+
# API
2+
3+
Documentation for [DTables](https://github.com/juliaparallel/DTables.jl).
4+
5+
```@index
6+
```
7+
8+
```@autodocs
9+
Modules = [DTables]
10+
```

docs/src/dtable.md.txt

Lines changed: 322 additions & 0 deletions
@@ -0,0 +1,322 @@
1+
# User guide
2+
3+
## Creating a `DTable`
4+
5+
There are currently two ways of constructing a distributed table:
6+
7+
### Tables.jl source
8+
9+
Provide a `Tables.jl` compatible source, as well as a `chunksize`, which is the
10+
maximum number of rows of each partition:
11+
12+
```julia
13+
julia> using Dagger
14+
15+
julia> table = (a=[1, 2, 3, 4, 5], b=[6, 7, 8, 9, 10]);
16+
17+
julia> d = DTable(table, 2)
18+
DTable with 3 partitions
19+
Tabletype: NamedTuple
20+
21+
julia> fetch(d)
22+
(a = [1, 2, 3, 4, 5], b = [6, 7, 8, 9, 10])
23+
```
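
To make the role of `chunksize` concrete, here is a minimal sketch in plain Julia of how a five-row column table could be split into partitions of at most two rows. It only illustrates the idea and is not the `DTable` implementation; the names `row_ranges` and `partitions` are made up for this example.

```julia
# Plain-Julia illustration of row-wise partitioning (not the DTables internals).
table = (a=[1, 2, 3, 4, 5], b=[6, 7, 8, 9, 10])
chunksize = 2

nrows = length(table.a)

# Split the row indices into consecutive ranges of at most `chunksize` rows.
row_ranges = collect(Iterators.partition(1:nrows, chunksize))

# Build one small column table (a NamedTuple of vectors) per range.
partitions = [map(col -> col[r], table) for r in row_ranges]

println(length(partitions))  # 3, matching "DTable with 3 partitions"
println(partitions[end])     # the last partition holds the single leftover row
```

Five rows with a `chunksize` of 2 give partitions of 2, 2 and 1 rows, which is why the example above reports three partitions.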
24+
25+
### Loader function and file list
26+
27+
Provide a `loader_function` and a list of filenames, which are parts of the
28+
full table:
29+
30+
```julia
31+
julia> using Dagger, CSV
32+
33+
julia> files = ["1.csv", "2.csv", "3.csv"];
34+
35+
julia> d = DTable(CSV.File, files)
36+
DTable with 3 partitions
37+
Tabletype: unknown (use `tabletype!(::DTable)`)
38+
39+
julia> tabletype(d)
40+
NamedTuple
41+
42+
julia> fetch(d)
43+
(a = [1, 2, 1, 2, 1, 2], b = [6, 7, 6, 7, 6, 7])
44+
```
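
The loader-based construction can be pictured as "one partition per file". The sketch below mimics that in plain Julia with a hypothetical in-memory `loader` standing in for `CSV.File`; the `fake_files` dictionary and its contents are assumptions chosen to reproduce the output shown above, not part of the package.

```julia
# Hypothetical stand-in for a loader such as `CSV.File`: it maps a file name
# to a column table. A real loader would read the file from disk.
fake_files = Dict(
    "1.csv" => (a=[1, 2], b=[6, 7]),
    "2.csv" => (a=[1, 2], b=[6, 7]),
    "3.csv" => (a=[1, 2], b=[6, 7]),
)
loader(name) = fake_files[name]

files = ["1.csv", "2.csv", "3.csv"]

# One partition per file, each produced by calling the loader on the file name.
partitions = [loader(f) for f in files]

# Fetching corresponds to concatenating the partitions column by column.
merged = (
    a=reduce(vcat, [p.a for p in partitions]),
    b=reduce(vcat, [p.b for p in partitions]),
)

println(length(partitions))
println(merged)
```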
45+
46+
## Underlying table type
47+
48+
By default, each partition has the type constructed by
49+
`Tables.materializer(source)`:
50+
51+
```julia
52+
julia> table = (a=[1, 2, 3, 4, 5], b=[6, 7, 8, 9, 10]);
53+
54+
julia> d = DTable(table, 2)
55+
DTable with 3 partitions
56+
Tabletype: NamedTuple
57+
58+
julia> fetch(d)
59+
(a = [1, 2, 3, 4, 5], b = [6, 7, 8, 9, 10])
60+
```
61+
62+
To override the underlying type, pass the `tabletype` keyword argument to the
63+
`DTable` constructor. You can also choose which table type the `DTable` should
64+
be fetched into:
65+
66+
```julia
67+
julia> using DataFrames
68+
69+
julia> table = (a=[1, 2, 3, 4, 5], b=[6, 7, 8, 9, 10]);
70+
71+
julia> d = DTable(table, 2; tabletype=DataFrame)
72+
DTable with 3 partitions
73+
Tabletype: DataFrame
74+
75+
julia> fetch(d)
76+
5×2 DataFrame
77+
Row │ a b
78+
│ Int64 Int64
79+
─────┼──────────────
80+
1 │ 1 6
81+
2 │ 2 7
82+
3 │ 3 8
83+
4 │ 4 9
84+
5 │ 5 10
85+
86+
julia> fetch(d, NamedTuple)
87+
(a = [1, 2, 3, 4, 5], b = [6, 7, 8, 9, 10])
88+
```
89+
90+
# Table operations
91+
92+
**Warning: this interface is experimental and may change at any time**
93+
94+
The current set of available operations consists of three simple functions:
95+
`map`, `filter` and `reduce`.
96+
97+
Below is an example of their usage.
98+
99+
For more information please refer to the API documentation and unit tests.
100+
101+
```julia
102+
julia> using Dagger
103+
104+
julia> d = DTable((k = repeat(['a', 'b'], 500), v = repeat(1:10, 100)), 100)
105+
DTable with 10 partitions
106+
Tabletype: NamedTuple
107+
108+
julia> using DataFrames
109+
110+
julia> m = map(x -> (t = x.k + x.v, v = x.v), d)
111+
DTable with 10 partitions
112+
Tabletype: NamedTuple
113+
114+
julia> fetch(m, DataFrame)
115+
1000×2 DataFrame
116+
Row │ t v
117+
│ Char Int64
118+
──────┼─────────────
119+
1 │ b 1
120+
2 │ d 2
121+
3 │ d 3
122+
⋮ │ ⋮ ⋮
123+
999 │ j 9
124+
1000 │ l 10
125+
995 rows omitted
126+
127+
julia> f = filter(x -> x.t == 'd', m)
128+
DTable with 10 partitions
129+
Tabletype: NamedTuple
130+
131+
julia> fetch(f, DataFrame)
132+
200×2 DataFrame
133+
Row │ t v
134+
│ Char Int64
135+
─────┼─────────────
136+
1 │ d 2
137+
2 │ d 3
138+
⋮ │ ⋮ ⋮
139+
200 │ d 3
140+
197 rows omitted
141+
142+
julia> r = reduce(+, m, cols=[:v])
143+
EagerThunk (running)
144+
145+
julia> fetch(r)
146+
(v = 5500,)
147+
```
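
The numbers in this example can be checked with a plain-Julia sketch that applies each operation per partition and merges the results afterwards. This is only an illustration under the assumption that partitions are consecutive blocks of 100 rows; it does not use Dagger and says nothing about how the scheduler actually runs the work.

```julia
# Same data as above, stored as a vector of rows for simplicity.
k = repeat(['a', 'b'], 500)
v = repeat(1:10, 100)
rows = [(k=k[i], v=v[i]) for i in 1:1000]

# Ten partitions of 100 rows each.
partitions = collect(Iterators.partition(rows, 100))

# map: applied independently to every partition.
mapped = [map(x -> (t = x.k + x.v, v = x.v), p) for p in partitions]

# filter: also per partition; partitions may shrink but their count stays the same.
filtered = [filter(x -> x.t == 'd', p) for p in mapped]

# reduce: one partial result per partition, then a final merge of the partials.
partials = [sum(x.v for x in p) for p in mapped]
total = sum(partials)

println(length(partitions))     # 10
println(sum(length, filtered))  # 200
println(total)                  # 5500
```

The filter keeps rows where `k + v` equals `'d'`, that is `('a', 3)` and `('b', 2)`, which occur twice in every block of ten rows.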
148+
149+
# Dagger.groupby interface
150+
151+
Grouping a `DTable` produces a `GDTable`.
152+
The distinct values of one or more columns can serve as grouping keys.
153+
If the grouping key has to be computed from a row, you can instead
154+
provide a custom function that returns the key; it is applied per row.
155+
156+
Use `keys(gd::GDTable)` to obtain the set of keys the `GDTable` is grouped by.
157+
To get the part of the `GDTable` that holds the records for a single key,
158+
use `getindex(gd::GDTable, key)`, that is, `gd[key]`.
159+
160+
```julia
161+
julia> using Random; d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),b=repeat(1:4, 16)), 4)
162+
DTable with 16 partitions
163+
Tabletype: NamedTuple
164+
165+
julia> Dagger.groupby(d, :a)
166+
GDTable with 4 partitions and 4 keys
167+
Tabletype: NamedTuple
168+
Grouped by: [:a]
169+
170+
julia> Dagger.groupby(d, [:a, :b])
171+
GDTable with 16 partitions and 16 keys
172+
Tabletype: NamedTuple
173+
Grouped by: [:a, :b]
174+
175+
julia> Dagger.groupby(d, row -> row.a + row.b)
176+
GDTable with 7 partitions and 7 keys
177+
Tabletype: NamedTuple
178+
Grouped by: #5
179+
180+
julia> g = Dagger.groupby(d, :a); keys(g)
181+
KeySet for a Dict{Char, Vector{UInt64}} with 4 entries. Keys:
182+
'c'
183+
'd'
184+
'a'
185+
'b'
186+
187+
julia> g['c']
188+
DTable with 1 partitions
189+
Tabletype: NamedTuple
190+
```
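
The key counts reported above follow from the data alone, which a plain-Julia sketch can confirm. The helper `group_indices` below is hypothetical and written only for this illustration; it builds a dictionary from each key to the indices of the rows that produce it, similar in spirit to the `Dict{Char, Vector{UInt64}}` visible in the `keys(g)` output. The column `a` is left unshuffled so the result is reproducible.

```julia
# Same columns as above, without `shuffle` so the run is deterministic.
a = repeat('a':'d', inner=4, outer=4)
b = repeat(1:4, 16)
rows = [(a=a[i], b=b[i]) for i in eachindex(a)]

# Hypothetical helper: map every key returned by `keyfn` to the row indices
# that produce it.
function group_indices(keyfn, rows)
    index = Dict{Any,Vector{UInt}}()
    for (i, row) in enumerate(rows)
        push!(get!(index, keyfn(row), UInt[]), UInt(i))
    end
    return index
end

by_a = group_indices(r -> r.a, rows)             # single column
by_ab = group_indices(r -> (r.a, r.b), rows)     # two columns
by_sum = group_indices(r -> r.a + r.b, rows)     # custom key function

println(length(by_a))    # 4
println(length(by_ab))   # 16
println(length(by_sum))  # 7
```

The custom key `row.a + row.b` adds an integer to a `Char`, so the keys range from `'b'` (`'a' + 1`) to `'h'` (`'d' + 4`), which gives seven distinct values.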
191+
192+
## GDTable operations
193+
194+
Operations such as `map`, `filter` and `reduce` can be performed on a `GDTable`:
195+
196+
```julia
197+
julia> g = Dagger.groupby(d, [:a, :b])
198+
GDTable with 16 partitions and 16 keys
199+
Tabletype: NamedTuple
200+
Grouped by: [:a, :b]
201+
202+
julia> f = filter(x -> x.a != 'd', g)
203+
GDTable with 16 partitions and 16 keys
204+
Tabletype: NamedTuple
205+
Grouped by: [:a, :b]
206+
207+
julia> trim!(f)
208+
GDTable with 12 partitions and 12 keys
209+
Tabletype: NamedTuple
210+
Grouped by: [:a, :b]
211+
212+
julia> m = map(r -> (a = r.a, b = r.b, c = r.b .- 3), f)
213+
GDTable with 12 partitions and 12 keys
214+
Tabletype: NamedTuple
215+
Grouped by: [:a, :b]
216+
217+
julia> r = reduce(*, m)
218+
EagerThunk (running)
219+
220+
julia> DataFrame(fetch(r))
221+
12×5 DataFrame
222+
Row │ a b result_a result_b result_c
223+
│ Char Int64 String Int64 Int64
224+
─────┼───────────────────────────────────────────
225+
1 │ a 1 aaaa 1 16
226+
2 │ c 3 ccc 27 0
227+
3 │ a 3 aa 9 0
228+
4 │ b 4 bbbb 256 1
229+
5 │ c 4 cccc 256 1
230+
6 │ b 2 bbbb 16 1
231+
7 │ b 1 bbbb 1 16
232+
8 │ a 2 aaa 8 -1
233+
9 │ a 4 aaaaaaa 16384 1
234+
10 │ b 3 bbbb 81 0
235+
11 │ c 2 ccccc 32 -1
236+
12 │ c 1 cccc 1 16
237+
```
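
The `result_a` column above is a `String` because multiplying `Char`s in Julia concatenates them. The following plain-Julia sketch reproduces that effect with a per-group reduction on a tiny, made-up set of rows; it is an illustration of the idea, not the `GDTable` code path.

```julia
# Tiny made-up data set: two groups, keyed by the pair (a, b).
rows = [(a='a', b=1), (a='a', b=1), (a='b', b=2), (a='b', b=2), (a='b', b=2)]

# Collect the rows of every group.
groups = Dict{Tuple{Char,Int},Vector{eltype(rows)}}()
for row in rows
    push!(get!(groups, (row.a, row.b), eltype(rows)[]), row)
end

# Reduce every column of every group with `*`.
results = Dict(
    key => (
        result_a=reduce(*, [r.a for r in rs]),  # Char * Char gives a String
        result_b=reduce(*, [r.b for r in rs]),  # ordinary integer product
    ) for (key, rs) in groups
)

println(results[('a', 1)])
println(results[('b', 2)])
```

A group with three rows of `'b'` and `2` reduces to `"bbb"` and `8`, mirroring how the length of `result_a` in the table above reflects the number of rows in each group.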
238+
239+
## Iterating over a GDTable
240+
241+
Iterating over a `GDTable` yields pairs of a key
242+
and a `DTable` containing all rows associated with that grouping key.
243+
244+
```julia
245+
julia> d = DTable((a=repeat('a':'b', inner=2),b=1:4), 2)
246+
DTable with 2 partitions
247+
Tabletype: NamedTuple
248+
249+
julia> g = Dagger.groupby(d, :a)
250+
GDTable with 2 partitions and 2 keys
251+
Tabletype: NamedTuple
252+
Grouped by: [:a]
253+
254+
julia> for (key, dt) in g
255+
println("Key: $key")
256+
println(fetch(dt, DataFrame))
257+
end
258+
Key: a
259+
2×2 DataFrame
260+
Row │ a b
261+
│ Char Int64
262+
─────┼─────────────
263+
1 │ a 1
264+
2 │ a 2
265+
Key: b
266+
2×2 DataFrame
267+
Row │ a b
268+
│ Char Int64
269+
─────┼─────────────
270+
1 │ b 3
271+
2 │ b 4
272+
```
273+
274+
# Joins
275+
276+
There are two join methods available currently: `leftjoin` and `innerjoin`.
277+
The interface aims to be compatible with the `DataFrames.jl` join interface, but for now it only supports
278+
the `on` keyword argument with symbol input. More keyword arguments known from `DataFrames` may be introduced in the future.
279+
280+
It's possible to perform a join on a `DTable` and any `Tables.jl` compatible table type.
281+
Joining two `DTable`s is also supported; the join then takes advantage of the second `DTable` being partitioned.
282+
283+
You can make joins faster by providing additional information about the tables
284+
through the following keyword arguments:
285+
286+
- `l_sorted`: Indicates that the left table is sorted. Only useful if `r_sorted` is set to `true` as well.
287+
- `r_sorted`: Indicates that the right table is sorted.
288+
- `r_unique`: Indicates that the right table contains only unique keys.
289+
- `lookup`: Provides a dict-like structure that allows quicker matching of inner rows. Its keys must be `Tuple`s of the values in the matched columns, and its values must be of type `Vector{UInt}` and contain the related row indices.
290+
291+
Currently there is one special case: joining a `DTable` (with `DataFrame` as the underlying table type) with a `DataFrame` uses
292+
the join functions from the `DataFrames.jl` package for the per-chunk joins.
293+
In the future this behavior will be expanded to any type that implements its own join methods, but for now it is limited to `DataFrame` only.
294+
295+
Note that using any of the keyword arguments described above always selects the generic join methods
296+
defined in `Dagger`, even when specialized methods are available.
297+
298+
```julia
299+
julia> using Tables; pp = d -> for x in Tables.rows(d) println("$(x.a), $(x.b), $(x.c)") end;
300+
301+
julia> d1 = (a=collect(1:6), b=collect(1:6));
302+
303+
julia> d2 = (a=collect(2:5), c=collect(-2:-1:-5));
304+
305+
julia> dt = DTable(d1, 2)
306+
DTable with 3 partitions
307+
Tabletype: NamedTuple
308+
309+
julia> pp(leftjoin(dt, d2, on=:a))
310+
2, 2, -2
311+
1, 1, missing
312+
3, 3, -3
313+
4, 4, -4
314+
5, 5, -5
315+
6, 6, missing
316+
317+
julia> pp(innerjoin(dt, d2, on=:a))
318+
2, 2, -2
319+
3, 3, -3
320+
4, 4, -4
321+
5, 5, -5
322+
```
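
The shape required for the `lookup` keyword argument (keys as a `Tuple` of the matched column values, values as `Vector{UInt}` row indices) can be sketched in plain Julia for the right-hand table `d2` used above. This follows the textual description only; the variable name `lookup` and the idea of passing it as `lookup=lookup` to a join are assumptions to be checked against the API documentation.

```julia
# Right-hand table from the join example above.
d2 = (a=collect(2:5), c=collect(-2:-1:-5))

# Keys are tuples of the values in the `on` columns (here only `:a`);
# values are the indices of the rows of `d2` that carry that key.
lookup = Dict{Tuple{Int},Vector{UInt}}()
for (i, key) in enumerate(d2.a)
    push!(get!(lookup, (key,), UInt[]), UInt(i))
end

# Matching a left row with a == 3 now takes a single dictionary access.
matching_rows = get(lookup, (3,), UInt[])
println(d2.c[matching_rows])  # the `c` values joined to a == 3

# A key that is absent from `d2` yields no matches (a `missing` in a left join).
println(isempty(get(lookup, (1,), UInt[])))
```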

docs/src/index.md

Lines changed: 15 additions & 7 deletions
@@ -2,13 +2,21 @@
22
CurrentModule = DTables
33
```
44

5-
# DTables
5+
# Distributed table
66

7-
Documentation for [DTables](https://github.com/juliaparallel/DTables.jl).
7+
The `DTable`, or "distributed table", is an abstraction layer on top of Dagger
8+
that allows loading table-like structures into a distributed environment. The
9+
main idea is that a Tables.jl-compatible source provided by the user gets
10+
partitioned into several parts and stored as `Chunk`s. These can then be
11+
distributed across worker processes by the scheduler as operations are
12+
performed on the containing `DTable`.
813

9-
```@index
10-
```
14+
Operations performed on a `DTable` leverage the fact that the table is
15+
partitioned, and will try to apply functions per-partition first, afterwards
16+
merging the results if needed.
17+
18+
The distributed table is backed by Dagger's Eager API (`Dagger.@spawn` and
19+
`Dagger.spawn`). To provide a familiar usage pattern you can call `fetch` on a
20+
`DTable` instance, which returns an in-memory instance of the underlying table
21+
type (such as a `DataFrame` or `TypedTable`).
1122

12-
```@autodocs
13-
Modules = [DTables]
14-
```
