3 changes: 2 additions & 1 deletion fsspec/caching.py
@@ -25,7 +25,7 @@
T = TypeVar("T")


logger = logging.getLogger("fsspec")
logger = logging.getLogger("fsspec.caching")

Fetcher = Callable[[int, int], bytes] # Maps (start, end) to bytes
MultiFetcher = Callable[[list[int, int]], bytes] # Maps [(start, end)] to bytes
@@ -662,6 +662,7 @@ def nblocks(self, value):
pass

def _fetch(self, start: int | None, stop: int | None) -> bytes:
+logger.debug("Known parts request %s %s", start, stop)
if start is None:
start = 0
if stop is None:
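As a side note on the caching.py change: the logger is now the child logger "fsspec.caching", and `KnownPartsOfAFile._fetch` emits a new debug message. A minimal sketch of how those messages can be surfaced (the logging configuration below is an assumption, not part of this diff):

```python
# Minimal sketch: turn on debug output for the renamed "fsspec.caching" logger
# so the new "Known parts request ..." messages from KnownPartsOfAFile._fetch
# become visible; basicConfig is only one possible way to attach a handler.
import logging

logging.basicConfig()
logging.getLogger("fsspec.caching").setLevel(logging.DEBUG)
```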
56 changes: 37 additions & 19 deletions fsspec/parquet.py
@@ -465,6 +465,10 @@ def _parquet_byte_ranges(
# Input row_groups contains row-group indices
row_group_indices = row_groups
row_groups = pf.row_groups
+if column_set is not None:
+    column_set = [
+        _ if isinstance(_, list) else _.split(".") for _ in column_set
+    ]

# Loop through column chunks to add required byte ranges
for r, row_group in enumerate(row_groups):
@@ -475,10 +479,9 @@
fn = self._row_group_filename(row_group, pf)

for column in row_group.columns:
-name = column.meta_data.path_in_schema[0]
-# Skip this column if we are targeting a
-# specific columns
-if column_set is None or name in column_set:
+name = column.meta_data.path_in_schema
+# Skip this column if we are targeting specific columns
+if column_set is None or _cmp(name, column_set):
file_offset0 = column.meta_data.dictionary_page_offset
if file_offset0 is None:
file_offset0 = column.meta_data.data_page_offset
@@ -550,6 +553,10 @@
if not isinstance(ind, dict)
]
column_set |= set(md_index)
+if column_set is not None:
+    column_set = [
+        _ if isinstance(_, list) else _.split(".") for _ in column_set
+    ]

# Loop through column chunks to add required byte ranges
for r in range(md.num_row_groups):
@@ -559,22 +566,33 @@
row_group = md.row_group(r)
for c in range(row_group.num_columns):
column = row_group.column(c)
-name = column.path_in_schema
-# Skip this column if we are targeting a
-# specific columns
-split_name = name.split(".")[0]
-if (
-    column_set is None
-    or name in column_set
-    or split_name in column_set
-):
-    file_offset0 = column.dictionary_page_offset
-    if file_offset0 is None:
-        file_offset0 = column.data_page_offset
-    num_bytes = column.total_compressed_size
-    if file_offset0 < footer_start:
+name = column.path_in_schema.split(".")
+# Skip this column if we are targeting specific columns
+if column_set is None or _cmp(name, column_set):
+    meta = column.to_dict()
+    # Any offset could be the first one
+    file_offset0 = min(
+        _
+        for _ in [
+            meta.get("dictionary_page_offset"),
+            meta.get("data_page_offset"),
+            meta.get("index_page_offset"),
+        ]
+        if _ is not None
+    )
+    if footer_start is None or file_offset0 < footer_start:
data_starts.append(file_offset0)
data_ends.append(
-    min(file_offset0 + num_bytes, footer_start)
+    min(
+        meta["total_compressed_size"] + file_offset0,
+        footer_start
+        or (meta["total_compressed_size"] + file_offset0),
+    )
)
data_starts.append(footer_start)
data_ends.append(footer_start + len(footer))
return data_starts, data_ends


+def _cmp(name, column_set):
+    return any(all(a == b for a, b in zip(name, _)) for _ in column_set)
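The new `_cmp` helper does a prefix comparison between a column chunk's split `path_in_schema` and the requested column paths, so asking for a parent column also selects its nested children. A self-contained sketch of that behaviour (the helper is repeated here so the snippet runs on its own):

```python
# Standalone sketch of the prefix match used above: a chunk path matches when it
# shares a prefix with a requested path, so requesting "nested" also covers
# "nested.a", while an unrelated top-level column is skipped.
def _cmp(name, column_set):
    return any(all(a == b for a, b in zip(name, _)) for _ in column_set)

assert _cmp(["nested", "a"], [["nested", "a"]])   # exact match
assert _cmp(["nested", "a"], [["nested"]])        # parent column requested
assert not _cmp(["flat"], [["nested", "a"]])      # unrelated column
```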
64 changes: 22 additions & 42 deletions fsspec/tests/test_parquet.py
@@ -1,4 +1,5 @@
import os
+import random

import pytest

@@ -11,13 +12,13 @@
except ImportError:
pq = None

-from fsspec.core import url_to_fs
from fsspec.parquet import (
_get_parquet_byte_ranges,
open_parquet_file,
open_parquet_files,
)

+pd = pytest.importorskip("pandas")

# Define `engine` fixture
FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found")
PYARROW_MARK = pytest.mark.skipif(not pq, reason="pyarrow not found")
@@ -43,7 +44,6 @@ def test_open_parquet_file(
tmpdir, engine, columns, max_gap, max_block, footer_sample_size, range_index
):
# Pandas required for this test
-pd = pytest.importorskip("pandas")
if columns == ["z"] and engine == "fastparquet":
columns = ["z.a"] # fastparquet is more specific

@@ -52,9 +52,9 @@
nrows = 40
df = pd.DataFrame(
{
"x": [i * 7 % 5 for i in range(nrows)],
"y": [[0, i] for i in range(nrows)], # list
"z": [{"a": i, "b": "cat"} for i in range(nrows)], # struct
"x": [i * 7 % 5 for i in range(nrows)],
},
index=pd.Index([10 * i for i in range(nrows)], name="myindex"),
)
@@ -66,40 +66,6 @@
# "Traditional read" (without `open_parquet_file`)
expect = pd.read_parquet(path, columns=columns, engine=engine)

-# Use `_get_parquet_byte_ranges` to re-write a
-# place-holder file with all bytes NOT required
-# to read `columns` set to b"0". The purpose of
-# this step is to make sure the read will fail
-# if the correct bytes have not been accurately
-# selected by `_get_parquet_byte_ranges`. If this
-# test were reading from remote storage, we would
-# not need this logic to capture errors.
-fs = url_to_fs(path)[0]
-data = _get_parquet_byte_ranges(
-    [path],
-    fs,
-    columns=columns,
-    engine=engine,
-    max_gap=max_gap,
-    max_block=max_block,
-    footer_sample_size=footer_sample_size,
-)[path]
-file_size = fs.size(path)
-with open(path, "wb") as f:
-    f.write(b"0" * file_size)
-
-    if footer_sample_size == 8 and columns is not None:
-        # We know 8 bytes is too small to include
-        # the footer metadata, so there should NOT
-        # be a key for the last 8 bytes of the file
-        bad_key = (file_size - 8, file_size)
-        assert bad_key not in data
-
-    for (start, stop), byte_data in data.items():
-        f.seek(start)
-        f.write(byte_data)

# Read back the modified file with `open_parquet_file`
with open_parquet_file(
path,
columns=columns,
@@ -151,10 +117,9 @@ def test_open_parquet_file(
)


+@pytest.mark.filterwarnings("ignore:.*Not enough data.*")
@FASTPARQUET_MARK
def test_with_filter(tmpdir):
-import pandas as pd

df = pd.DataFrame(
{
"a": [10, 1, 2, 3, 7, 8, 9],
@@ -180,10 +145,9 @@ def test_with_filter(tmpdir):
pd.testing.assert_frame_equal(expect, result)


+@pytest.mark.filterwarnings("ignore:.*Not enough data.*")
@FASTPARQUET_MARK
def test_multiple(tmpdir):
-import pandas as pd

df = pd.DataFrame(
{
"a": [10, 1, 2, 3, 7, 8, 9],
@@ -238,3 +202,19 @@ def test_multiple(tmpdir):
dfs = [pd.read_parquet(f, engine="fastparquet", columns=["a"]) for f in ofs]
result = pd.concat(dfs).reset_index(drop=True)
assert expect.equals(result)


+@pytest.mark.parametrize("n", [100, 10_000, 1_000_000])
+def test_nested(n, tmpdir, engine):
+    path = os.path.join(str(tmpdir), "test.parquet")
+    pa = pytest.importorskip("pyarrow")
+    flat = pa.array([random.random() for _ in range(n)])
+    a = random.random()
+    b = random.random()
+    nested = pa.array([{"a": a, "b": b} for _ in range(n)])
+    table = pa.table({"flat": flat, "nested": nested})
+    pq.write_table(table, path)
+    with open_parquet_file(path, columns=["nested.a"], engine=engine) as fh:
+        col = pd.read_parquet(fh, engine=engine, columns=["nested.a"])
+    name = "a" if engine == "pyarrow" else "nested.a"
+    assert (col[name] == a).all()
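For context, the behaviour the new `test_nested` exercises, sketched against a hypothetical remote file (the S3 path below is made up, not taken from this PR): selecting a nested sub-column should now transfer only the footer plus that column chunk's byte ranges.

```python
# Hedged sketch with a hypothetical remote path: read only the "nested.a"
# sub-column through open_parquet_file, so just the relevant byte ranges
# (plus the footer) are fetched before pandas parses the data.
import pandas as pd
from fsspec.parquet import open_parquet_file

with open_parquet_file(
    "s3://example-bucket/data.parquet",  # hypothetical path
    columns=["nested.a"],
    engine="pyarrow",
) as f:
    df = pd.read_parquet(f, engine="pyarrow", columns=["nested.a"])
```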
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -97,7 +97,7 @@ test_full = [
'notebook',
'numpy',
'ocifs',
-'pandas',
+'pandas <3.0.0',
'panel',
'paramiko',
'pyarrow',