
Commit fcf83cc

Implement multi-file

1 parent 9a89a0d commit fcf83cc

File tree

2 files changed: +126, -44 lines

fsspec/parquet.py

Lines changed: 61 additions & 43 deletions
@@ -1,6 +1,9 @@
 import io
 import json
 import warnings
+from typing import Literal
+
+import fsspec
 
 from .core import url_to_fs
 from .spec import AbstractBufferedFile
@@ -20,19 +23,19 @@ def _fetch_range(self, start, end):
         raise NotImplementedError
 
 
-def open_parquet_file(
-    path,
-    mode="rb",
-    fs=None,
+def open_parquet_files(
+    path: list[str],
+    mode: Literal["rb"] = "rb",
+    fs: None | fsspec.AbstractFileSystem = None,
     metadata=None,
-    columns=None,
-    row_groups=None,
-    storage_options=None,
-    engine="auto",
-    max_gap=64_000,
-    max_block=256_000_000,
-    footer_sample_size=1_000_000,
-    filters=None,
+    columns: None | list[str] = None,
+    row_groups: None | list[int] = None,
+    storage_options: None | dict = None,
+    engine: str = "auto",
+    max_gap: int = 64_000,
+    max_block: int = 256_000_000,
+    footer_sample_size: int = 1_000_000,
+    filters: None | list[list[list[str]]] = None,
     **kwargs,
 ):
     """
@@ -100,7 +103,12 @@ def open_parquet_file(
     # Make sure we have an `AbstractFileSystem` object
     # to work with
     if fs is None:
-        fs = url_to_fs(path, **(storage_options or {}))[0]
+        path0 = path
+        if isinstance(path, (list, tuple)):
+            path = path[0]
+        fs, path = url_to_fs(path, **(storage_options or {}))
+    else:
+        path0 = path
 
     # For now, `columns == []` not supported, is the same
     # as all columns
@@ -110,10 +118,21 @@ def open_parquet_file(
     # Set the engine
     engine = _set_engine(engine)
 
-    # Fetch the known byte ranges needed to read
-    # `columns` and/or `row_groups`
+    if isinstance(path0, (list, tuple)):
+        paths = path0
+    elif "*" in path:
+        paths = fs.glob(path)
+    elif path0.endswith("/"):  # or fs.isdir(path):
+        paths = [
+            _
+            for _ in fs.find(path, withdirs=False, detail=False)
+            if _.endswith((".parquet", ".parq"))
+        ]
+    else:
+        paths = [path]
+
     data = _get_parquet_byte_ranges(
-        [path],
+        paths,
         fs,
         metadata=metadata,
         columns=columns,
@@ -125,23 +144,34 @@ def open_parquet_file(
         filters=filters,
     )
 
-    # Extract file name from `data`
-    fn = next(iter(data)) if data else path
-
     # Call self.open with "parts" caching
     options = kwargs.pop("cache_options", {}).copy()
-    return AlreadyBufferedFile(
-        fs=None,
-        path=fn,
-        mode=mode,
-        cache_type="parts",
-        cache_options={
-            **options,
-            "data": data.get(fn, {}),
-        },
-        size=max(_[1] for _ in data.get(fn, {})),
-        **kwargs,
-    )
+    return [
+        AlreadyBufferedFile(
+            fs=None,
+            path=fn,
+            mode=mode,
+            cache_type="parts",
+            cache_options={
+                **options,
+                "data": data.get(fn, {}),
+            },
+            size=max(_[1] for _ in data.get(fn, {})),
+            **kwargs,
+        )
+        for fn in data
+    ]
+
+
+def open_parquet_file(*args, **kwargs):
+    """Create a file tailored to reading specific parts of a parquet file.
+
+    Please see ``open_parquet_files`` for details of the arguments. The
+    difference is that this function always returns a single
+    ``AlreadyBufferedFile``, whereas ``open_parquet_files`` always returns
+    a list of files, even if there is only one (or zero) matching file.
+    """
+    return open_parquet_files(*args, **kwargs)[0]
 
 
 def _get_parquet_byte_ranges(
@@ -242,18 +272,6 @@ def _get_parquet_byte_ranges(
 
     # Calculate required byte ranges for each path
     for i, path in enumerate(paths):
-        # Deal with small-file case.
-        # Just include all remaining bytes of the file
-        # in a single range.
-        # if file_sizes[i] < max_block:
-        #     if footer_starts[i] > 0:
-        #         # Only need to transfer the data if the
-        #         # footer sample isn't already the whole file
-        #         data_paths.append(path)
-        #         data_starts.append(0)
-        #         data_ends.append(footer_starts[i])
-        #         continue
-
         # Use "engine" to collect data byte ranges
         path_data_starts, path_data_ends = engine._parquet_byte_ranges(
             columns,
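
Taken together, the new dispatch logic accepts an explicit list of paths, a glob pattern, or a directory-style path ending in "/". A minimal usage sketch of the new entry points, assuming a hypothetical local dataset at /tmp/data.parquet/ with part files named part.0.parquet and part.1.parquet:

# Hedged usage sketch (paths hypothetical); each returned file object
# already carries the byte ranges needed for the requested columns.
import pandas as pd

from fsspec.parquet import open_parquet_file, open_parquet_files

base = "/tmp/data.parquet/"  # hypothetical dataset location

# 1) directory form: trailing "/" selects *.parquet/*.parq via fs.find()
files = open_parquet_files(base, columns=["a"])

# 2) glob form: a "*" in the path is expanded with fs.glob()
files = open_parquet_files(base + "*.parquet", columns=["a"])

# 3) explicit list form: the paths are used as given
files = open_parquet_files(
    [base + "part.0.parquet", base + "part.1.parquet"], columns=["a"]
)

df = pd.concat(pd.read_parquet(f, columns=["a"]) for f in files)

# The singular wrapper returns just the first matching file
f = open_parquet_file(base + "part.0.parquet", columns=["a"])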

fsspec/tests/test_parquet.py

Lines changed: 65 additions & 1 deletion
@@ -12,7 +12,11 @@
     pq = None
 
 from fsspec.core import url_to_fs
-from fsspec.parquet import _get_parquet_byte_ranges, open_parquet_file
+from fsspec.parquet import (
+    _get_parquet_byte_ranges,
+    open_parquet_file,
+    open_parquet_files,
+)
 
 # Define `engine` fixture
 FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found")
@@ -174,3 +178,63 @@ def test_with_filter(tmpdir):
 
     result = pd.read_parquet(f, engine="fastparquet", filters=[["b", "==", "b"]])
     pd.testing.assert_frame_equal(expect, result)
+
+
+@FASTPARQUET_MARK
+def test_multiple(tmpdir):
+    import pandas as pd
+
+    df = pd.DataFrame(
+        {
+            "a": [10, 1, 2, 3, 7, 8, 9],
+            "b": ["a", "a", "a", "b", "b", "b", "b"],
+        }
+    )
+    fn = os.path.join(str(tmpdir), "test.parquet/")
+    df.to_parquet(
+        fn,
+        engine="fastparquet",
+        row_group_offsets=[0, 3],
+        stats=True,
+        file_scheme="hive",
+    )  # partition_on="b"
+
+    # path ending in "/"
+    expect = pd.read_parquet(fn, engine="fastparquet")[["a"]]
+    ofs = open_parquet_files(
+        fn,
+        engine="fastparquet",
+        columns=["a"],
+        max_gap=1,
+        max_block=1,
+        footer_sample_size=8,
+    )
+    dfs = [pd.read_parquet(f, engine="fastparquet", columns=["a"]) for f in ofs]
+    result = pd.concat(dfs).reset_index(drop=True)
+    assert expect.equals(result)
+
+    # glob
+    ofs = open_parquet_files(
+        fn + "*.parquet",
+        engine="fastparquet",
+        columns=["a"],
+        max_gap=1,
+        max_block=1,
+        footer_sample_size=8,
+    )
+    dfs = [pd.read_parquet(f, engine="fastparquet", columns=["a"]) for f in ofs]
+    result = pd.concat(dfs).reset_index(drop=True)
+    assert expect.equals(result)
+
+    # explicit
+    ofs = open_parquet_files(
+        [f"{fn}part.0.parquet", f"{fn}part.1.parquet"],
+        engine="fastparquet",
+        columns=["a"],
+        max_gap=1,
+        max_block=1,
+        footer_sample_size=8,
+    )
+    dfs = [pd.read_parquet(f, engine="fastparquet", columns=["a"]) for f in ofs]
+    result = pd.concat(dfs).reset_index(drop=True)
+    assert expect.equals(result)
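
The test reads each returned file directly with pandas; under the hood, each AlreadyBufferedFile serves those reads from the "parts" cache populated by _get_parquet_byte_ranges. Inferring from the diff (size=max(_[1] for _ in data.get(fn, {}))), the data mapping appears to be keyed by path, with (start, stop) byte tuples mapping to prefetched bytes. An illustrative sketch with made-up values:

# Illustrative shape of the `data` mapping consumed above; all offsets
# and byte values here are made up for demonstration.
data = {
    "part.0.parquet": {
        (0, 4): b"PAR1",      # magic bytes at the start of the file
        (120, 256): b"...",   # column-chunk bytes for the requested columns
        (900, 1000): b"...",  # footer / metadata bytes
    },
}
for fn, parts in data.items():
    # `size` in the diff is the largest stop offset seen for the file
    size = max(stop for _, stop in parts)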
