1
- import Base:
2
- collect, eltype, fetch, getproperty, isready, iterate, length, names, propertynames, show, wait
3
- import SentinelArrays
4
- import TableOperations
5
- import Tables
6
-
7
- export DTable, tabletype, tabletype!, trim, trim!, leftjoin, innerjoin, DTableColumn
8
-
9
1
# Container type used for a `DTable`'s `chunks` field: every element is either
# a `Dagger.Chunk` or a `Dagger.EagerThunk`.
const VTYPE = Vector{Union{Dagger.Chunk,Dagger.EagerThunk}}
10
2
11
3
"""
@@ -20,7 +12,7 @@ the underlying partitions was applied to it (currently only `filter`).
20
12
mutable struct DTable
    # Partition handles; each entry is a `Dagger.Chunk` or a `Dagger.EagerThunk`.
    chunks::VTYPE
    # Deliberately untyped. NOTE(review): presumably the table type used when
    # materializing results — confirm against `resolve_tabletype`.
    tabletype::Any
    # Schema of the table, or `nothing` when none has been supplied (the
    # two-argument constructor stores `nothing` here).
    schema::Union{Nothing,Schema}
end
25
17
26
18
# Convenience constructor: converts `chunks` to `VTYPE` and leaves the schema
# unset (`nothing`).
function DTable(chunks::Vector, tabletype)
    return DTable(VTYPE(chunks), tabletype, nothing)
end
@@ -30,15 +22,15 @@ DTable(chunks::Vector, tabletype, schema) = DTable(VTYPE(chunks), tabletype, sch
30
22
DTable(table; tabletype=nothing) -> DTable
31
23
32
24
Constructs a `DTable` using a `Tables.jl`-compatible input `table`.
33
- Calls `Tables. partitions` on `table` and assumes the provided partitioning.
25
+ Calls `partitions` on `table` and assumes the provided partitioning.
34
26
"""
35
27
function DTable (table; tabletype= nothing )
36
28
chunks = Vector {Dagger.Chunk} ()
37
29
type = nothing
38
30
sink = nothing
39
- for partition in Tables . partitions (table)
31
+ for partition in partitions (table)
40
32
if sink === nothing
41
- sink = Tables . materializer (tabletype != = nothing ? tabletype () : partition)
33
+ sink = materializer (tabletype != = nothing ? tabletype () : partition)
42
34
end
43
35
44
36
tpart = sink (partition)
@@ -73,25 +65,25 @@ function DTable(table, chunksize::Integer; tabletype=nothing, interpartition_mer
73
65
leftovers = nothing
74
66
leftovers_length = 0
75
67
76
- for partition in Tables . partitions (table)
68
+ for partition in partitions (table)
77
69
if sink === nothing
78
- sink = Tables . materializer (tabletype != = nothing ? tabletype () : partition)
70
+ sink = materializer (tabletype != = nothing ? tabletype () : partition)
79
71
end
80
72
81
73
if interpartition_merges && leftovers != = nothing
82
- inner_partitions = Tables . partitions (
74
+ inner_partitions = partitions (
83
75
TableOperations. makepartitions (sink (partition), chunksize - leftovers_length)
84
76
)
85
77
86
78
merged_data = sink (
87
79
TableOperations. joinpartitions (
88
- Tables . partitioner (identity, [leftovers, sink (first (inner_partitions))])
80
+ partitioner (identity, [leftovers, sink (first (inner_partitions))])
89
81
),
90
82
)
91
83
92
84
if length (inner_partitions) == 1
93
85
leftovers = merged_data
94
- leftovers_length = Tables . length (Tables . rows (leftovers))
86
+ leftovers_length = length (rows (leftovers))
95
87
if leftovers_length == chunksize
96
88
# sometimes the next partition will be exactly the size of
97
89
# the chunksize - leftovers_length, so perfect match
@@ -105,28 +97,26 @@ function DTable(table, chunksize::Integer; tabletype=nothing, interpartition_mer
105
97
leftovers = nothing
106
98
leftovers_length = 0
107
99
partition = TableOperations. joinpartitions (
108
- Tables . partitioner (identity, Iterators. drop (inner_partitions, 1 ))
100
+ partitioner (identity, Iterators. drop (inner_partitions, 1 ))
109
101
)
110
102
end
111
103
end
112
104
113
- inner_partitions = Tables. partitions (
114
- TableOperations. makepartitions (sink (partition), chunksize)
115
- )
105
+ inner_partitions = partitions (TableOperations. makepartitions (sink (partition), chunksize))
116
106
117
107
for inner_partition in inner_partitions
118
108
chunk_data = sink (inner_partition)
119
- chunk_data_rows = Tables . rows (chunk_data)
109
+ chunk_data_rows = rows (chunk_data)
120
110
121
111
if (
122
112
interpartition_merges &&
123
113
Base. haslength (chunk_data_rows) &&
124
- Tables . length (chunk_data_rows) < chunksize
114
+ length (chunk_data_rows) < chunksize
125
115
)
126
116
# this is the last chunk with fewer than requested records
127
117
# merge it with the first of the next partition
128
118
leftovers = chunk_data
129
- leftovers_length = Tables . length (chunk_data_rows)
119
+ leftovers_length = length (chunk_data_rows)
130
120
else
131
121
push! (chunks, Dagger. tochunk (chunk_data))
132
122
end
158
148
159
149
"""
    _file_load(filename, loader_function, tabletype)

Load `filename` with `loader_function` and materialize the result.

The sink is `materializer` applied to the loaded data itself when `tabletype`
is `nothing`, and to a fresh `tabletype()` instance otherwise. Returns the
materialized table.
"""
function _file_load(filename::AbstractString, loader_function::Function, tabletype::Any)
    loaded = loader_function(filename)
    # Pick the object whose type decides the output table type.
    template = if tabletype === nothing
        loaded
    else
        tabletype()
    end
    return materializer(template)(loaded)
end
@@ -172,7 +162,7 @@ instance of the underlying table type.
172
162
Fetching an empty DTable results in returning an empty `NamedTuple` regardless of the underlying `tabletype`.
173
163
"""
174
164
function fetch(d::DTable)
    # Build the sink from an empty instance of the table type reported by
    # `tabletype(d)`, then feed it the joined partitions of `d`.
    return materializer(tabletype(d)())(retrieve_partitions(d))
end
178
168
@@ -187,7 +177,7 @@ fetch(d::DTable, sink) = sink(retrieve_partitions(d))
187
177
# Join the chunks of `trim(d)` into a single partitioned table; an empty
# `NamedTuple` is returned when no chunks remain after trimming.
function retrieve_partitions(d::DTable)
    trimmed = trim(d)
    nchunks(trimmed) > 0 || return NamedTuple()
    return TableOperations.joinpartitions(partitioner(retrieve, trimmed.chunks))
end
@@ -229,7 +219,7 @@ function resolve_tabletype(d::DTable)
229
219
end
230
220
231
221
"""
    isnonempty(chunk) -> Bool

Return `true` when `chunk` has at least one row and at least one column name.
The column names are only inspected when the row count is positive.
"""
function isnonempty(chunk)
    rowcount = length(rows(chunk))
    rowcount > 0 || return false
    return length(columnnames(chunk)) > 0
end
234
224
235
225
"""
@@ -260,7 +250,7 @@ function show(io::IO, ::MIME"text/plain", d::DTable)
260
250
end
261
251
262
252
"""
    chunk_lengths(table::DTable)

Return the row count of every chunk in `table`, in chunk order. One Dagger
task is spawned per chunk and all results are fetched before returning.
"""
function chunk_lengths(table::DTable)
    count_rows = part -> length(rows(part))
    spawned = [Dagger.@spawn count_rows(chunk) for chunk in table.chunks]
    return fetch.(spawned)
end
266
256
@@ -276,27 +266,31 @@ end
276
266
@inline function nchunks(d::DTable)
    # Number of entries in the `chunks` vector of `d`.
    return length(d.chunks)
end
277
267
278
268
"""
    merge_chunks(sink, chunks)

Join `chunks` into one table and pass the result through `sink`. Each chunk is
accessed through `retrieve` while the partitions are joined.
"""
function merge_chunks(sink, chunks)
    parts = partitioner(retrieve, chunks)
    joined = TableOperations.joinpartitions(parts)
    return sink(joined)
end
281
271
282
# Column names of `dt`, as reported by `columnnames_svector`, converted to strings.
function names(dt::DTable)
    return string.(columnnames_svector(dt))
end

# Column names of `dt` exactly as `columnnames_svector` returns them.
function propertynames(dt::DTable)
    return columnnames_svector(dt)
end
284
274
285
"""
    wait(dt::DTable)

Block until every chunk of `dt` that is not already a `Dagger.Chunk` has been
waited on. Always returns `nothing`.
"""
function wait(dt::DTable)
    for part in dt.chunks
        # `Dagger.Chunk` entries are skipped; only the other entries are waited on.
        part isa Dagger.Chunk && continue
        wait(part)
    end
    return nothing
end
291
281
292
"""
    isready(dt::DTable) -> Bool

Return `true` only when every chunk of `dt` is ready.

A `Dagger.Chunk` entry always counts as ready; any other entry is ready when
`isready` says so. A `DTable` without chunks is reported as ready.
"""
function isready(dt::DTable)
    # The previous form evaluated `(isready(ch); true)`, which threw away the
    # per-chunk answer and made this function return `true` unconditionally.
    # NOTE(review): assumes `isready` is imported from Base in the enclosing
    # module, as the other unqualified Base extensions in this file do.
    return all(ch -> ch isa Dagger.Chunk || isready(ch), dt.chunks)
end
295
285
296
"""
    getproperty(dt::DTable, s::Symbol)

Return the field `s` of `dt` when `s` names a field of `DTable`; for any other
symbol return `DTableColumn(dt, s)`.
"""
function getproperty(dt::DTable, s::Symbol)
    # Non-field names are turned into a `DTableColumn` for that symbol.
    s in fieldnames(DTable) || return DTableColumn(dt, s)
    return getfield(dt, s)
end
293
+
294
# Number of columns: the length of `columns(d)`.
function ncol(d::DTable)
    return length(columns(d))
end

# Number of rows: delegates to `length(d)`.
function nrow(d::DTable)
    return length(d)
end

# Wrap the names returned by `columnnames_svector` in an `Index`.
function index(df::DTable)
    return Index(columnnames_svector(df))
end
0 commit comments