Skip to content

Commit 0543fc6

Browse files
committed
fixes to parquet and the known-parts cache
1 parent 2576617 commit 0543fc6

File tree

4 files changed

+75
-76
lines changed

4 files changed

+75
-76
lines changed

fsspec/caching.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,6 @@ def _fetch(self, start: int | None, stop: int | None) -> bytes:
668668
if stop is None:
669669
stop = self.size
670670
self.total_requested_bytes += stop - start
671-
672671
out = b""
673672
started = False
674673
loc_old = 0
@@ -699,11 +698,13 @@ def _fetch(self, start: int | None, stop: int | None) -> bytes:
699698
elif loc0 <= stop <= loc1:
700699
# end block
701700
self.hit_count += 1
702-
return out + self.data[(loc0, loc1)][: stop - loc0]
701+
out = out + self.data[(loc0, loc1)][: stop - loc0]
702+
return out
703703
loc_old = loc1
704704
self.miss_count += 1
705705
if started and not self.strict:
706-
return out + b"\x00" * (stop - loc_old)
706+
out = out + b"\x00" * (stop - loc_old)
707+
return out
707708
raise ValueError
708709

709710

fsspec/parquet.py

Lines changed: 41 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import io
22
import json
33
import warnings
4-
from typing import Literal
54

65
import fsspec
76

@@ -25,7 +24,6 @@ def _fetch_range(self, start, end):
2524

2625
def open_parquet_files(
2726
path: list[str],
28-
mode: Literal["rb"] = "rb",
2927
fs: None | fsspec.AbstractFileSystem = None,
3028
metadata=None,
3129
columns: None | list[str] = None,
@@ -54,8 +52,6 @@ def open_parquet_files(
5452
----------
5553
path: str
5654
Target file path.
57-
mode: str, optional
58-
Mode option to be passed through to `fs.open`. Default is "rb".
5955
metadata: Any, optional
6056
Parquet metadata object. Object type must be supported
6157
by the backend parquet engine. For now, only the "fastparquet"
@@ -150,16 +146,16 @@ def open_parquet_files(
150146
AlreadyBufferedFile(
151147
fs=None,
152148
path=fn,
153-
mode=mode,
149+
mode="rb",
154150
cache_type="parts",
155151
cache_options={
156152
**options,
157-
"data": data.get(fn, {}),
153+
"data": ranges,
158154
},
159-
size=max(_[1] for _ in data.get(fn, {})),
155+
size=max(_[1] for _ in ranges),
160156
**kwargs,
161157
)
162-
for fn in data
158+
for fn, ranges in data.items()
163159
]
164160

165161

@@ -197,7 +193,7 @@ def _get_parquet_byte_ranges(
197193
if isinstance(engine, str):
198194
engine = _set_engine(engine)
199195

200-
# Pass to specialized function if metadata is defined
196+
# Pass to a specialized function if metadata is defined
201197
if metadata is not None:
202198
# Use the provided parquet metadata object
203199
# to avoid transferring/parsing footer metadata
@@ -212,63 +208,54 @@ def _get_parquet_byte_ranges(
212208
filters=filters,
213209
)
214210

215-
# Get file sizes asynchronously
216-
file_sizes = fs.sizes(paths)
217-
218211
# Populate global paths, starts, & ends
219-
result = {}
220-
data_paths = []
221-
data_starts = []
222-
data_ends = []
223-
add_header_magic = True
224212
if columns is None and row_groups is None and filters is None:
225213
# We are NOT selecting specific columns or row-groups.
226214
#
227215
# We can avoid sampling the footers, and just transfer
228216
# all file data with cat_ranges
229-
for i, path in enumerate(paths):
230-
result[path] = {}
231-
data_paths.append(path)
232-
data_starts.append(0)
233-
data_ends.append(file_sizes[i])
234-
add_header_magic = False # "Magic" should already be included
217+
result = {path: {(0, len(data)): data} for path, data in fs.cat(paths).items()}
235218
else:
236219
# We ARE selecting specific columns or row-groups.
237220
#
221+
# Get file sizes asynchronously
222+
file_sizes = fs.sizes(paths)
223+
data_paths = []
224+
data_starts = []
225+
data_ends = []
238226
# Gather file footers.
239227
# We just take the last `footer_sample_size` bytes of each
240228
# file (or the entire file if it is smaller than that)
241-
footer_starts = []
242-
footer_ends = []
243-
for i, path in enumerate(paths):
244-
footer_ends.append(file_sizes[i])
245-
sample_size = max(0, file_sizes[i] - footer_sample_size)
246-
footer_starts.append(sample_size)
247-
footer_samples = fs.cat_ranges(paths, footer_starts, footer_ends)
229+
footer_starts = [
230+
max(0, file_size - footer_sample_size) for file_size in file_sizes
231+
]
232+
footer_samples = fs.cat_ranges(paths, footer_starts, file_sizes)
248233

249234
# Check our footer samples and re-sample if necessary.
250-
missing_footer_starts = footer_starts.copy()
251-
large_footer = 0
235+
large_footer = []
252236
for i, path in enumerate(paths):
253237
footer_size = int.from_bytes(footer_samples[i][-8:-4], "little")
254238
real_footer_start = file_sizes[i] - (footer_size + 8)
255239
if real_footer_start < footer_starts[i]:
256-
missing_footer_starts[i] = real_footer_start
257-
large_footer = max(large_footer, (footer_size + 8))
240+
large_footer.append((i, real_footer_start))
258241
if large_footer:
259242
warnings.warn(
260243
f"Not enough data was used to sample the parquet footer. "
261244
f"Try setting footer_sample_size >= {large_footer}."
262245
)
263-
for i, block in enumerate(
264-
fs.cat_ranges(
265-
paths,
266-
missing_footer_starts,
267-
footer_starts,
268-
)
269-
):
246+
path0 = [paths[i] for i, _ in large_footer]
247+
starts = [_[1] for _ in large_footer]
248+
ends = [file_sizes[i] - footer_sample_size for i, _ in large_footer]
249+
data = fs.cat_ranges(path0, starts, ends)
250+
for i, (path, start, block) in enumerate(zip(path0, starts, data)):
270251
footer_samples[i] = block + footer_samples[i]
271-
footer_starts[i] = missing_footer_starts[i]
252+
footer_starts[i] = start
253+
result = {
254+
path: {(start, size): data}
255+
for path, start, size, data in zip(
256+
paths, footer_starts, file_sizes, footer_samples
257+
)
258+
}
272259

273260
# Calculate required byte ranges for each path
274261
for i, path in enumerate(paths):
@@ -284,9 +271,6 @@ def _get_parquet_byte_ranges(
284271
data_paths += [path] * len(path_data_starts)
285272
data_starts += path_data_starts
286273
data_ends += path_data_ends
287-
result.setdefault(path, {})[(footer_starts[i], file_sizes[i])] = (
288-
footer_samples[i]
289-
)
290274

291275
# Merge adjacent offset ranges
292276
data_paths, data_starts, data_ends = merge_offset_ranges(
@@ -295,19 +279,14 @@ def _get_parquet_byte_ranges(
295279
data_ends,
296280
max_gap=max_gap,
297281
max_block=max_block,
298-
sort=False, # Should already be sorted
282+
sort=True,
299283
)
300284

301-
# Start by populating `result` with footer samples
302-
for i, path in enumerate(paths):
303-
result[path] = {(footer_starts[i], footer_ends[i]): footer_samples[i]}
285+
# Transfer the data byte-ranges into local memory
286+
_transfer_ranges(fs, result, data_paths, data_starts, data_ends)
304287

305-
# Transfer the data byte-ranges into local memory
306-
_transfer_ranges(fs, result, data_paths, data_starts, data_ends)
307-
308-
# Add b"PAR1" to header if necessary
309-
if add_header_magic:
310-
_add_header_magic(result)
288+
# Add b"PAR1" to headers
289+
_add_header_magic(result)
311290

312291
return result
313292

@@ -362,7 +341,7 @@ def _transfer_ranges(fs, blocks, paths, starts, ends):
362341

363342
def _add_header_magic(data):
364343
# Add b"PAR1" to file headers
365-
for path in list(data.keys()):
344+
for path in list(data):
366345
add_magic = True
367346
for k in data[path]:
368347
if k[0] == 0 and k[1] >= 4:
@@ -419,9 +398,6 @@ def __init__(self):
419398

420399
self.fp = fp
421400

422-
def _row_group_filename(self, row_group, pf):
423-
return pf.row_group_filename(row_group)
424-
425401
def _parquet_byte_ranges(
426402
self,
427403
columns,
@@ -476,7 +452,7 @@ def _parquet_byte_ranges(
476452
# specific row-groups
477453
if row_group_indices is None or r in row_group_indices:
478454
# Find the target parquet-file path for `row_group`
479-
fn = self._row_group_filename(row_group, pf)
455+
fn = pf.row_group_filename(row_group)
480456

481457
for column in row_group.columns:
482458
name = column.meta_data.path_in_schema
@@ -515,9 +491,6 @@ def __init__(self):
515491

516492
self.pq = pq
517493

518-
def _row_group_filename(self, row_group, metadata):
519-
raise NotImplementedError
520-
521494
def _parquet_byte_ranges(
522495
self,
523496
columns,
@@ -530,6 +503,7 @@ def _parquet_byte_ranges(
530503
if metadata is not None:
531504
raise ValueError("metadata input not supported for PyarrowEngine")
532505
if filters:
506+
# there must be a way!
533507
raise NotImplementedError
534508

535509
data_starts, data_ends = [], []
@@ -555,7 +529,7 @@ def _parquet_byte_ranges(
555529
column_set |= set(md_index)
556530
if column_set is not None:
557531
column_set = [
558-
_ if isinstance(_, list) else _.split(".") for _ in column_set
532+
_[:1] if isinstance(_, list) else _.split(".")[:1] for _ in column_set
559533
]
560534

561535
# Loop through column chunks to add required byte ranges
@@ -580,15 +554,15 @@ def _parquet_byte_ranges(
580554
]
581555
if _ is not None
582556
)
583-
if footer_start is None or file_offset0 < footer_start:
557+
if file_offset0 < footer_start:
584558
data_starts.append(file_offset0)
585559
data_ends.append(
586560
min(
587561
meta["total_compressed_size"] + file_offset0,
588-
footer_start
589-
or (meta["total_compressed_size"] + file_offset0),
562+
footer_start,
590563
)
591564
)
565+
592566
data_starts.append(footer_start)
593567
data_ends.append(footer_start + len(footer))
594568
return data_starts, data_ends

fsspec/tests/test_parquet.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,17 +207,31 @@ def test_multiple(tmpdir):
207207
assert expect.equals(result)
208208

209209

210-
@pytest.mark.parametrize("n", [100, 10_000, 1_000_000])
210+
@pytest.mark.parametrize("n", [1_000, 1_000_000])
211211
def test_nested(n, tmpdir, engine):
212212
path = os.path.join(str(tmpdir), "test.parquet")
213213
pa = pytest.importorskip("pyarrow")
214214
flat = pa.array([random.random() for _ in range(n)])
215-
a = random.random()
216-
b = random.random()
217-
nested = pa.array([{"a": a, "b": b} for _ in range(n)])
215+
nested = pa.array([{"a": random.random(), "b": random.random()} for _ in range(n)])
216+
data = [float(_[0]) for _ in nested]
218217
table = pa.table({"flat": flat, "nested": nested})
219218
pq.write_table(table, path)
220219
with open_parquet_file(path, columns=["nested.a"], engine=engine) as fh:
221220
col = pd.read_parquet(fh, engine=engine, columns=["nested.a"])
222221
name = "a" if engine == "pyarrow" else "nested.a"
223-
assert (col[name] == a).all()
222+
assert (col[name] == data).all()
223+
224+
225+
@PYARROW_MARK
226+
def test_nested_arrow_nodict(tmpdir):
227+
pa = pytest.importorskip("pyarrow")
228+
n = 1_000_000
229+
path = os.path.join(str(tmpdir), "test.parquet")
230+
flat = pa.array([random.random() for _ in range(n)])
231+
nested = pa.array([{"a": random.random(), "b": random.random()} for _ in range(n)])
232+
data = [float(_[0]) for _ in nested]
233+
table = pa.table({"flat": flat, "nested": nested})
234+
pq.write_table(table, path, use_dictionary=False)
235+
with open_parquet_file(path, columns=["nested"], engine="pyarrow") as fh:
236+
col = pd.read_parquet(fh, engine="pyarrow", columns=["nested.a"])
237+
assert (col["a"] == data).all()

fsspec/utils.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,16 @@ def merge_offset_ranges(
566566
)
567567
)
568568
)
569+
remove = []
570+
for i, (path, start, end) in enumerate(zip(paths, starts, ends)):
571+
if any(
572+
p == path and start >= s and end <= e and i != i2
573+
for i2, (p, s, e) in enumerate(zip(paths, starts, ends))
574+
):
575+
remove.append(i)
576+
paths = [p for i, p in enumerate(paths) if i not in remove]
577+
starts = [s for i, s in enumerate(starts) if i not in remove]
578+
ends = [e for i, e in enumerate(ends) if i not in remove]
569579

570580
if paths:
571581
# Loop through the coupled `paths`, `starts`, and
@@ -587,7 +597,7 @@ def merge_offset_ranges(
587597
new_starts.append(starts[i])
588598
new_ends.append(ends[i])
589599
else:
590-
# Merge with previous block by updating the
600+
# Merge with the previous block by updating the
591601
# last element of `ends`
592602
new_ends[-1] = ends[i]
593603
return new_paths, new_starts, new_ends

0 commit comments

Comments
 (0)