diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a4a9041fd28..5bbf0c9b40f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -64,7 +64,7 @@ jobs:
         run: uv pip install --system --upgrade pyarrow huggingface-hub "dill<0.3.9"
       - name: Install dependencies (minimum versions)
         if: ${{ matrix.deps_versions != 'deps-latest' }}
-        run: uv pip install --system pyarrow==15.0.0 huggingface-hub==0.24.7 transformers dill==0.3.1.1
+        run: uv pip install --system pyarrow==21.0.0 huggingface-hub==0.24.7 transformers dill==0.3.1.1
       - name: Test with pytest
         run: |
           python -m pytest -rfExX -m ${{ matrix.test }} -n 2 --dist loadfile -sv ./tests/
diff --git a/setup.py b/setup.py
index bbf9da7215d..3209ab76c9e 100644
--- a/setup.py
+++ b/setup.py
@@ -110,8 +110,8 @@
     # We use numpy>=1.17 to have np.random.Generator (Dataset shuffling)
     "numpy>=1.17",
     # Backend and serialization.
-    # Minimum 15.0.0 to be able to cast dictionary types to their underlying types
-    "pyarrow>=15.0.0",
+    # Minimum 21.0.0 to support `use_content_defined_chunking` in ParquetWriter
+    "pyarrow>=21.0.0",
     # For smart caching dataset processing
     "dill>=0.3.0,<0.3.9",  # tmp pin until dill has official support for determinism see https://github.com/uqfoundation/dill/issues/19
     # For performance gains with apache arrow
diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py
index 81dbe7634b8..93dc414cb76 100644
--- a/src/datasets/arrow_dataset.py
+++ b/src/datasets/arrow_dataset.py
@@ -5245,7 +5245,8 @@ def to_parquet(
                 or a BinaryIO, where the dataset will be saved to in the specified format.
             batch_size (`int`, *optional*):
                 Size of the batch to load in memory and write at once.
-                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+                By default, it aims for row groups with a maximum uncompressed size of 100MB,
+                as defined by `datasets.config.MAX_ROW_GROUP_SIZE`.
             storage_options (`dict`, *optional*):
                 Key/value pairs to be passed on to the file-system backend, if any.
@@ -5343,11 +5344,11 @@ def extra_nbytes_visitor(array, feature):
             table = self.with_format("arrow")[:1000]
             table_visitor(table, extra_nbytes_visitor)
 
-            extra_nbytes = extra_nbytes * len(self.data) / len(table)
+            extra_nbytes = extra_nbytes * len(self.data) // len(table)
             dataset_nbytes = dataset_nbytes + extra_nbytes
 
         if self._indices is not None:
-            dataset_nbytes = dataset_nbytes * len(self._indices) / len(self.data)
+            dataset_nbytes = dataset_nbytes * len(self._indices) // len(self.data)
         return dataset_nbytes
 
     @staticmethod
@@ -5498,6 +5499,7 @@ def _push_parquet_shards_to_hub_single(
        create_pr: Optional[bool],
        num_shards: int,
        embed_external_files: bool,
+        writer_batch_size: int,
     ):
        div = num_shards // num_jobs
        mod = num_shards % num_jobs
@@ -5514,20 +5516,18 @@ def _push_parquet_shards_to_hub_single(
         additions: list[CommitOperationAdd] = []
         for index, shard in index_shards:
             if embed_external_files:
-                from .io.parquet import get_writer_batch_size
-
                 format = shard.format
                 shard = shard.with_format("arrow")
                 shard = shard.map(
                     embed_table_storage,
                     batched=True,
-                    batch_size=get_writer_batch_size(shard.features),
+                    batch_size=writer_batch_size,
                     keep_in_memory=True,
                 )
                 shard = shard.with_format(**format)
             shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
             buffer = BytesIO()
-            shard.to_parquet(buffer)
+            shard.to_parquet(buffer, batch_size=writer_batch_size)
             parquet_content = buffer.getvalue()
             uploaded_size += len(parquet_content)
             del buffer
@@ -5564,7 +5564,12 @@ def _push_parquet_shards_to_hub(
             uploaded_size (`int`): number of uploaded bytes to the repository
             dataset_nbytes (`int`): approximate size in bytes of the uploaded dataset after uncompression
         """
+        from .arrow_writer import get_writer_batch_size_from_data_size, get_writer_batch_size_from_features
+
         dataset_nbytes = self._estimate_nbytes()
+        writer_batch_size = get_writer_batch_size_from_features(self.features) or get_writer_batch_size_from_data_size(
+            len(self), dataset_nbytes
+        )
 
         # Find decodable columns, because if there are any, we need to:
         # embed the bytes from the files in the shards
@@ -5596,6 +5601,7 @@ def _push_parquet_shards_to_hub(
                 "create_pr": create_pr,
                 "num_shards": num_shards,
                 "embed_external_files": embed_external_files,
+                "writer_batch_size": writer_batch_size,
             }
             for job_id in range(num_jobs)
         ]
diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py
index 72fba8d54a7..2fb9e97ce43 100644
--- a/src/datasets/arrow_writer.py
+++ b/src/datasets/arrow_writer.py
@@ -43,7 +43,7 @@
 from .keyhash import DuplicatedKeysError, KeyHasher
 from .table import array_cast, cast_array_to_feature, embed_table_storage, table_cast
 from .utils import logging
-from .utils.py_utils import asdict, first_non_null_non_empty_value
+from .utils.py_utils import asdict, convert_file_size_to_int, first_non_null_non_empty_value
 
 
 logger = logging.get_logger(__name__)
@@ -51,23 +51,59 @@
 type_ = type  # keep python's type function
 
 
-def get_writer_batch_size(features: Optional[Features]) -> Optional[int]:
+def get_arrow_writer_batch_size_from_features(features: Optional[Features]) -> Optional[int]:
     """
-    Get the writer_batch_size that defines the maximum row group size in the parquet files.
-    The default in `datasets` is 1,000 but we lower it to 100 for image/audio datasets and 10 for videos.
+    Get the writer_batch_size that defines the maximum record batch size in the arrow files based on configuration values.
+    The default value is 100 for image/audio datasets and 10 for videos.
+    This helps avoid overflows in Arrow buffers.
+
+    Args:
+        features (`datasets.Features` or `None`):
+            Dataset Features from `datasets`.
+    Returns:
+        writer_batch_size (`Optional[int]`):
+            Writer batch size to pass to a dataset builder.
+            If `None`, then it will use the `datasets` default, i.e. `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+    """
+    if not features:
+        return None
+
+    batch_size = np.inf
+
+    def set_batch_size(feature: FeatureType) -> None:
+        nonlocal batch_size
+        if isinstance(feature, Image) and config.ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS is not None:
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS)
+        elif isinstance(feature, Audio) and config.ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS is not None:
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS)
+        elif isinstance(feature, Video) and config.ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS is not None:
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS)
+        elif (
+            isinstance(feature, Value)
+            and feature.dtype == "binary"
+            and config.ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS is not None
+        ):
+            batch_size = min(batch_size, config.ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS)
+
+    _visit(features, set_batch_size)
+
+    return None if batch_size is np.inf else batch_size
+
+
+def get_writer_batch_size_from_features(features: Optional[Features]) -> Optional[int]:
+    """
+    Get the writer_batch_size that defines the maximum row group size in the Parquet files, based on configuration values.
+    By default these values are not set, but it can be helpful to hard-set them in some cases.
     This allows to optimize random access to parquet file, since accessing 1 row requires
     to read its entire row group.
 
-    This can be improved to get optimized size for querying/iterating
-    but at least it matches the dataset viewer expectations on HF.
-
     Args:
         features (`datasets.Features` or `None`):
             Dataset Features from `datasets`.
     Returns:
         writer_batch_size (`Optional[int]`):
-            Writer batch size to pass to a dataset builder.
-            If `None`, then it will use the `datasets` default.
+            Writer batch size to pass to a Parquet writer.
+            If `None`, then it will use the `datasets` default, i.e. aiming for row groups of 100MB.
""" if not features: return None @@ -76,13 +112,17 @@ def get_writer_batch_size(features: Optional[Features]) -> Optional[int]: def set_batch_size(feature: FeatureType) -> None: nonlocal batch_size - if isinstance(feature, Image): + if isinstance(feature, Image) and config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS is not None: batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS) - elif isinstance(feature, Audio): + elif isinstance(feature, Audio) and config.PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS is not None: batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS) - elif isinstance(feature, Video): + elif isinstance(feature, Video) and config.PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS is not None: batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS) - elif isinstance(feature, Value) and feature.dtype == "binary": + elif ( + isinstance(feature, Value) + and feature.dtype == "binary" + and config.PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS is not None + ): batch_size = min(batch_size, config.PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS) _visit(features, set_batch_size) @@ -90,6 +130,30 @@ def set_batch_size(feature: FeatureType) -> None: return None if batch_size is np.inf else batch_size +def get_writer_batch_size_from_data_size(num_rows: int, num_bytes: int) -> int: + """ + Get the writer_batch_size that defines the maximum row group size in the parquet files. + The default in `datasets` is aiming for row groups of maximum 100MB uncompressed. + This allows to optimize random access to parquet file, since accessing 1 row requires + to read its entire row group. + + This can be improved to get optimized size for querying/iterating + but at least it matches the dataset viewer expectations on HF. + + Args: + num_rows (`int`): + Number of rows in the dataset. + num_bytes (`int`): + Number of bytes in the dataset. + For dataset with external files to embed (image, audio, videos), this can also be an + estimate from `dataset._estimate_nbytes()`. + Returns: + writer_batch_size (`Optional[int]`): + Writer batch size to pass to a parquet writer. 
+ """ + return max(10, num_rows * convert_file_size_to_int(config.MAX_ROW_GROUP_SIZE) // num_bytes) + + class SchemaInferenceError(ValueError): pass @@ -342,8 +406,6 @@ def __init__( class ArrowWriter: """Shuffles and writes Examples to Arrow files.""" - _WRITER_CLASS = pa.RecordBatchStreamWriter - def __init__( self, schema: Optional[pa.Schema] = None, @@ -397,7 +459,9 @@ def __init__( self.fingerprint = fingerprint self.disable_nullable = disable_nullable self.writer_batch_size = ( - writer_batch_size or get_writer_batch_size(self._features) or config.DEFAULT_MAX_BATCH_SIZE + writer_batch_size + or get_arrow_writer_batch_size_from_features(self._features) + or config.DEFAULT_MAX_BATCH_SIZE ) self.update_features = update_features self.with_metadata = with_metadata @@ -431,8 +495,9 @@ def close(self): if self._closable_stream and not self.stream.closed: self.stream.close() # This also closes self.pa_writer if it is opened - def _build_writer(self, inferred_schema: pa.Schema): + def _build_schema(self, inferred_schema: pa.Schema): schema = self.schema + features = self._features inferred_features = Features.from_arrow_schema(inferred_schema) if self._features is not None: if self.update_features: # keep original features it they match, or update them @@ -442,19 +507,24 @@ def _build_writer(self, inferred_schema: pa.Schema): if name in fields: if inferred_field == fields[name]: inferred_features[name] = self._features[name] - self._features = inferred_features + features = inferred_features schema: pa.Schema = inferred_schema else: - self._features = inferred_features + features = inferred_features schema: pa.Schema = inferred_features.arrow_schema + if self.disable_nullable: schema = pa.schema(pa.field(field.name, field.type, nullable=False) for field in schema) if self.with_metadata: - schema = schema.with_metadata(self._build_metadata(DatasetInfo(features=self._features), self.fingerprint)) + schema = schema.with_metadata(self._build_metadata(DatasetInfo(features=features), self.fingerprint)) else: schema = schema.with_metadata({}) - self._schema = schema - self.pa_writer = self._WRITER_CLASS(self.stream, schema) + + return schema, features + + def _build_writer(self, inferred_schema: pa.Schema): + self._schema, self._features = self._build_schema(inferred_schema) + self.pa_writer = pa.RecordBatchStreamWriter(self.stream, self._schema) @property def schema(self): @@ -675,4 +745,22 @@ def finalize(self, close_stream=True): class ParquetWriter(ArrowWriter): - _WRITER_CLASS = pq.ParquetWriter + def __init__(self, *args, use_content_defined_chunking=True, write_page_index=True, **kwargs): + super().__init__(*args, **kwargs) + if use_content_defined_chunking is True: + use_content_defined_chunking = config.DEFAULT_CDC_OPTIONS + self.use_content_defined_chunking = use_content_defined_chunking + self.write_page_index = write_page_index + + def _build_writer(self, inferred_schema: pa.Schema): + self._schema, self._features = self._build_schema(inferred_schema) + self.pa_writer = pq.ParquetWriter( + self.stream, + self._schema, + use_content_defined_chunking=self.use_content_defined_chunking, + write_page_index=self.write_page_index, + ) + if self.use_content_defined_chunking is not False: + self.pa_writer.add_key_value_metadata( + {"content_defined_chunking": json.dumps(self.use_content_defined_chunking)} + ) diff --git a/src/datasets/config.py b/src/datasets/config.py index 69e9e13ba86..02b3ea02f03 100644 --- a/src/datasets/config.py +++ b/src/datasets/config.py @@ -185,17 +185,28 @@ # 
https://github.com/apache/arrow/blob/master/docs/source/cpp/arrays.rst#size-limitations-and-recommendations) DEFAULT_MAX_BATCH_SIZE = 1000 +DEFAULT_CDC_OPTIONS = {"min_chunk_size": 256 * 1024, "max_chunk_size": 1024 * 1024, "norm_level": 0} + # Size of the preloaded record batch in `Dataset.__iter__` ARROW_READER_BATCH_SIZE_IN_DATASET_ITER = 10 -# Max shard size in bytes (e.g. to shard parquet datasets in push_to_hub or download_and_prepare) +# Max uncompressed shard size in bytes (e.g. to shard parquet datasets in push_to_hub or download_and_prepare) MAX_SHARD_SIZE = "500MB" +# Max uncompressed row group size in bytes (e.g. for parquet files in push_to_hub or download_and_prepare) +MAX_ROW_GROUP_SIZE = "100MB" + # Parquet configuration -PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS = 100 -PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = 100 -PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS = 100 -PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS = 10 +PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS = None +PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = None +PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS = None +PARQUET_ROW_GROUP_SIZE_FOR_VIDEO_DATASETS = None + +# Arrow configuration +ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS = 100 +ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS = 100 +ARROW_RECORD_BATCH_SIZE_FOR_BINARY_DATASETS = 100 +ARROW_RECORD_BATCH_SIZE_FOR_VIDEO_DATASETS = 10 # Offline mode _offline = os.environ.get("HF_DATASETS_OFFLINE") diff --git a/src/datasets/io/parquet.py b/src/datasets/io/parquet.py index d34f5110204..b7b13b6b8a4 100644 --- a/src/datasets/io/parquet.py +++ b/src/datasets/io/parquet.py @@ -1,3 +1,4 @@ +import json import os from typing import BinaryIO, Optional, Union @@ -5,7 +6,7 @@ import pyarrow.parquet as pq from .. import Dataset, Features, NamedSplit, config -from ..arrow_writer import get_writer_batch_size +from ..arrow_writer import get_writer_batch_size_from_data_size, get_writer_batch_size_from_features from ..formatting import query_table from ..packaged_modules import _PACKAGED_DATASETS_MODULES from ..packaged_modules.parquet.parquet import Parquet @@ -77,22 +78,38 @@ def __init__( path_or_buf: Union[PathLike, BinaryIO], batch_size: Optional[int] = None, storage_options: Optional[dict] = None, + use_content_defined_chunking: Union[bool, dict] = True, + write_page_index: bool = True, **parquet_writer_kwargs, ): self.dataset = dataset self.path_or_buf = path_or_buf - self.batch_size = batch_size or get_writer_batch_size(dataset.features) + self.batch_size = ( + batch_size + or get_writer_batch_size_from_features(dataset.features) + or get_writer_batch_size_from_data_size(len(dataset), dataset._estimate_nbytes()) + ) self.storage_options = storage_options or {} self.parquet_writer_kwargs = parquet_writer_kwargs + if use_content_defined_chunking is True: + use_content_defined_chunking = config.DEFAULT_CDC_OPTIONS + self.use_content_defined_chunking = use_content_defined_chunking + self.write_page_index = write_page_index def write(self) -> int: - batch_size = self.batch_size if self.batch_size else config.DEFAULT_MAX_BATCH_SIZE - if isinstance(self.path_or_buf, (str, bytes, os.PathLike)): with fsspec.open(self.path_or_buf, "wb", **(self.storage_options or {})) as buffer: - written = self._write(file_obj=buffer, batch_size=batch_size, **self.parquet_writer_kwargs) + written = self._write( + file_obj=buffer, + batch_size=self.batch_size, + **self.parquet_writer_kwargs, + ) else: - written = self._write(file_obj=self.path_or_buf, batch_size=batch_size, **self.parquet_writer_kwargs) + written 
= self._write( + file_obj=self.path_or_buf, + batch_size=self.batch_size, + **self.parquet_writer_kwargs, + ) return written def _write(self, file_obj: BinaryIO, batch_size: int, **parquet_writer_kwargs) -> int: @@ -104,7 +121,13 @@ def _write(self, file_obj: BinaryIO, batch_size: int, **parquet_writer_kwargs) - _ = parquet_writer_kwargs.pop("path_or_buf", None) schema = self.dataset.features.arrow_schema - writer = pq.ParquetWriter(file_obj, schema=schema, **parquet_writer_kwargs) + writer = pq.ParquetWriter( + file_obj, + schema=schema, + use_content_defined_chunking=self.use_content_defined_chunking, + write_page_index=self.write_page_index, + **parquet_writer_kwargs, + ) for offset in hf_tqdm( range(0, len(self.dataset), batch_size), @@ -118,5 +141,10 @@ def _write(self, file_obj: BinaryIO, batch_size: int, **parquet_writer_kwargs) - ) writer.write_table(batch) written += batch.nbytes + + # TODO(kszucs): we may want to persist multiple parameters + if self.use_content_defined_chunking is not False: + writer.add_key_value_metadata({"content_defined_chunking": json.dumps(self.use_content_defined_chunking)}) + writer.close() return written diff --git a/src/datasets/iterable_dataset.py b/src/datasets/iterable_dataset.py index 1550b8f2db2..07681d0cc6e 100644 --- a/src/datasets/iterable_dataset.py +++ b/src/datasets/iterable_dataset.py @@ -3741,12 +3741,12 @@ def to_parquet( ``` """ - from .io.parquet import get_writer_batch_size + from .arrow_writer import get_arrow_writer_batch_size_from_features - batch_size = get_writer_batch_size(self.features) + batch_size = get_arrow_writer_batch_size_from_features(self.features) or config.DEFAULT_MAX_BATCH_SIZE table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=batch_size))) return Dataset(table, fingerprint="unset").to_parquet( - path_or_buf, batch_size=batch_size, storage_options=storage_options, **parquet_writer_kwargs + path_or_buf, storage_options=storage_options, **parquet_writer_kwargs ) def _push_parquet_shards_to_hub_single( @@ -3762,7 +3762,7 @@ def _push_parquet_shards_to_hub_single( # max_shard_size: Optional[Union[int, str]] = None, # TODO(QL): add arg num_shards: int, embed_external_files: bool, - ) -> tuple[list[CommitOperationAdd], int, int]: + ) -> Iterable[tuple[list[CommitOperationAdd], int, int]]: """Pushes the dataset shards as Parquet files to the hub. 
         Returns:
@@ -3788,13 +3788,13 @@ def _push_parquet_shards_to_hub_single(
         additions: list[CommitOperationAdd] = []
         for index, shard in index_shards:
             if embed_external_files:
-                from .io.parquet import get_writer_batch_size
+                from .arrow_writer import get_arrow_writer_batch_size_from_features
 
                 shard = shard.with_format("arrow")
                 shard = shard.map(
                     partial(embed_table_storage, token_per_repo_id=self._token_per_repo_id),
                     batched=True,
-                    batch_size=get_writer_batch_size(shard.features),
+                    batch_size=get_arrow_writer_batch_size_from_features(shard.features),
                 )
             shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
             buffer = BytesIO()
diff --git a/tests/io/data/test_image_rgb.jpg b/tests/io/data/test_image_rgb.jpg
new file mode 100644
index 00000000000..e131e8ecdf3
Binary files /dev/null and b/tests/io/data/test_image_rgb.jpg differ
diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py
index c01781972f5..79edf30ee29 100644
--- a/tests/io/test_parquet.py
+++ b/tests/io/test_parquet.py
@@ -1,11 +1,15 @@
+import json
+import unittest.mock
+
 import fsspec
 import pyarrow.parquet as pq
 import pytest
 
 from datasets import Audio, Dataset, DatasetDict, Features, IterableDatasetDict, List, NamedSplit, Value, config
+from datasets.arrow_writer import get_arrow_writer_batch_size_from_features
 from datasets.features.image import Image
 from datasets.info import DatasetInfo
-from datasets.io.parquet import ParquetDatasetReader, ParquetDatasetWriter, get_writer_batch_size
+from datasets.io.parquet import ParquetDatasetReader, ParquetDatasetWriter
 
 from ..utils import assert_arrow_memory_doesnt_increase, assert_arrow_memory_increases
@@ -199,6 +203,57 @@ def test_parquet_write(dataset, tmp_path):
     assert dataset.data.table == output_table
 
 
+def test_parquet_write_uses_content_defined_chunking(dataset, tmp_path):
+    assert config.DEFAULT_CDC_OPTIONS == {
+        "min_chunk_size": 256 * 1024,  # 256 KiB
+        "max_chunk_size": 1024 * 1024,  # 1 MiB
+        "norm_level": 0,
+    }
+
+    with unittest.mock.patch("pyarrow.parquet.ParquetWriter") as MockWriter:
+        writer = ParquetDatasetWriter(dataset, tmp_path / "foo.parquet")
+        writer.write()
+        assert MockWriter.call_count == 1
+        _, kwargs = MockWriter.call_args
+        # check the arguments passed to the mocked ParquetWriter
+        assert "use_content_defined_chunking" in kwargs
+        assert kwargs["use_content_defined_chunking"] == config.DEFAULT_CDC_OPTIONS
+
+
+def test_parquet_writer_persist_cdc_options_as_metadata(dataset, tmp_path):
+    def write_and_get_metadata(**kwargs):
+        # write the dataset to parquet with the given CDC options
+        writer = ParquetDatasetWriter(dataset, tmp_path / "foo.parquet", **kwargs)
+        assert writer.write() > 0
+
+        # read the parquet KV metadata
+        metadata = pq.read_metadata(tmp_path / "foo.parquet")
+        key_value_metadata = metadata.metadata
+
+        return key_value_metadata
+
+    # by default no arguments are passed, same as passing True using the default options
+    for key_value_metadata in [write_and_get_metadata(), write_and_get_metadata(use_content_defined_chunking=True)]:
+        assert b"content_defined_chunking" in key_value_metadata
+        json_encoded_options = key_value_metadata[b"content_defined_chunking"].decode("utf-8")
+        assert json.loads(json_encoded_options) == config.DEFAULT_CDC_OPTIONS
+
+    # passing False disables the content defined chunking and doesn't persist the options in metadata
+    key_value_metadata = write_and_get_metadata(use_content_defined_chunking=False)
+    assert b"content_defined_chunking" not in key_value_metadata
+
+    # passing custom options, using the custom options
+    custom_cdc_options = {
+        "min_chunk_size": 128 * 1024,  # 128 KiB
+        "max_chunk_size": 512 * 1024,  # 512 KiB
+        "norm_level": 1,
+    }
+    key_value_metadata = write_and_get_metadata(use_content_defined_chunking=custom_cdc_options)
+    assert b"content_defined_chunking" in key_value_metadata
+    json_encoded_options = key_value_metadata[b"content_defined_chunking"].decode("utf-8")
+    assert json.loads(json_encoded_options) == custom_cdc_options
+
+
 def test_dataset_to_parquet_keeps_features(shared_datadir, tmp_path):
     image_path = str(shared_datadir / "test_image_rgb.jpg")
     data = {"image": [image_path]}
@@ -218,12 +273,12 @@ def test_dataset_to_parquet_keeps_features(shared_datadir, tmp_path):
     "feature, expected",
     [
         (Features({"foo": Value("int32")}), None),
-        (Features({"image": Image(), "foo": Value("int32")}), config.PARQUET_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS),
-        (Features({"nested": List(Audio())}), config.PARQUET_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS),
+        (Features({"image": Image(), "foo": Value("int32")}), config.ARROW_RECORD_BATCH_SIZE_FOR_IMAGE_DATASETS),
+        (Features({"nested": List(Audio())}), config.ARROW_RECORD_BATCH_SIZE_FOR_AUDIO_DATASETS),
     ],
 )
-def test_get_writer_batch_size(feature, expected):
-    assert get_writer_batch_size(feature) == expected
+def test_get_arrow_writer_batch_size_from_features(feature, expected):
+    assert get_arrow_writer_batch_size_from_features(feature) == expected
 
 
 def test_dataset_to_parquet_fsspec(dataset, mockfs):
diff --git a/tests/test_arrow_writer.py b/tests/test_arrow_writer.py
index a98b5a9c42f..39afcf931db 100644
--- a/tests/test_arrow_writer.py
+++ b/tests/test_arrow_writer.py
@@ -1,4 +1,5 @@
 import copy
+import json
 import os
 import tempfile
 from unittest import TestCase
@@ -9,6 +10,7 @@
 import pyarrow.parquet as pq
 import pytest
 
+from datasets import config
 from datasets.arrow_writer import ArrowWriter, OptimizedTypedSequence, ParquetWriter, TypedSequence
 from datasets.features import Array2D, ClassLabel, Features, Image, Value
 from datasets.features.features import Array2DExtensionType, cast_to_python_objects
@@ -334,6 +336,80 @@ def test_parquet_writer_write():
     assert pa_table.to_pydict() == {"col_1": ["foo", "bar"], "col_2": [1, 2]}
 
 
+def test_parquet_writer_uses_content_defined_chunking():
+    def write_and_get_argument_and_metadata(**kwargs):
+        output = pa.BufferOutputStream()
+        with patch("pyarrow.parquet.ParquetWriter", wraps=pq.ParquetWriter) as MockWriter:
+            with ParquetWriter(stream=output, **kwargs) as writer:
+                writer.write({"col_1": "foo", "col_2": 1})
+                writer.write({"col_1": "bar", "col_2": 2})
+                writer.finalize()
+            assert MockWriter.call_count == 1
+            _, kwargs = MockWriter.call_args
+            assert "use_content_defined_chunking" in kwargs
+
+        # read metadata from the output stream
+        with pa.input_stream(output.getvalue()) as stream:
+            metadata = pq.read_metadata(stream)
+            key_value_metadata = metadata.metadata
+
+        return kwargs["use_content_defined_chunking"], key_value_metadata
+
+    # not passing the use_content_defined_chunking argument, using the default
+    passed_arg, key_value_metadata = write_and_get_argument_and_metadata()
+    assert passed_arg == config.DEFAULT_CDC_OPTIONS
+    assert b"content_defined_chunking" in key_value_metadata
+    json_encoded_options = key_value_metadata[b"content_defined_chunking"].decode("utf-8")
+    assert json.loads(json_encoded_options) == config.DEFAULT_CDC_OPTIONS
+
+    # passing True, using the default options
+    passed_arg, key_value_metadata = write_and_get_argument_and_metadata(use_content_defined_chunking=True)
+    assert passed_arg == config.DEFAULT_CDC_OPTIONS
+    assert b"content_defined_chunking" in key_value_metadata
+    json_encoded_options = key_value_metadata[b"content_defined_chunking"].decode("utf-8")
+    assert json.loads(json_encoded_options) == config.DEFAULT_CDC_OPTIONS
+
+    # passing False, not using content defined chunking
+    passed_arg, key_value_metadata = write_and_get_argument_and_metadata(use_content_defined_chunking=False)
+    assert passed_arg is False
+    assert b"content_defined_chunking" not in key_value_metadata
+
+    # passing custom options, using the custom options
+    custom_cdc_options = {
+        "min_chunk_size": 128 * 1024,  # 128 KiB
+        "max_chunk_size": 512 * 1024,  # 512 KiB
+        "norm_level": 1,
+    }
+    passed_arg, key_value_metadata = write_and_get_argument_and_metadata(
+        use_content_defined_chunking=custom_cdc_options
+    )
+    assert passed_arg == custom_cdc_options
+    assert b"content_defined_chunking" in key_value_metadata
+    json_encoded_options = key_value_metadata[b"content_defined_chunking"].decode("utf-8")
+    assert json.loads(json_encoded_options) == custom_cdc_options
+
+    # passing None or invalid options raises in pyarrow
+    with pytest.raises(TypeError):
+        write_and_get_argument_and_metadata(use_content_defined_chunking=None)
+    with pytest.raises(TypeError):
+        write_and_get_argument_and_metadata(use_content_defined_chunking="invalid_options")
+    with pytest.raises(ValueError):
+        write_and_get_argument_and_metadata(use_content_defined_chunking={"invalid_option": 1})
+
+
+def test_parquet_writer_writes_page_index():
+    output = pa.BufferOutputStream()
+    with patch("pyarrow.parquet.ParquetWriter", wraps=pq.ParquetWriter) as MockWriter:
+        with ParquetWriter(stream=output) as writer:
+            writer.write({"col_1": "foo", "col_2": 1})
+            writer.write({"col_1": "bar", "col_2": 2})
+            writer.finalize()
+        assert MockWriter.call_count == 1
+        _, kwargs = MockWriter.call_args
+        assert "write_page_index" in kwargs
+        assert kwargs["write_page_index"]
+
+
 @require_pil
 @pytest.mark.parametrize("embed_local_files", [False, True])
 def test_writer_embed_local_files(tmp_path, embed_local_files):
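
Minimal usage sketch (not part of the patch): how the new writer options introduced in this diff can be passed through `Dataset.to_parquet`, which forwards them to `ParquetDatasetWriter`. The dataset contents and output path are illustrative; passing `True` for `use_content_defined_chunking` expands to `datasets.config.DEFAULT_CDC_OPTIONS`, a dict overrides the chunking parameters, and `False` disables content-defined chunking. Requires pyarrow>=21.0.0.

from datasets import Dataset

ds = Dataset.from_dict({"col_1": ["foo", "bar"], "col_2": [1, 2]})

# write with explicit CDC options and a Parquet page index
ds.to_parquet(
    "out.parquet",
    use_content_defined_chunking={"min_chunk_size": 256 * 1024, "max_chunk_size": 1024 * 1024, "norm_level": 0},
    write_page_index=True,
)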