Skip to content

Commit 9330cac

Browse files
committed
chore: rename cdc_options argument to use_content_defined_chunking
1 parent 777c14d commit 9330cac

File tree

4 files changed

+30
-14
lines changed

4 files changed

+30
-14
lines changed

src/datasets/arrow_writer.py

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -678,11 +678,17 @@ def finalize(self, close_stream=True):
678678

679679

680680
class ParquetWriter(ArrowWriter):
681-
def __init__(self, *args, cdc_options=None, **kwargs):
681+
def __init__(self, *args, use_content_defined_chunking=None, **kwargs):
682682
super().__init__(*args, **kwargs)
683-
self.cdc_options = config.DEFAULT_CDC_OPTIONS if cdc_options is None else cdc_options
683+
self.use_content_defined_chunking = (
684+
config.DEFAULT_CDC_OPTIONS if use_content_defined_chunking is None else use_content_defined_chunking
685+
)
684686

685687
def _build_writer(self, inferred_schema: pa.Schema):
686688
self._schema, self._features = self._build_schema(inferred_schema)
687-
self.pa_writer = pq.ParquetWriter(self.stream, self._schema, use_content_defined_chunking=self.cdc_options)
688-
self.pa_writer.add_key_value_metadata({"content_defined_chunking": json.dumps(self.cdc_options)})
689+
self.pa_writer = pq.ParquetWriter(
690+
self.stream, self._schema, use_content_defined_chunking=self.use_content_defined_chunking
691+
)
692+
self.pa_writer.add_key_value_metadata(
693+
{"content_defined_chunking": json.dumps(self.use_content_defined_chunking)}
694+
)

src/datasets/io/parquet.py

Lines changed: 18 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -77,33 +77,43 @@ def __init__(
7777
dataset: Dataset,
7878
path_or_buf: Union[PathLike, BinaryIO],
7979
batch_size: Optional[int] = None,
80-
cdc_options: Optional[dict] = None,
8180
storage_options: Optional[dict] = None,
81+
use_content_defined_chunking: Optional[dict] = None,
8282
**parquet_writer_kwargs,
8383
):
8484
self.dataset = dataset
8585
self.path_or_buf = path_or_buf
86-
self.cdc_options = cdc_options
8786
self.batch_size = batch_size or get_writer_batch_size(dataset.features)
8887
self.storage_options = storage_options or {}
8988
self.parquet_writer_kwargs = parquet_writer_kwargs
89+
self.use_content_defined_chunking = use_content_defined_chunking
9090

9191
def write(self) -> int:
9292
batch_size = self.batch_size if self.batch_size else config.DEFAULT_MAX_BATCH_SIZE
93-
cdc_options = self.cdc_options if self.cdc_options else config.DEFAULT_CDC_OPTIONS
93+
use_content_defined_chunking = (
94+
self.use_content_defined_chunking if self.use_content_defined_chunking else config.DEFAULT_CDC_OPTIONS
95+
)
9496

9597
if isinstance(self.path_or_buf, (str, bytes, os.PathLike)):
9698
with fsspec.open(self.path_or_buf, "wb", **(self.storage_options or {})) as buffer:
9799
written = self._write(
98-
file_obj=buffer, batch_size=batch_size, cdc_options=cdc_options, **self.parquet_writer_kwargs
100+
file_obj=buffer,
101+
batch_size=batch_size,
102+
use_content_defined_chunking=use_content_defined_chunking,
103+
**self.parquet_writer_kwargs,
99104
)
100105
else:
101106
written = self._write(
102-
file_obj=self.path_or_buf, batch_size=batch_size, cdc_options=cdc_options, **self.parquet_writer_kwargs
107+
file_obj=self.path_or_buf,
108+
batch_size=batch_size,
109+
use_content_defined_chunking=use_content_defined_chunking,
110+
**self.parquet_writer_kwargs,
103111
)
104112
return written
105113

106-
def _write(self, file_obj: BinaryIO, batch_size: int, cdc_options: dict, **parquet_writer_kwargs) -> int:
114+
def _write(
115+
self, file_obj: BinaryIO, batch_size: int, use_content_defined_chunking: bool | dict, **parquet_writer_kwargs
116+
) -> int:
107117
"""Writes the pyarrow table as Parquet to a binary file handle.
108118
109119
Caller is responsible for opening and closing the handle.
@@ -113,7 +123,7 @@ def _write(self, file_obj: BinaryIO, batch_size: int, cdc_options: dict, **parqu
113123
schema = self.dataset.features.arrow_schema
114124

115125
writer = pq.ParquetWriter(
116-
file_obj, schema=schema, use_content_defined_chunking=cdc_options, **parquet_writer_kwargs
126+
file_obj, schema=schema, use_content_defined_chunking=use_content_defined_chunking, **parquet_writer_kwargs
117127
)
118128

119129
for offset in hf_tqdm(
@@ -130,6 +140,6 @@ def _write(self, file_obj: BinaryIO, batch_size: int, cdc_options: dict, **parqu
130140
written += batch.nbytes
131141

132142
# TODO(kszucs): we may want to persist multiple parameters
133-
writer.add_key_value_metadata({"content_defined_chunking": json.dumps(cdc_options)})
143+
writer.add_key_value_metadata({"content_defined_chunking": json.dumps(use_content_defined_chunking)})
134144
writer.close()
135145
return written

tests/io/test_parquet.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -231,7 +231,7 @@ def test_parquet_write_uses_content_defined_chunking(dataset, tmp_path):
231231
)
232232
def test_parquet_writer_persist_cdc_options_as_metadata(dataset, tmp_path, cdc_options, expected_options):
233233
# write the dataset to parquet with the default CDC options
234-
writer = ParquetDatasetWriter(dataset, tmp_path / "foo.parquet", cdc_options=cdc_options)
234+
writer = ParquetDatasetWriter(dataset, tmp_path / "foo.parquet", use_content_defined_chunking=cdc_options)
235235
assert writer.write() > 0
236236

237237
# read the parquet KV metadata

tests/test_arrow_writer.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -349,7 +349,7 @@ def test_parquet_writer_write():
349349
def test_parquet_write_uses_content_defined_chunking(cdc_options, expected_options):
350350
output = pa.BufferOutputStream()
351351
with patch("pyarrow.parquet.ParquetWriter", wraps=pq.ParquetWriter) as MockWriter:
352-
with ParquetWriter(stream=output, cdc_options=cdc_options) as writer:
352+
with ParquetWriter(stream=output, use_content_defined_chunking=cdc_options) as writer:
353353
writer.write({"col_1": "foo", "col_2": 1})
354354
writer.write({"col_1": "bar", "col_2": 2})
355355
writer.finalize()

0 commit comments

Comments (0)