Merged
Changes from all commits
22 commits
fe67bb7
feat: use content defined chunking in `io.parquet.ParquetDatasetReader`
kszucs May 29, 2025
66d77d9
ci: use nightly pyarrow wheels
kszucs May 30, 2025
9f866a4
feat: use content defined chunking in `arrow_writer.ParquetWriter`
kszucs May 30, 2025
51c3135
ci: try to pass the pyarrow nightly wheel as additional arg to the do…
kszucs May 30, 2025
a7fcd4a
ci: trigger builds on push no matter the branch
kszucs May 30, 2025
ff733a3
ci: pass the extra index url in the check_code_quality job
kszucs May 30, 2025
ac148b4
ci: specify pyarrow constraint as pyarrow>=21.0.0.dev
kszucs May 30, 2025
a9fda12
ci: restore branch filters in the doc build workflow
kszucs May 30, 2025
7c66ba1
ci: install pyarrow from nightly channel in a separate command
kszucs May 30, 2025
6de8d5d
ci: missing --system for uv pip install
kszucs May 30, 2025
ae35e1e
chore: initialize features variable
kszucs May 30, 2025
9e9939c
chore: set the default max batch size to pyarrow's default
kszucs Jun 17, 2025
565d260
chore: rename cdc_options argument to use_content_defined_chunking
kszucs Jun 17, 2025
ee1e73b
chore: always store the cdc parameters as metadata
kszucs Jul 25, 2025
b306a37
test: cover more input parameter values for ParquetWriter
kszucs Jul 25, 2025
00a8c54
test: cover more input parameter values for ParquetDatasetWriter
kszucs Jul 25, 2025
e0facc0
ci: pin arrow=21.0.0 since it is released now; restore CI workarounds
kszucs Jul 25, 2025
2b3acd5
chore: use Union typehint
kszucs Jul 25, 2025
b46d13a
100MB per row group
lhoestq Sep 4, 2025
e87fb7e
fix tests
lhoestq Sep 8, 2025
87ff53b
again
lhoestq Sep 8, 2025
1fbff05
write page index
lhoestq Sep 9, 2025
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -64,7 +64,7 @@ jobs:
run: uv pip install --system --upgrade pyarrow huggingface-hub "dill<0.3.9"
- name: Install dependencies (minimum versions)
if: ${{ matrix.deps_versions != 'deps-latest' }}
run: uv pip install --system pyarrow==15.0.0 huggingface-hub==0.24.7 transformers dill==0.3.1.1
run: uv pip install --system pyarrow==21.0.0 huggingface-hub==0.24.7 transformers dill==0.3.1.1
- name: Test with pytest
run: |
python -m pytest -rfExX -m ${{ matrix.test }} -n 2 --dist loadfile -sv ./tests/
4 changes: 2 additions & 2 deletions setup.py
@@ -110,8 +110,8 @@
# We use numpy>=1.17 to have np.random.Generator (Dataset shuffling)
"numpy>=1.17",
# Backend and serialization.
# Minimum 15.0.0 to be able to cast dictionary types to their underlying types
"pyarrow>=15.0.0",
# Minimum 21.0.0 to support `use_content_defined_chunking` in ParquetWriter
"pyarrow>=21.0.0",
# For smart caching dataset processing
"dill>=0.3.0,<0.3.9", # tmp pin until dill has official support for determinism see https://github.com/uqfoundation/dill/issues/19
# For performance gains with apache arrow
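For reference, a minimal sketch of what the new version floor enables, assuming the `use_content_defined_chunking` keyword that pyarrow 21 exposes on its Parquet writer (file names and table contents are illustrative):

```python
# Sketch only: assumes pyarrow>=21.0.0, where the Parquet writer accepts
# the `use_content_defined_chunking` keyword referenced in the comment above.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"text": ["a", "b", "c"], "label": [0, 1, 0]})

# Pass True to rely on pyarrow's default chunking parameters...
pq.write_table(table, "cdc_default.parquet", use_content_defined_chunking=True)

# ...or pass explicit parameters (these mirror the DEFAULT_CDC_OPTIONS added
# to datasets.config later in this diff).
pq.write_table(
    table,
    "cdc_tuned.parquet",
    use_content_defined_chunking={
        "min_chunk_size": 256 * 1024,   # 256 KiB
        "max_chunk_size": 1024 * 1024,  # 1 MiB
        "norm_level": 0,
    },
)
```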
20 changes: 13 additions & 7 deletions src/datasets/arrow_dataset.py
@@ -5245,7 +5245,8 @@ def to_parquet(
or a BinaryIO, where the dataset will be saved to in the specified format.
batch_size (`int`, *optional*):
Size of the batch to load in memory and write at once.
Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
By default it aims for row groups with maximum uncompressed byte size of "100MB",
defined by `datasets.config.MAX_ROW_GROUP_SIZE`.
storage_options (`dict`, *optional*):
Key/value pairs to be passed on to the file-system backend, if any.

@@ -5343,11 +5344,11 @@ def extra_nbytes_visitor(array, feature):
table = self.with_format("arrow")[:1000]
table_visitor(table, extra_nbytes_visitor)

extra_nbytes = extra_nbytes * len(self.data) / len(table)
extra_nbytes = extra_nbytes * len(self.data) // len(table)
dataset_nbytes = dataset_nbytes + extra_nbytes

if self._indices is not None:
dataset_nbytes = dataset_nbytes * len(self._indices) / len(self.data)
dataset_nbytes = dataset_nbytes * len(self._indices) // len(self.data)
return dataset_nbytes

@staticmethod
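For illustration, a worked example of the size extrapolation above, with hypothetical numbers: the external bytes measured on the first 1,000 rows are scaled to the full dataset, and the total is scaled again when an indices mapping selects a subset (the switch to integer division keeps the estimates as whole bytes):

```python
# Hypothetical numbers illustrating the scaling in _estimate_nbytes above.
extra_nbytes_sample = 5 * 1024 * 1024  # 5 MiB of external file bytes in the 1,000-row sample
num_rows_total = 100_000               # len(self.data)
num_rows_sample = 1_000                # len(table)

extra_nbytes = extra_nbytes_sample * num_rows_total // num_rows_sample  # 500 MiB
dataset_nbytes = 20 * 1024 * 1024 + extra_nbytes                        # in-table bytes + embedded files

# With an indices mapping selecting 25,000 of the 100,000 rows:
dataset_nbytes = dataset_nbytes * 25_000 // num_rows_total              # ~130 MiB
```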
@@ -5498,6 +5499,7 @@ def _push_parquet_shards_to_hub_single(
create_pr: Optional[bool],
num_shards: int,
embed_external_files: bool,
writer_batch_size: int,
):
div = num_shards // num_jobs
mod = num_shards % num_jobs
@@ -5514,20 +5516,18 @@
additions: list[CommitOperationAdd] = []
for index, shard in index_shards:
if embed_external_files:
from .io.parquet import get_writer_batch_size

format = shard.format
shard = shard.with_format("arrow")
shard = shard.map(
embed_table_storage,
batched=True,
batch_size=get_writer_batch_size(shard.features),
batch_size=writer_batch_size,
keep_in_memory=True,
)
shard = shard.with_format(**format)
shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
buffer = BytesIO()
shard.to_parquet(buffer)
shard.to_parquet(buffer, batch_size=writer_batch_size)
parquet_content = buffer.getvalue()
uploaded_size += len(parquet_content)
del buffer
@@ -5564,7 +5564,12 @@ def _push_parquet_shards_to_hub(
uploaded_size (`int`): number of uploaded bytes to the repository
dataset_nbytes (`int`): approximate size in bytes of the uploaded dataset after uncompression
"""
from .arrow_writer import get_writer_batch_size_from_data_size, get_writer_batch_size_from_features

dataset_nbytes = self._estimate_nbytes()
writer_batch_size = get_writer_batch_size_from_features(self.features) or get_writer_batch_size_from_data_size(
len(self), dataset_nbytes
)

# Find decodable columns, because if there are any, we need to:
# embed the bytes from the files in the shards
@@ -5596,6 +5601,7 @@
"create_pr": create_pr,
"num_shards": num_shards,
"embed_external_files": embed_external_files,
"writer_batch_size": writer_batch_size,
}
for job_id in range(num_jobs)
]
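To summarize the flow added above: the batch size is now derived once per push, from the features when a config override applies and otherwise from the estimated dataset size, then forwarded to every shard job. A simplified sketch (`ds` stands in for the `Dataset` instance):

```python
# Simplified sketch of the new writer_batch_size derivation in
# _push_parquet_shards_to_hub; `ds` is a placeholder Dataset.
from datasets.arrow_writer import (
    get_writer_batch_size_from_data_size,
    get_writer_batch_size_from_features,
)

dataset_nbytes = ds._estimate_nbytes()

# Feature-based config overrides win if set; otherwise aim for row groups of
# roughly config.MAX_ROW_GROUP_SIZE ("100MB") uncompressed.
writer_batch_size = get_writer_batch_size_from_features(ds.features) or get_writer_batch_size_from_data_size(
    len(ds), dataset_nbytes
)

# Each shard job then reuses the same value, e.g.:
# shard.to_parquet(buffer, batch_size=writer_batch_size)
```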
134 changes: 111 additions & 23 deletions src/datasets/arrow_writer.py
@@ -43,31 +43,67 @@
from .keyhash import DuplicatedKeysError, KeyHasher
from .table import array_cast, cast_array_to_feature, embed_table_storage, table_cast
from .utils import logging
from .utils.py_utils import asdict, first_non_null_non_empty_value
from .utils.py_utils import asdict, convert_file_size_to_int, first_non_null_non_empty_value


logger = logging.get_logger(__name__)

type_ = type # keep python's type function


def get_writer_batch_size(features: Optional[Features]) -> Optional[int]:
def get_arrow_writer_batch_size_from_features(features: Optional[Features]) -> Optional[int]:
"""
Get the writer_batch_size that defines the maximum row group size in the parquet files.
The default in `datasets` is 1,000 but we lower it to 100 for image/audio datasets and 10 for videos.
Get the writer_batch_size that defines the maximum record batch size in the arrow files based on configuration values.
The default value is 100 for image/audio datasets and 10 for videos.
This allows to avoid overflows in arrow buffers.

Args:
features (`datasets.Features` or `None`):
Dataset Features from `datasets`.
Returns:
writer_batch_size (`Optional[int]`):
Writer batch size to pass to a dataset builder.
If `None`, then it will use the `datasets` default, i.e. `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
"""
if not features:
return None

batch_size = np.inf

def set_batch_size(feature: FeatureType) -> None:
nonlocal batch_size
if isinstance(feature, Image) and config.ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS is not None:
batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS)
elif isinstance(feature, Audio) and config.ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS is not None:
batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS)
elif isinstance(feature, Video) and config.ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS is not None:
batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS)
elif (
isinstance(feature, Value)
and feature.dtype == "binary"
and config.ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS is not None
):
batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS)

_visit(features, set_batch_size)

return None if batch_size is np.inf else batch_size


def get_writer_batch_size_from_features(features: Optional[Features]) -> Optional[int]:
"""
Get the writer_batch_size that defines the maximum row group size in the parquet files based on configuration values.
By default these are not set, but it can be helpful to hard set those values in some cases.
This allows to optimize random access to parquet file, since accessing 1 row requires
to read its entire row group.

This can be improved to get optimized size for querying/iterating
but at least it matches the dataset viewer expectations on HF.

Args:
features (`datasets.Features` or `None`):
Dataset Features from `datasets`.
Returns:
writer_batch_size (`Optional[int]`):
Writer batch size to pass to a dataset builder.
If `None`, then it will use the `datasets` default.
Writer batch size to pass to a parquet writer.
If `None`, then it will use the `datasets` default, i.e. aiming for row groups of 100MB.
"""
if not features:
return None
@@ -76,20 +112,48 @@ def get_writer_batch_size(features: Optional[Features]) -> Optional[int]:

def set_batch_size(feature: FeatureType) -> None:
nonlocal batch_size
if isinstance(feature, Image):
if isinstance(feature, Image) and config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS is not None:
batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS)
elif isinstance(feature, Audio):
elif isinstance(feature, Audio) and config.PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS is not None:
batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS)
elif isinstance(feature, Video):
elif isinstance(feature, Video) and config.PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS is not None:
batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS)
elif isinstance(feature, Value) and feature.dtype == "binary":
elif (
isinstance(feature, Value)
and feature.dtype == "binary"
and config.PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS is not None
):
batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS)

_visit(features, set_batch_size)

return None if batch_size is np.inf else batch_size


def get_writer_batch_size_from_data_size(num_rows: int, num_bytes: int) -> int:
"""
Get the writer_batch_size that defines the maximum row group size in the parquet files.
The default in `datasets` is aiming for row groups of maximum 100MB uncompressed.
This allows to optimize random access to parquet file, since accessing 1 row requires
to read its entire row group.

This can be improved to get optimized size for querying/iterating
but at least it matches the dataset viewer expectations on HF.

Args:
num_rows (`int`):
Number of rows in the dataset.
num_bytes (`int`):
Number of bytes in the dataset.
For dataset with external files to embed (image, audio, videos), this can also be an
estimate from `dataset._estimate_nbytes()`.
Returns:
writer_batch_size (`Optional[int]`):
Writer batch size to pass to a parquet writer.
"""
return max(10, num_rows * convert_file_size_to_int(config.MAX_ROW_GROUP_SIZE) // num_bytes)
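
To make the formula concrete, a quick example with made-up sizes (MAX_ROW_GROUP_SIZE is "100MB", as set in config.py further down in this diff):

```python
# Made-up sizes illustrating get_writer_batch_size_from_data_size above.
num_rows = 1_000_000
num_bytes = 50 * 10**9  # ~50 GB uncompressed, e.g. from dataset._estimate_nbytes()

max_row_group_bytes = 100 * 10**6  # convert_file_size_to_int("100MB")
writer_batch_size = max(10, num_rows * max_row_group_bytes // num_bytes)
# -> max(10, 2000) == 2000 rows per row group, i.e. roughly 100MB each
```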


class SchemaInferenceError(ValueError):
pass

@@ -342,8 +406,6 @@ def __init__(
class ArrowWriter:
"""Shuffles and writes Examples to Arrow files."""

_WRITER_CLASS = pa.RecordBatchStreamWriter

def __init__(
self,
schema: Optional[pa.Schema] = None,
@@ -397,7 +459,9 @@ def __init__(
self.fingerprint = fingerprint
self.disable_nullable = disable_nullable
self.writer_batch_size = (
writer_batch_size or get_writer_batch_size(self._features) or config.DEFAULT_MAX_BATCH_SIZE
writer_batch_size
or get_arrow_writer_batch_size_from_features(self._features)
or config.DEFAULT_MAX_BATCH_SIZE
)
self.update_features = update_features
self.with_metadata = with_metadata
@@ -431,8 +495,9 @@ def close(self):
if self._closable_stream and not self.stream.closed:
self.stream.close() # This also closes self.pa_writer if it is opened

def _build_writer(self, inferred_schema: pa.Schema):
def _build_schema(self, inferred_schema: pa.Schema):
schema = self.schema
features = self._features
inferred_features = Features.from_arrow_schema(inferred_schema)
if self._features is not None:
if self.update_features: # keep original features it they match, or update them
Expand All @@ -442,19 +507,24 @@ def _build_writer(self, inferred_schema: pa.Schema):
if name in fields:
if inferred_field == fields[name]:
inferred_features[name] = self._features[name]
self._features = inferred_features
features = inferred_features
schema: pa.Schema = inferred_schema
else:
self._features = inferred_features
features = inferred_features
schema: pa.Schema = inferred_features.arrow_schema

if self.disable_nullable:
schema = pa.schema(pa.field(field.name, field.type, nullable=False) for field in schema)
if self.with_metadata:
schema = schema.with_metadata(self._build_metadata(DatasetInfo(features=self._features), self.fingerprint))
schema = schema.with_metadata(self._build_metadata(DatasetInfo(features=features), self.fingerprint))
else:
schema = schema.with_metadata({})
self._schema = schema
self.pa_writer = self._WRITER_CLASS(self.stream, schema)

return schema, features

def _build_writer(self, inferred_schema: pa.Schema):
self._schema, self._features = self._build_schema(inferred_schema)
self.pa_writer = pa.RecordBatchStreamWriter(self.stream, self._schema)

@property
def schema(self):
@@ -675,4 +745,22 @@ def finalize(self, close_stream=True):


class ParquetWriter(ArrowWriter):
_WRITER_CLASS = pq.ParquetWriter
def __init__(self, *args, use_content_defined_chunking=True, write_page_index=True, **kwargs):
super().__init__(*args, **kwargs)
if use_content_defined_chunking is True:
use_content_defined_chunking = config.DEFAULT_CDC_OPTIONS
self.use_content_defined_chunking = use_content_defined_chunking
self.write_page_index = write_page_index

def _build_writer(self, inferred_schema: pa.Schema):
self._schema, self._features = self._build_schema(inferred_schema)
self.pa_writer = pq.ParquetWriter(
self.stream,
self._schema,
use_content_defined_chunking=self.use_content_defined_chunking,
write_page_index=self.write_page_index,
)
if self.use_content_defined_chunking is not False:
self.pa_writer.add_key_value_metadata(
{"content_defined_chunking": json.dumps(self.use_content_defined_chunking)}
)
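
A rough usage sketch of the writer above (the constructor arguments are the ones `ArrowWriter` already accepts; the output path and rows are illustrative):

```python
# Rough sketch based on the ParquetWriter shown above; not a definitive recipe.
from datasets import Features, Value
from datasets.arrow_writer import ParquetWriter

features = Features({"text": Value("string")})

# use_content_defined_chunking defaults to True, which resolves to
# config.DEFAULT_CDC_OPTIONS; the resolved options are stored in the file's
# key-value metadata under "content_defined_chunking". Pass False to disable.
writer = ParquetWriter(features=features, path="out.parquet")
writer.write({"text": "hello"})
writer.write({"text": "world"})
writer.finalize()
```

Note the related refactor above: the class-level `_WRITER_CLASS` hook is gone and subclasses now override `_build_writer` directly, which is what lets `ParquetWriter` pass Parquet-specific options and attach the chunking parameters as key-value metadata right after the writer is created.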
21 changes: 16 additions & 5 deletions src/datasets/config.py
@@ -185,17 +185,28 @@
# https://github.com/apache/arrow/blob/master/docs/source/cpp/arrays.rst#size-limitations-and-recommendations)
DEFAULT_MAX_BATCH_SIZE = 1000

DEFAULT_CDC_OPTIONS = {"min_chunk_size": 256 * 1024, "max_chunk_size": 1024 * 1024, "norm_level": 0}

# Size of the preloaded record batch in `Dataset.__iter__`
ARROW_READER_BATCH_SIZE_IN_DATASET_ITER = 10

# Max shard size in bytes (e.g. to shard parquet datasets in push_to_hub or download_and_prepare)
# Max uncompressed shard size in bytes (e.g. to shard parquet datasets in push_to_hub or download_and_prepare)
MAX_SHARD_SIZE = "500MB"

# Max uncompressed row group size in bytes (e.g. for parquet files in push_to_hub or download_and_prepare)
MAX_ROW_GROUP_SIZE = "100MB"

# Parquet configuration
PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS = 100
PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = 100
PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS = 100
PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS = 10
PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS = None
PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = None
PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS = None
PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS = None

# Arrow configuration
ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS = 100
ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS = 100
ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS = 100
ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS = 10

# Offline mode
_offline = os.environ.get("HF_DATASETS_OFFLINE")
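A small illustrative sketch of how the new config values above tie together (the module-level override at the end is an assumed usage pattern, not part of this diff):

```python
# Illustrative only: how the new config values above are consumed.
from datasets import config

# ParquetWriter(use_content_defined_chunking=True) resolves True to:
print(config.DEFAULT_CDC_OPTIONS)
# {'min_chunk_size': 262144, 'max_chunk_size': 1048576, 'norm_level': 0}

# The per-modality Parquet row group sizes are now None by default, so the
# 100MB target (config.MAX_ROW_GROUP_SIZE) drives row group sizing instead.
# Hard-setting one of them, e.g. for image datasets:
config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = 100
# makes get_writer_batch_size_from_features(features) return 100 again
# for datasets that contain an Image feature.
```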