
Commit cedf831

kszucs and lhoestq authored
feat: use content defined chunking (#7589)
* feat: use content defined chunking in `io.parquet.ParquetDatasetReader`
* ci: use nightly pyarrow wheels
* feat: use content defined chunking in `arrow_writer.ParquetWriter`
* ci: try to pass the pyarrow nightly wheel as additional arg to the doc builder workflow
* ci: trigger builds on push no matter the branch
* ci: pass the extra index url in the check_code_quality job
* ci: specify pyarrow constraint as pyarrow>=21.0.0.dev
* ci: restore branch filters in the doc build workflow
* ci: install pyarrow from nightly channel in a separate command
* ci: missing --system for uv pip install
* chore: initialize features variable
* chore: set the default max batch size to pyarrow's default
* chore: rename cdc_options argument to use_content_defined_chunking
* chore: always store the cdc parameters as metadata
* test: cover more input parameter values for ParquetWriter
* test: cover more input parameter values for ParquetDatasetWriter
* ci: pin pyarrow==21.0.0 since it is released now; restore CI workarounds
* chore: use Union typehint
* 100MB per row group
* fix tests
* again
* write page index

---------

Co-authored-by: Quentin Lhoest <[email protected]>
1 parent 4c4503e commit cedf831
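In practice this means Parquet files written by `datasets` now use pyarrow's content-defined chunking (CDC) by default, which requires pyarrow 21.0.0 or newer. A minimal usage sketch; the data and output path below are illustrative, not part of the commit:

```python
# Sketch: writing a dataset to Parquet after this change.
# Requires pyarrow>=21.0.0. Content-defined chunking and the page index are
# enabled by default, so no extra arguments are needed.
from datasets import Dataset

ds = Dataset.from_dict({"id": list(range(1_000)), "text": ["hello world"] * 1_000})
ds.to_parquet("example.parquet")  # row groups target ~100MB uncompressed
```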

File tree

10 files changed (+320 lines, -56 lines)


.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ jobs:
         run: uv pip install --system --upgrade pyarrow huggingface-hub "dill<0.3.9"
       - name: Install dependencies (minimum versions)
         if: ${{ matrix.deps_versions != 'deps-latest' }}
-        run: uv pip install --system pyarrow==15.0.0 huggingface-hub==0.24.7 transformers dill==0.3.1.1
+        run: uv pip install --system pyarrow==21.0.0 huggingface-hub==0.24.7 transformers dill==0.3.1.1
       - name: Test with pytest
         run: |
           python -m pytest -rfExX -m ${{ matrix.test }} -n 2 --dist loadfile -sv ./tests/

setup.py

Lines changed: 2 additions & 2 deletions
@@ -110,8 +110,8 @@
     # We use numpy>=1.17 to have np.random.Generator (Dataset shuffling)
     "numpy>=1.17",
     # Backend and serialization.
-    # Minimum 15.0.0 to be able to cast dictionary types to their underlying types
-    "pyarrow>=15.0.0",
+    # Minimum 21.0.0 to support `use_content_defined_chunking` in ParquetWriter
+    "pyarrow>=21.0.0",
     # For smart caching dataset processing
     "dill>=0.3.0,<0.3.9",  # tmp pin until dill has official support for determinism see https://github.com/uqfoundation/dill/issues/19
     # For performance gains with apache arrow
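The bumped lower bound exists because `use_content_defined_chunking` is only accepted by pyarrow's `ParquetWriter` from 21.0.0 onward. A hedged sketch of the kind of version guard a downstream script could use; the `packaging` import and the error message are illustrative, not part of this commit:

```python
# Sketch: fail early if the installed pyarrow is too old for content defined chunking.
from packaging import version

import pyarrow as pa

if version.parse(pa.__version__) < version.parse("21.0.0"):
    raise RuntimeError(
        f"pyarrow {pa.__version__} is installed, but pyarrow>=21.0.0 is required "
        "for use_content_defined_chunking"
    )
```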

src/datasets/arrow_dataset.py

Lines changed: 13 additions & 7 deletions
@@ -5256,7 +5256,8 @@ def to_parquet(
                 or a BinaryIO, where the dataset will be saved to in the specified format.
             batch_size (`int`, *optional*):
                 Size of the batch to load in memory and write at once.
-                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+                By default it aims for row groups with maximum uncompressed byte size of "100MB",
+                defined by `datasets.config.MAX_ROW_GROUP_SIZE`.
             storage_options (`dict`, *optional*):
                 Key/value pairs to be passed on to the file-system backend, if any.
@@ -5354,11 +5355,11 @@ def extra_nbytes_visitor(array, feature):
            table = self.with_format("arrow")[:1000]
            table_visitor(table, extra_nbytes_visitor)

-            extra_nbytes = extra_nbytes * len(self.data) / len(table)
+            extra_nbytes = extra_nbytes * len(self.data) // len(table)
            dataset_nbytes = dataset_nbytes + extra_nbytes

        if self._indices is not None:
-            dataset_nbytes = dataset_nbytes * len(self._indices) / len(self.data)
+            dataset_nbytes = dataset_nbytes * len(self._indices) // len(self.data)
        return dataset_nbytes

    @staticmethod
@@ -5509,6 +5510,7 @@ def _push_parquet_shards_to_hub_single(
        create_pr: Optional[bool],
        num_shards: int,
        embed_external_files: bool,
+        writer_batch_size: int,
    ):
        div = num_shards // num_jobs
        mod = num_shards % num_jobs
@@ -5525,20 +5527,18 @@ def _push_parquet_shards_to_hub_single(
        additions: list[CommitOperationAdd] = []
        for index, shard in index_shards:
            if embed_external_files:
-                from .io.parquet import get_writer_batch_size
-
                format = shard.format
                shard = shard.with_format("arrow")
                shard = shard.map(
                    embed_table_storage,
                    batched=True,
-                    batch_size=get_writer_batch_size(shard.features),
+                    batch_size=writer_batch_size,
                    keep_in_memory=True,
                )
                shard = shard.with_format(**format)
            shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
            buffer = BytesIO()
-            shard.to_parquet(buffer)
+            shard.to_parquet(buffer, batch_size=writer_batch_size)
            parquet_content = buffer.getvalue()
            uploaded_size += len(parquet_content)
            del buffer
@@ -5575,7 +5575,12 @@ def _push_parquet_shards_to_hub(
            uploaded_size (`int`): number of uploaded bytes to the repository
            dataset_nbytes (`int`): approximate size in bytes of the uploaded dataset after uncompression
        """
+        from .arrow_writer import get_writer_batch_size_from_data_size, get_writer_batch_size_from_features
+
        dataset_nbytes = self._estimate_nbytes()
+        writer_batch_size = get_writer_batch_size_from_features(self.features) or get_writer_batch_size_from_data_size(
+            len(self), dataset_nbytes
+        )

        # Find decodable columns, because if there are any, we need to:
        # embed the bytes from the files in the shards
@@ -5607,6 +5612,7 @@ def _push_parquet_shards_to_hub(
                "create_pr": create_pr,
                "num_shards": num_shards,
                "embed_external_files": embed_external_files,
+                "writer_batch_size": writer_batch_size,
            }
            for job_id in range(num_jobs)
        ]
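To make the new sizing logic concrete, here is a small worked sketch of the data-size path used by `_push_parquet_shards_to_hub` when no feature-based override applies. The row count and dataset size are hypothetical:

```python
# Hypothetical dataset: 500,000 rows, ~5 GiB estimated uncompressed size.
num_rows = 500_000
dataset_nbytes = 5 * 1024**3        # as returned by dataset._estimate_nbytes()
max_row_group_size = 100 * 1000**2  # datasets.config.MAX_ROW_GROUP_SIZE = "100MB"

# Same formula as get_writer_batch_size_from_data_size (at least 10 rows per row group).
writer_batch_size = max(10, num_rows * max_row_group_size // dataset_nbytes)
print(writer_batch_size)  # 9313 -> each row group holds roughly 100MB of uncompressed data
```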

src/datasets/arrow_writer.py

Lines changed: 111 additions & 23 deletions
@@ -43,31 +43,67 @@
 from .keyhash import DuplicatedKeysError, KeyHasher
 from .table import array_cast, cast_array_to_feature, embed_table_storage, table_cast
 from .utils import logging
-from .utils.py_utils import asdict, first_non_null_non_empty_value
+from .utils.py_utils import asdict, convert_file_size_to_int, first_non_null_non_empty_value


 logger = logging.get_logger(__name__)

 type_ = type  # keep python's type function


-def get_writer_batch_size(features: Optional[Features]) -> Optional[int]:
+def get_arrow_writer_batch_size_from_features(features: Optional[Features]) -> Optional[int]:
     """
-    Get the writer_batch_size that defines the maximum row group size in the parquet files.
-    The default in `datasets` is 1,000 but we lower it to 100 for image/audio datasets and 10 for videos.
+    Get the writer_batch_size that defines the maximum record batch size in the arrow files based on configuration values.
+    The default value is 100 for image/audio datasets and 10 for videos.
+    This allows to avoid overflows in arrow buffers.
+
+    Args:
+        features (`datasets.Features` or `None`):
+            Dataset Features from `datasets`.
+    Returns:
+        writer_batch_size (`Optional[int]`):
+            Writer batch size to pass to a dataset builder.
+            If `None`, then it will use the `datasets` default, i.e. `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+    """
+    if not features:
+        return None
+
+    batch_size = np.inf
+
+    def set_batch_size(feature: FeatureType) -> None:
+        nonlocal batch_size
+        if isinstance(feature, Image) and config.ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS is not None:
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS)
+        elif isinstance(feature, Audio) and config.ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS is not None:
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS)
+        elif isinstance(feature, Video) and config.ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS is not None:
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS)
+        elif (
+            isinstance(feature, Value)
+            and feature.dtype == "binary"
+            and config.ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS is not None
+        ):
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS)
+
+    _visit(features, set_batch_size)
+
+    return None if batch_size is np.inf else batch_size
+
+
+def get_writer_batch_size_from_features(features: Optional[Features]) -> Optional[int]:
+    """
+    Get the writer_batch_size that defines the maximum row group size in the parquet files based on configuration values.
+    By default these are not set, but it can be helpful to hard set those values in some cases.
     This allows to optimize random access to parquet file, since accessing 1 row requires
     to read its entire row group.

-    This can be improved to get optimized size for querying/iterating
-    but at least it matches the dataset viewer expectations on HF.
-
     Args:
         features (`datasets.Features` or `None`):
             Dataset Features from `datasets`.
     Returns:
         writer_batch_size (`Optional[int]`):
-            Writer batch size to pass to a dataset builder.
-            If `None`, then it will use the `datasets` default.
+            Writer batch size to pass to a parquet writer.
+            If `None`, then it will use the `datasets` default, i.e. aiming for row groups of 100MB.
     """
     if not features:
         return None
@@ -76,20 +112,48 @@ def get_writer_batch_size(features: Optional[Features]) -> Optional[int]:

     def set_batch_size(feature: FeatureType) -> None:
         nonlocal batch_size
-        if isinstance(feature, Image):
+        if isinstance(feature, Image) and config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS is not None:
             batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS)
-        elif isinstance(feature, Audio):
+        elif isinstance(feature, Audio) and config.PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS is not None:
             batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS)
-        elif isinstance(feature, Video):
+        elif isinstance(feature, Video) and config.PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS is not None:
             batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS)
-        elif isinstance(feature, Value) and feature.dtype == "binary":
+        elif (
+            isinstance(feature, Value)
+            and feature.dtype == "binary"
+            and config.PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS is not None
+        ):
             batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS)

     _visit(features, set_batch_size)

     return None if batch_size is np.inf else batch_size


+def get_writer_batch_size_from_data_size(num_rows: int, num_bytes: int) -> int:
+    """
+    Get the writer_batch_size that defines the maximum row group size in the parquet files.
+    The default in `datasets` is aiming for row groups of maximum 100MB uncompressed.
+    This allows to optimize random access to parquet file, since accessing 1 row requires
+    to read its entire row group.
+
+    This can be improved to get optimized size for querying/iterating
+    but at least it matches the dataset viewer expectations on HF.
+
+    Args:
+        num_rows (`int`):
+            Number of rows in the dataset.
+        num_bytes (`int`):
+            Number of bytes in the dataset.
+            For dataset with external files to embed (image, audio, videos), this can also be an
+            estimate from `dataset._estimate_nbytes()`.
+    Returns:
+        writer_batch_size (`Optional[int]`):
+            Writer batch size to pass to a parquet writer.
+    """
+    return max(10, num_rows * convert_file_size_to_int(config.MAX_ROW_GROUP_SIZE) // num_bytes)
+
+
 class SchemaInferenceError(ValueError):
     pass

@@ -342,8 +406,6 @@ def __init__(
 class ArrowWriter:
     """Shuffles and writes Examples to Arrow files."""

-    _WRITER_CLASS = pa.RecordBatchStreamWriter
-
     def __init__(
         self,
         schema: Optional[pa.Schema] = None,
@@ -397,7 +459,9 @@ def __init__(
         self.fingerprint = fingerprint
         self.disable_nullable = disable_nullable
         self.writer_batch_size = (
-            writer_batch_size or get_writer_batch_size(self._features) or config.DEFAULT_MAX_BATCH_SIZE
+            writer_batch_size
+            or get_arrow_writer_batch_size_from_features(self._features)
+            or config.DEFAULT_MAX_BATCH_SIZE
         )
         self.update_features = update_features
         self.with_metadata = with_metadata
@@ -431,8 +495,9 @@ def close(self):
         if self._closable_stream and not self.stream.closed:
             self.stream.close()  # This also closes self.pa_writer if it is opened

-    def _build_writer(self, inferred_schema: pa.Schema):
+    def _build_schema(self, inferred_schema: pa.Schema):
         schema = self.schema
+        features = self._features
         inferred_features = Features.from_arrow_schema(inferred_schema)
         if self._features is not None:
             if self.update_features:  # keep original features it they match, or update them
@@ -442,19 +507,24 @@ def _build_writer(self, inferred_schema: pa.Schema):
                     if name in fields:
                         if inferred_field == fields[name]:
                             inferred_features[name] = self._features[name]
-                self._features = inferred_features
+                features = inferred_features
                 schema: pa.Schema = inferred_schema
             else:
-                self._features = inferred_features
+                features = inferred_features
                 schema: pa.Schema = inferred_features.arrow_schema
+
         if self.disable_nullable:
             schema = pa.schema(pa.field(field.name, field.type, nullable=False) for field in schema)
         if self.with_metadata:
-            schema = schema.with_metadata(self._build_metadata(DatasetInfo(features=self._features), self.fingerprint))
+            schema = schema.with_metadata(self._build_metadata(DatasetInfo(features=features), self.fingerprint))
         else:
             schema = schema.with_metadata({})
-        self._schema = schema
-        self.pa_writer = self._WRITER_CLASS(self.stream, schema)
+
+        return schema, features
+
+    def _build_writer(self, inferred_schema: pa.Schema):
+        self._schema, self._features = self._build_schema(inferred_schema)
+        self.pa_writer = pa.RecordBatchStreamWriter(self.stream, self._schema)

     @property
     def schema(self):
@@ -675,4 +745,22 @@ def finalize(self, close_stream=True):


 class ParquetWriter(ArrowWriter):
-    _WRITER_CLASS = pq.ParquetWriter
+    def __init__(self, *args, use_content_defined_chunking=True, write_page_index=True, **kwargs):
+        super().__init__(*args, **kwargs)
+        if use_content_defined_chunking is True:
+            use_content_defined_chunking = config.DEFAULT_CDC_OPTIONS
+        self.use_content_defined_chunking = use_content_defined_chunking
+        self.write_page_index = write_page_index
+
+    def _build_writer(self, inferred_schema: pa.Schema):
+        self._schema, self._features = self._build_schema(inferred_schema)
+        self.pa_writer = pq.ParquetWriter(
+            self.stream,
+            self._schema,
+            use_content_defined_chunking=self.use_content_defined_chunking,
+            write_page_index=self.write_page_index,
+        )
+        if self.use_content_defined_chunking is not False:
+            self.pa_writer.add_key_value_metadata(
+                {"content_defined_chunking": json.dumps(self.use_content_defined_chunking)}
+            )
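Since the writer now always stores the chunking parameters as key-value metadata, they can be read back from the footer of a written file. A minimal sketch with pyarrow, assuming `example.parquet` was produced by the new `ParquetWriter`:

```python
# Sketch: recover the content_defined_chunking options stored by ParquetWriter.
import json

import pyarrow.parquet as pq

kv_metadata = pq.read_metadata("example.parquet").metadata  # footer key/value metadata (bytes keys and values)
cdc_options = json.loads(kv_metadata[b"content_defined_chunking"])
print(cdc_options)  # e.g. {"min_chunk_size": 262144, "max_chunk_size": 1048576, "norm_level": 0}
```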

src/datasets/config.py

Lines changed: 16 additions & 5 deletions
@@ -185,17 +185,28 @@
 # https://github.com/apache/arrow/blob/master/docs/source/cpp/arrays.rst#size-limitations-and-recommendations)
 DEFAULT_MAX_BATCH_SIZE = 1000

+DEFAULT_CDC_OPTIONS = {"min_chunk_size": 256 * 1024, "max_chunk_size": 1024 * 1024, "norm_level": 0}
+
 # Size of the preloaded record batch in `Dataset.__iter__`
 ARROW_READER_BATCH_SIZE_IN_DATASET_ITER = 10

-# Max shard size in bytes (e.g. to shard parquet datasets in push_to_hub or download_and_prepare)
+# Max uncompressed shard size in bytes (e.g. to shard parquet datasets in push_to_hub or download_and_prepare)
 MAX_SHARD_SIZE = "500MB"

+# Max uncompressed row group size in bytes (e.g. for parquet files in push_to_hub or download_and_prepare)
+MAX_ROW_GROUP_SIZE = "100MB"
+
 # Parquet configuration
-PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS = 100
-PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = 100
-PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS = 100
-PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS = 10
+PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS = None
+PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = None
+PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS = None
+PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS = None
+
+# Arrow configuration
+ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS = 100
+ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS = 100
+ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS = 100
+ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS = 10

 # Offline mode
 _offline = os.environ.get("HF_DATASETS_OFFLINE")
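The `DEFAULT_CDC_OPTIONS` values are passed straight through to pyarrow: roughly speaking, data page boundaries are derived from the content itself, kept between 256 KiB and 1 MiB, so a small edit to a dataset changes only the pages near the edit rather than shifting every page after it, which is what makes deduplicated storage on the Hub effective. A hedged sketch of the equivalent plain pyarrow call (pyarrow>=21.0.0; the table contents are made up):

```python
# Sketch: the plain pyarrow equivalent of what datasets' ParquetWriter now configures.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"id": list(range(10_000)), "text": ["some text"] * 10_000})

# Same options as datasets.config.DEFAULT_CDC_OPTIONS.
with pq.ParquetWriter(
    "cdc_example.parquet",
    table.schema,
    use_content_defined_chunking={"min_chunk_size": 256 * 1024, "max_chunk_size": 1024 * 1024, "norm_level": 0},
    write_page_index=True,
) as writer:
    writer.write_table(table)
```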
