Skip to content

Commit cc7f283

Browse files
committed
Merge branch 'master' into fix/hdfs-size-unseekable
2 parents 5c7934c + 6e11963 commit cc7f283

File tree

4 files changed

+66
-64
lines changed

4 files changed

+66
-64
lines changed

fsspec/caching.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
T = TypeVar("T")
2626

2727

28-
logger = logging.getLogger("fsspec")
28+
logger = logging.getLogger("fsspec.caching")
2929

3030
Fetcher = Callable[[int, int], bytes] # Maps (start, end) to bytes
3131
MultiFetcher = Callable[[list[int, int]], bytes] # Maps [(start, end)] to bytes
@@ -662,6 +662,7 @@ def nblocks(self, value):
662662
pass
663663

664664
def _fetch(self, start: int | None, stop: int | None) -> bytes:
665+
logger.debug("Known parts request %s %s", start, stop)
665666
if start is None:
666667
start = 0
667668
if stop is None:

fsspec/parquet.py

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,10 @@ def _parquet_byte_ranges(
465465
# Input row_groups contains row-group indices
466466
row_group_indices = row_groups
467467
row_groups = pf.row_groups
468+
if column_set is not None:
469+
column_set = [
470+
_ if isinstance(_, list) else _.split(".") for _ in column_set
471+
]
468472

469473
# Loop through column chunks to add required byte ranges
470474
for r, row_group in enumerate(row_groups):
@@ -475,10 +479,9 @@ def _parquet_byte_ranges(
475479
fn = self._row_group_filename(row_group, pf)
476480

477481
for column in row_group.columns:
478-
name = column.meta_data.path_in_schema[0]
479-
# Skip this column if we are targeting a
480-
# specific columns
481-
if column_set is None or name in column_set:
482+
name = column.meta_data.path_in_schema
483+
# Skip this column if we are targeting specific columns
484+
if column_set is None or _cmp(name, column_set):
482485
file_offset0 = column.meta_data.dictionary_page_offset
483486
if file_offset0 is None:
484487
file_offset0 = column.meta_data.data_page_offset
@@ -550,6 +553,10 @@ def _parquet_byte_ranges(
550553
if not isinstance(ind, dict)
551554
]
552555
column_set |= set(md_index)
556+
if column_set is not None:
557+
column_set = [
558+
_ if isinstance(_, list) else _.split(".") for _ in column_set
559+
]
553560

554561
# Loop through column chunks to add required byte ranges
555562
for r in range(md.num_row_groups):
@@ -559,22 +566,33 @@ def _parquet_byte_ranges(
559566
row_group = md.row_group(r)
560567
for c in range(row_group.num_columns):
561568
column = row_group.column(c)
562-
name = column.path_in_schema
563-
# Skip this column if we are targeting a
564-
# specific columns
565-
split_name = name.split(".")[0]
566-
if (
567-
column_set is None
568-
or name in column_set
569-
or split_name in column_set
570-
):
571-
file_offset0 = column.dictionary_page_offset
572-
if file_offset0 is None:
573-
file_offset0 = column.data_page_offset
574-
num_bytes = column.total_compressed_size
575-
if file_offset0 < footer_start:
569+
name = column.path_in_schema.split(".")
570+
# Skip this column if we are targeting specific columns
571+
if column_set is None or _cmp(name, column_set):
572+
meta = column.to_dict()
573+
# Any offset could be the first one
574+
file_offset0 = min(
575+
_
576+
for _ in [
577+
meta.get("dictionary_page_offset"),
578+
meta.get("data_page_offset"),
579+
meta.get("index_page_offset"),
580+
]
581+
if _ is not None
582+
)
583+
if footer_start is None or file_offset0 < footer_start:
576584
data_starts.append(file_offset0)
577585
data_ends.append(
578-
min(file_offset0 + num_bytes, footer_start)
586+
min(
587+
meta["total_compressed_size"] + file_offset0,
588+
footer_start
589+
or (meta["total_compressed_size"] + file_offset0),
590+
)
579591
)
592+
data_starts.append(footer_start)
593+
data_ends.append(footer_start + len(footer))
580594
return data_starts, data_ends
595+
596+
597+
def _cmp(name, column_set):
598+
return any(all(a == b for a, b in zip(name, _)) for _ in column_set)

fsspec/tests/test_parquet.py

Lines changed: 26 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import random
23

34
import pytest
45

@@ -11,15 +12,18 @@
1112
except ImportError:
1213
pq = None
1314

14-
from fsspec.core import url_to_fs
1515
from fsspec.parquet import (
16-
_get_parquet_byte_ranges,
1716
open_parquet_file,
1817
open_parquet_files,
1918
)
2019

20+
pd = pytest.importorskip("pandas")
21+
pd_gt_3 = pd.__version__ > "3"
22+
2123
# Define `engine` fixture
22-
FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found")
24+
FASTPARQUET_MARK = pytest.mark.skipif(
25+
pd_gt_3 or not fastparquet, reason="fastparquet not found"
26+
)
2327
PYARROW_MARK = pytest.mark.skipif(not pq, reason="pyarrow not found")
2428

2529

@@ -43,7 +47,6 @@ def test_open_parquet_file(
4347
tmpdir, engine, columns, max_gap, max_block, footer_sample_size, range_index
4448
):
4549
# Pandas required for this test
46-
pd = pytest.importorskip("pandas")
4750
if columns == ["z"] and engine == "fastparquet":
4851
columns = ["z.a"] # fastparquet is more specific
4952

@@ -52,9 +55,9 @@ def test_open_parquet_file(
5255
nrows = 40
5356
df = pd.DataFrame(
5457
{
55-
"x": [i * 7 % 5 for i in range(nrows)],
5658
"y": [[0, i] for i in range(nrows)], # list
5759
"z": [{"a": i, "b": "cat"} for i in range(nrows)], # struct
60+
"x": [i * 7 % 5 for i in range(nrows)],
5861
},
5962
index=pd.Index([10 * i for i in range(nrows)], name="myindex"),
6063
)
@@ -66,40 +69,6 @@ def test_open_parquet_file(
6669
# "Traditional read" (without `open_parquet_file`)
6770
expect = pd.read_parquet(path, columns=columns, engine=engine)
6871

69-
# Use `_get_parquet_byte_ranges` to re-write a
70-
# place-holder file with all bytes NOT required
71-
# to read `columns` set to b"0". The purpose of
72-
# this step is to make sure the read will fail
73-
# if the correct bytes have not been accurately
74-
# selected by `_get_parquet_byte_ranges`. If this
75-
# test were reading from remote storage, we would
76-
# not need this logic to capture errors.
77-
fs = url_to_fs(path)[0]
78-
data = _get_parquet_byte_ranges(
79-
[path],
80-
fs,
81-
columns=columns,
82-
engine=engine,
83-
max_gap=max_gap,
84-
max_block=max_block,
85-
footer_sample_size=footer_sample_size,
86-
)[path]
87-
file_size = fs.size(path)
88-
with open(path, "wb") as f:
89-
f.write(b"0" * file_size)
90-
91-
if footer_sample_size == 8 and columns is not None:
92-
# We know 8 bytes is too small to include
93-
# the footer metadata, so there should NOT
94-
# be a key for the last 8 bytes of the file
95-
bad_key = (file_size - 8, file_size)
96-
assert bad_key not in data
97-
98-
for (start, stop), byte_data in data.items():
99-
f.seek(start)
100-
f.write(byte_data)
101-
102-
# Read back the modified file with `open_parquet_file`
10372
with open_parquet_file(
10473
path,
10574
columns=columns,
@@ -151,10 +120,9 @@ def test_open_parquet_file(
151120
)
152121

153122

123+
@pytest.mark.filterwarnings("ignore:.*Not enough data.*")
154124
@FASTPARQUET_MARK
155125
def test_with_filter(tmpdir):
156-
import pandas as pd
157-
158126
df = pd.DataFrame(
159127
{
160128
"a": [10, 1, 2, 3, 7, 8, 9],
@@ -180,10 +148,9 @@ def test_with_filter(tmpdir):
180148
pd.testing.assert_frame_equal(expect, result)
181149

182150

151+
@pytest.mark.filterwarnings("ignore:.*Not enough data.*")
183152
@FASTPARQUET_MARK
184153
def test_multiple(tmpdir):
185-
import pandas as pd
186-
187154
df = pd.DataFrame(
188155
{
189156
"a": [10, 1, 2, 3, 7, 8, 9],
@@ -238,3 +205,19 @@ def test_multiple(tmpdir):
238205
dfs = [pd.read_parquet(f, engine="fastparquet", columns=["a"]) for f in ofs]
239206
result = pd.concat(dfs).reset_index(drop=True)
240207
assert expect.equals(result)
208+
209+
210+
@pytest.mark.parametrize("n", [100, 10_000, 1_000_000])
211+
def test_nested(n, tmpdir, engine):
212+
path = os.path.join(str(tmpdir), "test.parquet")
213+
pa = pytest.importorskip("pyarrow")
214+
flat = pa.array([random.random() for _ in range(n)])
215+
a = random.random()
216+
b = random.random()
217+
nested = pa.array([{"a": a, "b": b} for _ in range(n)])
218+
table = pa.table({"flat": flat, "nested": nested})
219+
pq.write_table(table, path)
220+
with open_parquet_file(path, columns=["nested.a"], engine=engine) as fh:
221+
col = pd.read_parquet(fh, engine=engine, columns=["nested.a"])
222+
name = "a" if engine == "pyarrow" else "nested.a"
223+
assert (col[name] == a).all()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ test_full = [
9797
'notebook',
9898
'numpy',
9999
'ocifs',
100-
'pandas',
100+
'pandas <3.0.0',
101101
'panel',
102102
'paramiko',
103103
'pyarrow',

0 commit comments

Comments
 (0)