
Commit 4e18df1

Add parquet scan options and docs (#7801)
parquet scan options and docs
1 parent 6f2502c commit 4e18df1

File tree

1 file changed: +54 −2 lines changed


src/datasets/packaged_modules/parquet/parquet.py

Lines changed: 54 additions & 2 deletions
@@ -15,12 +15,63 @@
 
 @dataclass
 class ParquetConfig(datasets.BuilderConfig):
-    """BuilderConfig for Parquet."""
+    """
+    BuilderConfig for Parquet.
+
+    Args:
+        batch_size (`int`, *optional*):
+            Size of the RecordBatches to iterate on.
+            The default is the row group size (defined by the first row group).
+        columns (`list[str]`, *optional*):
+            List of columns to load; the other ones are ignored.
+            All columns are loaded by default.
+        features (`Features`, *optional*):
+            Cast the data to `features`.
+        filters (`Union[pyarrow.dataset.Expression, list[tuple], list[list[tuple]]]`, *optional*):
+            Return only the rows matching the filter.
+            If possible the predicate is pushed down to exploit the partition information
+            or internal metadata found in the data source, e.g. Parquet statistics.
+            Otherwise it filters the loaded RecordBatches before yielding them.
+        fragment_scan_options (`pyarrow.dataset.ParquetFragmentScanOptions`, *optional*):
+            Scan-specific options for Parquet fragments.
+            This is especially useful to configure buffering and caching.
+
+    Example:
+
+    Load a subset of columns:
+
+    ```python
+    >>> ds = load_dataset(parquet_dataset_id, columns=["col_0", "col_1"])
+    ```
+
+    Stream data and efficiently filter it, possibly skipping entire files or row groups:
+
+    ```python
+    >>> filters = [("col_0", "==", 0)]
+    >>> ds = load_dataset(parquet_dataset_id, streaming=True, filters=filters)
+    ```
+
+    Increase the minimum request size when streaming from 32MiB (the default) to 128MiB and enable prefetching:
+
+    ```python
+    >>> import pyarrow
+    >>> import pyarrow.dataset
+    >>> fragment_scan_options = pyarrow.dataset.ParquetFragmentScanOptions(
+    ...     cache_options=pyarrow.CacheOptions(
+    ...         prefetch_limit=1,
+    ...         range_size_limit=128 << 20,
+    ...     ),
+    ... )
+    >>> ds = load_dataset(parquet_dataset_id, streaming=True, fragment_scan_options=fragment_scan_options)
+    ```
+    """
 
     batch_size: Optional[int] = None
     columns: Optional[list[str]] = None
     features: Optional[datasets.Features] = None
     filters: Optional[Union[ds.Expression, list[tuple], list[list[tuple]]]] = None
+    fragment_scan_options: Optional[ds.ParquetFragmentScanOptions] = None
 
     def __post_init__(self):
         super().__post_init__()
@@ -84,9 +135,10 @@ def _generate_tables(self, files):
             if isinstance(self.config.filters, list)
             else self.config.filters
         )
+        parquet_file_format = ds.ParquetFileFormat(default_fragment_scan_options=self.config.fragment_scan_options)
         for file_idx, file in enumerate(itertools.chain.from_iterable(files)):
             with open(file, "rb") as f:
-                parquet_fragment = ds.ParquetFileFormat().make_fragment(f)
+                parquet_fragment = parquet_file_format.make_fragment(f)
                 if parquet_fragment.row_groups:
                     batch_size = self.config.batch_size or parquet_fragment.row_groups[0].num_rows
                     try:
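For context on the second hunk: a `pyarrow.dataset.ParquetFileFormat` built with `default_fragment_scan_options` applies those options to every fragment it creates, so `make_fragment` picks up the buffering and caching settings without passing them per file. The sketch below (not part of the commit) shows that mechanism in isolation; the file path `example.parquet` and the cache values are placeholders for illustration.

```python
import pyarrow
import pyarrow.dataset as ds

# Illustrative cache settings: prefetch one coalesced read range ahead and
# allow ranges up to 128 MiB, mirroring the docstring example above.
scan_options = ds.ParquetFragmentScanOptions(
    cache_options=pyarrow.CacheOptions(
        prefetch_limit=1,
        range_size_limit=128 << 20,
    ),
)

# Every fragment made from this format object scans with scan_options,
# which is what the change to _generate_tables takes advantage of.
parquet_file_format = ds.ParquetFileFormat(default_fragment_scan_options=scan_options)

# "example.parquet" is a placeholder path for a local Parquet file.
with open("example.parquet", "rb") as f:
    fragment = parquet_file_format.make_fragment(f)
    for batch in fragment.to_batches(batch_size=1_000):
        print(batch.num_rows)
```

This is also why the diff constructs `parquet_file_format` once before the file loop instead of calling `ds.ParquetFileFormat()` for each file.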
