Commit 39b0f01

Add parquet-sampling configuration options (rapidsai#19423)
Closes rapidsai#19389.

Adds `max_footer_samples` and `max_row_group_samples` configuration options to control metadata/row-group sampling. Although these configuration options are only *used* by the streaming executor, it felt more natural to add them to `ParquetOptions` (since they are definitely Parquet-specific).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Tom Augspurger (https://github.com/TomAugspurger)
  - Matthew Murray (https://github.com/Matt711)

URL: rapidsai#19423
1 parent 2fd8d4c commit 39b0f01
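For context, the new options surface through the `parquet_options` mapping accepted by `pl.GPUEngine`, the same pattern the updated tests below use. A minimal sketch of how a user might configure them; the scanned path is hypothetical, while the option names and defaults come from this commit:

```python
# Sketch based on the test changes below: configure Parquet sampling
# through pl.GPUEngine's parquet_options mapping.
import polars as pl

engine = pl.GPUEngine(
    executor="streaming",
    parquet_options={
        "max_footer_samples": 3,  # sample up to 3 file footers (the default)
        "max_row_group_samples": 1,  # sample 1 row-group for unique-value stats
    },
)

# Hypothetical dataset path, for illustration only.
result = pl.scan_parquet("dataset/*.parquet").collect(engine=engine)
```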

File tree: 4 files changed, +89 −47 lines


python/cudf_polars/cudf_polars/experimental/io.py

Lines changed: 34 additions & 31 deletions

```diff
@@ -116,7 +116,7 @@ def from_scan(ir: Scan, config_options: ConfigOptions) -> ScanPartitionPlan:
     )
 
     blocksize: int = config_options.executor.target_partition_size
-    column_stats = _extract_scan_stats(ir)
+    column_stats = _extract_scan_stats(ir, config_options)
     column_sizes: list[int] = []
     for name, cs in column_stats.items():
         storage_size = cs.source_info.storage_size(name)
@@ -593,13 +593,13 @@ class ParquetMetadata:
     ----------
     paths
         Parquet-dataset paths.
-    max_file_samples
-        Maximum number of files to sample for metadata.
+    max_footer_samples
+        Maximum number of file footers to sample metadata from.
     """
 
     __slots__ = (
         "column_names",
-        "max_file_samples",
+        "max_footer_samples",
         "mean_size_per_file",
         "num_row_groups_per_file",
         "paths",
@@ -609,8 +609,8 @@ class ParquetMetadata:
 
     paths: tuple[str, ...]
     """Parquet-dataset paths."""
-    max_file_samples: int
-    """Maximum number of files to sample for metadata."""
+    max_footer_samples: int
+    """Maximum number of file footers to sample metadata from."""
     row_count: ColumnStat[int]
     """Total row-count estimate."""
     num_row_groups_per_file: tuple[int, ...]
@@ -622,15 +622,17 @@ class ParquetMetadata:
     sample_paths: tuple[str, ...]
     """Sampled file paths."""
 
-    def __init__(self, paths: tuple[str, ...], max_file_samples: int):
+    def __init__(self, paths: tuple[str, ...], max_footer_samples: int):
         self.paths = paths
-        self.max_file_samples = max_file_samples
+        self.max_footer_samples = max_footer_samples
         self.row_count = ColumnStat[int]()
         self.num_row_groups_per_file = ()
         self.mean_size_per_file = {}
         self.column_names = ()
-        stride = max(1, int(len(paths) / max_file_samples)) if max_file_samples else 1
-        self.sample_paths = paths[: stride * max_file_samples : stride]
+        stride = (
+            max(1, int(len(paths) / max_footer_samples)) if max_footer_samples else 1
+        )
+        self.sample_paths = paths[: stride * max_footer_samples : stride]
 
         if not self.sample_paths:
             # No paths to sample from
@@ -685,29 +687,29 @@ class ParquetSourceInfo(DataSourceInfo):
     ----------
     paths
         Parquet-dataset paths.
-    max_file_samples
-        Maximum number of files to sample metadata from.
-    max_rg_samples
+    max_footer_samples
+        Maximum number of file footers to sample metadata from.
+    max_row_group_samples
         Maximum number of row-groups to sample data from.
     """
 
     def __init__(
         self,
         paths: tuple[str, ...],
-        max_file_samples: int,
-        max_rg_samples: int,
+        max_footer_samples: int,
+        max_row_group_samples: int,
     ):
         self.paths = paths
-        self.max_file_samples = max_file_samples
-        self.max_rg_samples = max_rg_samples
+        self.max_footer_samples = max_footer_samples
+        self.max_row_group_samples = max_row_group_samples
         # Helper attributes
         self._key_columns: set[str] = set()  # Used to fuse lazy row-group sampling
         self._unique_stats: dict[str, UniqueStats] = {}
 
     @functools.cached_property
     def metadata(self) -> ParquetMetadata:
         """Return Parquet metadata."""
-        return ParquetMetadata(self.paths, self.max_file_samples)
+        return ParquetMetadata(self.paths, self.max_footer_samples)
 
     @property
     def row_count(self) -> ColumnStat[int]:
@@ -717,7 +719,7 @@ def row_count(self) -> ColumnStat[int]:
     def _sample_row_groups(self) -> None:
         """Estimate unique-value statistics from a row-group sample."""
         sample_paths = self.metadata.sample_paths
-        if not sample_paths or self.max_rg_samples < 1:
+        if not sample_paths or self.max_row_group_samples < 1:
             # No row-groups to sample from
             return
 
@@ -742,14 +744,14 @@ def _sample_row_groups(self) -> None:
             for rg_id in range(num_rgs):
                 n += 1
                 samples[path].append(rg_id)
-                if n == self.max_rg_samples:
+                if n == self.max_row_group_samples:
                     break
-            if n == self.max_rg_samples:
+            if n == self.max_row_group_samples:
                 break
 
-        exact = sampled_file_count == len(self.paths) and self.max_rg_samples >= sum(
-            num_row_groups_per_file
-        )
+        exact = sampled_file_count == len(
+            self.paths
+        ) and self.max_row_group_samples >= sum(num_row_groups_per_file)
 
         options = plc.io.parquet.ParquetReaderOptions.builder(
             plc.io.SourceInfo(list(samples))
@@ -809,22 +811,23 @@ def add_unique_stats_column(self, column: str) -> None:
 @functools.cache
 def _sample_pq_stats(
     paths: tuple[str, ...],
-    max_file_samples: int,
-    max_rg_samples: int,
+    max_footer_samples: int,
+    max_row_group_samples: int,
 ) -> ParquetSourceInfo:
     """Return Parquet datasource information."""
-    return ParquetSourceInfo(paths, max_file_samples, max_rg_samples)
+    return ParquetSourceInfo(paths, max_footer_samples, max_row_group_samples)
 
 
 def _extract_scan_stats(
-    ir: Scan, *, max_file_samples: int = 3, max_rg_samples: int = 1
+    ir: Scan,
+    config_options: ConfigOptions,
 ) -> dict[str, ColumnStats]:
     """Extract base ColumnStats for a Scan node."""
     if ir.typ == "parquet":
-        # TODO: Add max_file_samples and max_rg_samples
-        # to the ConfigOption system.
         source_info = _sample_pq_stats(
-            tuple(ir.paths), max_file_samples, max_rg_samples
+            tuple(ir.paths),
+            config_options.parquet_options.max_footer_samples,
+            config_options.parquet_options.max_row_group_samples,
        )
        return {
            name: ColumnStats(
```
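A note on the stride arithmetic in the reworked `ParquetMetadata.__init__` above: it spreads the sampled footers evenly across the dataset rather than just taking the first N paths. A standalone sketch of that arithmetic, with invented path names for illustration:

```python
# Standalone sketch of the sampling arithmetic from ParquetMetadata.__init__
# above; the path names are hypothetical.
paths = tuple(f"part.{i}.parquet" for i in range(10))
max_footer_samples = 3

# Every len(paths)/max_footer_samples-th path is sampled, so the sample
# spans the whole dataset instead of only its head.
stride = max(1, int(len(paths) / max_footer_samples)) if max_footer_samples else 1
sample_paths = paths[: stride * max_footer_samples : stride]

assert stride == 3
assert sample_paths == ("part.0.parquet", "part.3.parquet", "part.6.parquet")
```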

python/cudf_polars/cudf_polars/utils/config.py

Lines changed: 26 additions & 0 deletions

```diff
@@ -160,6 +160,18 @@ class ParquetOptions:
     pass_read_limit
         Limit on the amount of memory used for reading and decompressing data
         or 0 if there is no limit.
+    max_footer_samples
+        Maximum number of file footers to sample for metadata. This
+        option is currently used by the streaming executor to gather
+        datasource statistics before generating a physical plan. Set to
+        0 to avoid metadata sampling. Default is 3.
+    max_row_group_samples
+        Maximum number of row-groups to sample for unique-value statistics.
+        This option may be used by the streaming executor to optimize
+        the physical plan. Default is 1.
+
+        Set to 0 to avoid row-group sampling. Note that row-group sampling
+        will also be skipped if ``max_footer_samples`` is 0.
     """
 
     _env_prefix = "CUDF_POLARS__PARQUET_OPTIONS"
@@ -179,6 +191,16 @@ class ParquetOptions:
             f"{_env_prefix}__PASS_READ_LIMIT", int, default=0
         )
     )
+    max_footer_samples: int = dataclasses.field(
+        default_factory=_make_default_factory(
+            f"{_env_prefix}__MAX_FOOTER_SAMPLES", int, default=3
+        )
+    )
+    max_row_group_samples: int = dataclasses.field(
+        default_factory=_make_default_factory(
+            f"{_env_prefix}__MAX_ROW_GROUP_SAMPLES", int, default=1
+        )
+    )
 
     def __post_init__(self) -> None:  # noqa: D105
         if not isinstance(self.chunked, bool):
@@ -187,6 +209,10 @@ def __post_init__(self) -> None:  # noqa: D105
             raise TypeError("chunk_read_limit must be an int")
         if not isinstance(self.pass_read_limit, int):
             raise TypeError("pass_read_limit must be an int")
+        if not isinstance(self.max_footer_samples, int):
+            raise TypeError("max_footer_samples must be an int")
+        if not isinstance(self.max_row_group_samples, int):
+            raise TypeError("max_row_group_samples must be an int")
 
 
 def default_blocksize(scheduler: str) -> int:
```
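Because the new fields use `_make_default_factory`, they can also be set through environment variables under the `CUDF_POLARS__PARQUET_OPTIONS` prefix, mirroring the existing options. A sketch, assuming the variables are set before the engine's config is resolved:

```python
# Sketch: the same options via environment variables (names taken from the
# default factories above). Set these before ConfigOptions is constructed.
import os

os.environ["CUDF_POLARS__PARQUET_OPTIONS__MAX_FOOTER_SAMPLES"] = "0"  # skip footer sampling
os.environ["CUDF_POLARS__PARQUET_OPTIONS__MAX_ROW_GROUP_SAMPLES"] = "0"  # skip row-group sampling
```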

python/cudf_polars/tests/experimental/test_scan.py

Lines changed: 15 additions & 15 deletions

```diff
@@ -88,15 +88,15 @@ def test_split_scan_predicate(tmp_path, df, mask):
 
 @pytest.mark.parametrize("n_files", [1, 3])
 @pytest.mark.parametrize("row_group_size", [None, 10_000])
-@pytest.mark.parametrize("max_file_samples", [3, 0])
-@pytest.mark.parametrize("max_rg_samples", [1, 0])
+@pytest.mark.parametrize("max_footer_samples", [3, 0])
+@pytest.mark.parametrize("max_row_group_samples", [1, 0])
 def test_source_statistics(
     tmp_path,
     df,
     n_files,
     row_group_size,
-    max_file_samples,
-    max_rg_samples,
+    max_footer_samples,
+    max_row_group_samples,
 ):
     from cudf_polars.experimental.io import (
         _clear_source_info_cache,
@@ -119,26 +119,26 @@ def test_source_statistics(
             "target_partition_size": 10_000,
             "scheduler": DEFAULT_SCHEDULER,
         },
+        parquet_options={
+            "max_footer_samples": max_footer_samples,
+            "max_row_group_samples": max_row_group_samples,
+        },
     )
     ir = Translator(q._ldf.visit(), engine).translate_ir()
-    column_stats = _extract_scan_stats(
-        ir,
-        max_file_samples=max_file_samples,
-        max_rg_samples=max_rg_samples,
-    )
+    column_stats = _extract_scan_stats(ir, ConfigOptions.from_polars_engine(engine))
 
     # Source info is the same for all columns
     source_info = column_stats["x"].source_info
     assert source_info is column_stats["y"].source_info
     assert source_info is column_stats["z"].source_info
-    if max_file_samples:
+    if max_footer_samples:
         assert source_info.row_count.value == df.height
         assert source_info.row_count.exact
     else:
         assert source_info.row_count.value is None
 
     # Storage stats should be available
-    if max_file_samples:
+    if max_footer_samples:
         assert source_info.storage_size("x").value > 0
         assert source_info.storage_size("y").value > 0
     else:
@@ -153,21 +153,21 @@ def test_source_statistics(
     # source._unique_stats should be empty
     assert set(source_info._unique_stats) == set()
 
-    if max_file_samples and max_rg_samples:
+    if max_footer_samples and max_row_group_samples:
         assert source_info.unique_stats("x").count.value == df.height
         assert source_info.unique_stats("x").fraction.value == 1.0
     else:
         assert source_info.unique_stats("x").count.value is None
         assert source_info.unique_stats("x").fraction.value is None
 
     # source_info._unique_stats should only contain 'x'
-    if max_file_samples and max_rg_samples:
+    if max_footer_samples and max_row_group_samples:
         assert set(source_info._unique_stats) == {"x"}
     else:
         assert set(source_info._unique_stats) == set()
 
     # Check add_unique_stats_column behavior
-    if max_file_samples and max_rg_samples:
+    if max_footer_samples and max_row_group_samples:
         # Can add a "bad"/missing key column
         source_info.add_unique_stats_column("foo")
         assert set(source_info._unique_stats) == {"x"}
@@ -198,7 +198,7 @@ def test_source_statistics_csv(tmp_path, df):
         },
     )
     ir = Translator(q._ldf.visit(), engine).translate_ir()
-    column_stats = _extract_scan_stats(ir)
+    column_stats = _extract_scan_stats(ir, ConfigOptions.from_polars_engine(engine))
 
     # Source info should be empty for CSV
     source_info = column_stats["x"].source_info
```

python/cudf_polars/tests/test_config.py

Lines changed: 14 additions & 1 deletion

```diff
@@ -301,13 +301,17 @@ def test_parquet_options_from_env(monkeypatch: pytest.MonkeyPatch) -> None:
         m.setenv("CUDF_POLARS__PARQUET_OPTIONS__CHUNKED", "0")
         m.setenv("CUDF_POLARS__PARQUET_OPTIONS__CHUNK_READ_LIMIT", "100")
         m.setenv("CUDF_POLARS__PARQUET_OPTIONS__PASS_READ_LIMIT", "200")
+        m.setenv("CUDF_POLARS__PARQUET_OPTIONS__MAX_FOOTER_SAMPLES", "0")
+        m.setenv("CUDF_POLARS__PARQUET_OPTIONS__MAX_ROW_GROUP_SAMPLES", "0")
 
         # Test default
         engine = pl.GPUEngine()
         config = ConfigOptions.from_polars_engine(engine)
         assert config.parquet_options.chunked is False
         assert config.parquet_options.chunk_read_limit == 100
         assert config.parquet_options.pass_read_limit == 200
+        assert config.parquet_options.max_footer_samples == 0
+        assert config.parquet_options.max_row_group_samples == 0
 
     with monkeypatch.context() as m:
         m.setenv("CUDF_POLARS__PARQUET_OPTIONS__CHUNKED", "foo")
@@ -393,7 +397,16 @@ def test_cardinality_factor_compat() -> None:
     )
 
 
-@pytest.mark.parametrize("option", ["chunked", "chunk_read_limit", "pass_read_limit"])
+@pytest.mark.parametrize(
+    "option",
+    [
+        "chunked",
+        "chunk_read_limit",
+        "pass_read_limit",
+        "max_footer_samples",
+        "max_row_group_samples",
+    ],
+)
 def test_validate_parquet_options(option: str) -> None:
     with pytest.raises(TypeError, match=f"{option} must be"):
         ConfigOptions.from_polars_engine(
```
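The parametrized test above exercises the new `__post_init__` type checks. A minimal sketch of what it verifies for one of the new options, outside pytest:

```python
# Sketch of the validation exercised by test_validate_parquet_options:
# a non-int value for max_footer_samples raises TypeError.
import polars as pl
from cudf_polars.utils.config import ConfigOptions

engine = pl.GPUEngine(parquet_options={"max_footer_samples": "3"})  # str, not int
try:
    ConfigOptions.from_polars_engine(engine)
except TypeError as err:
    print(err)  # -> "max_footer_samples must be an int"
```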
