
Commit ca30808

Add Partition On Logic (#519)
* add partition_on logic
  Signed-off-by: Vibhu Jawa <[email protected]>
* Add Docstring based on Sarah's review
  Signed-off-by: Vibhu Jawa <[email protected]>
* Apply Praateek's suggestion and skip test using pytest.mark.gpu
  Signed-off-by: Vibhu Jawa <[email protected]>
* Apply Praateek's suggestion and force index=False
  Signed-off-by: Vibhu Jawa <[email protected]>

---------

Signed-off-by: Vibhu Jawa <[email protected]>
1 parent 97aa372 commit ca30808

File tree

3 files changed: +223 -9 lines changed

nemo_curator/datasets/doc_dataset.py
nemo_curator/utils/distributed_utils.py
tests/test_io.py


nemo_curator/datasets/doc_dataset.py

Lines changed: 42 additions & 2 deletions
@@ -160,16 +160,36 @@ def to_json(
         output_path: str,
         write_to_filename: Union[bool, str] = False,
         keep_filename_column: bool = False,
+        partition_on: Optional[str] = None,
     ):
         """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
+        Writes the dataset to the specified path in JSONL format.
 
+        If `write_to_filename` is True, the DataFrame is expected to have a column
+        that specifies the filename for each document. This column can be named
+        `file_name` by default, or a custom name if `write_to_filename` is a string.
+
+        Args:
+            output_path (str): The directory or file path where the dataset will be written.
+            write_to_filename (Union[bool, str]): Determines how filenames are handled.
+                - If True, uses the `file_name` column in the DataFrame to determine filenames.
+                - If a string, uses that string as the column name for filenames.
+                - If False, writes all data to the specified `output_path`.
+            keep_filename_column (bool): If True, retains the filename column in the output.
+                If False, the filename column is dropped from the output.
+            partition_on (Optional[str]): The column name used to partition the data.
+                If specified, data is partitioned based on unique values in this column,
+                with each partition written to a separate directory.
+
+        For more details, refer to the `write_to_disk` function in
+        `nemo_curator.utils.distributed_utils`.
         """
         write_to_disk(
             df=self.df,
             output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
+            partition_on=partition_on,
             output_type="jsonl",
         )

@@ -178,16 +198,36 @@ def to_parquet(
         output_path: str,
         write_to_filename: Union[bool, str] = False,
         keep_filename_column: bool = False,
+        partition_on: Optional[str] = None,
     ):
         """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
+        Writes the dataset to the specified path in Parquet format.
 
+        If `write_to_filename` is True, the DataFrame is expected to have a column
+        that specifies the filename for each document. This column can be named
+        `file_name` by default, or a custom name if `write_to_filename` is a string.
+
+        Args:
+            output_path (str): The directory or file path where the dataset will be written.
+            write_to_filename (Union[bool, str]): Determines how filenames are handled.
+                - If True, uses the `file_name` column in the DataFrame to determine filenames.
+                - If a string, uses that string as the column name for filenames.
+                - If False, writes all data to the specified `output_path`.
+            keep_filename_column (bool): If True, retains the filename column in the output.
+                If False, the filename column is dropped from the output.
+            partition_on (Optional[str]): The column name used to partition the data.
+                If specified, data is partitioned based on unique values in this column,
+                with each partition written to a separate directory.
+
+        For more details, refer to the `write_to_disk` function in
+        `nemo_curator.utils.distributed_utils`.
         """
         write_to_disk(
             df=self.df,
             output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
+            partition_on=partition_on,
             output_type="parquet",
         )

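For context (not part of the diff): a minimal sketch of how the new `partition_on` argument is used from the public API, assuming a small pandas-backed Dask DataFrame and the usual `from nemo_curator.datasets import DocumentDataset` import; the column names and output paths here are illustrative, not taken from the commit.

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

# Illustrative data; "category" is the column to partition on.
df = pd.DataFrame(
    {
        "id": [1, 2, 3, 4],
        "text": ["a", "b", "c", "d"],
        "category": ["news", "web", "news", "web"],
    }
)
dataset = DocumentDataset(dd.from_pandas(df, npartitions=2))

# Writes one subdirectory per unique value, e.g. out_jsonl/category=news/.
dataset.to_json(output_path="out_jsonl", partition_on="category")

# The Parquet path delegates to Dask's partition_on and produces the same layout.
dataset.to_parquet(output_path="out_parquet", partition_on="category")
```

Combining `partition_on` with `write_to_filename` raises a ValueError, as enforced in `write_to_disk` below.
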
nemo_curator/utils/distributed_utils.py

Lines changed: 57 additions & 7 deletions
@@ -748,6 +748,7 @@ def single_partition_write_with_filename(
                         orient="records",
                         lines=True,
                         force_ascii=False,
+                        index=False,  # Only index=False is supported for orient="records"
                     )
                 else:
                     # See open issue here: https://github.com/rapidsai/cudf/issues/15211

@@ -759,6 +760,7 @@ def single_partition_write_with_filename(
                         orient="records",
                         lines=True,
                         force_ascii=False,
+                        index=False,  # Only index=False is supported for orient="records"
                     )
 
             elif output_type == "parquet":

@@ -843,6 +845,7 @@ def write_to_disk(
     write_to_filename: Union[bool, str] = False,
     keep_filename_column: bool = False,
     output_type: str = "jsonl",
+    partition_on: Optional[str] = None,
 ):
     """
     This function writes a Dask DataFrame to the specified file path.

@@ -857,6 +860,9 @@ def write_to_disk(
             If str, uses that as the filename column to write to.
         keep_filename_column: Boolean representing whether to keep or drop the filename column, if it exists.
         output_type: The type of output file to write. Can be "jsonl" or "parquet".
+        partition_on: The column name to partition the data on.
+            If specified, the data will be partitioned based on the unique values in this column,
+            and each partition will be written to a separate directory
     """
 
     filename_col = _resolve_filename_col(write_to_filename)

@@ -879,6 +885,11 @@ def write_to_disk(
             f"write_using_filename is True but no {filename_col} column found in DataFrame"
         )
 
+    if partition_on is not None and write_to_filename:
+        raise ValueError(
+            "Cannot use both partition_on and write_to_filename parameters simultaneously. "
+        )
+
     if is_cudf_type(df):
         import cudf
 

@@ -904,7 +915,12 @@ def write_to_disk(
     # output_path is a directory
     else:
         if output_type == "jsonl" or output_type == "parquet":
-            _write_to_jsonl_or_parquet(df, output_path, output_type)
+            _write_to_jsonl_or_parquet(
+                df,
+                output_path=output_path,
+                output_type=output_type,
+                partition_on=partition_on,
+            )
         elif output_type == "bitext":
             if write_to_filename:
                 os.makedirs(output_path, exist_ok=True)

@@ -938,16 +954,50 @@ def _write_to_jsonl_or_parquet(
     df,
     output_path: str,
     output_type: Literal["jsonl", "parquet"] = "jsonl",
+    partition_on: Optional[str] = None,
 ):
     if output_type == "jsonl":
-        if is_cudf_type(df):
-            # See open issue here: https://github.com/rapidsai/cudf/issues/15211
-            # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+        if partition_on is not None:
+            unique_values = (
+                df[partition_on]
+                .unique()
+                .to_backend(backend="pandas")
+                .compute()
+                .to_list()
+            )
+            for value in unique_values:
+                os.makedirs(output_path, exist_ok=True)
+                partition_output_path = os.path.join(
+                    output_path, f"{partition_on}={value}"
+                )
+                df[df[partition_on] == value].to_json(
+                    partition_output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                    index=False,  # Only index=False is supported for orient="records"
+                )
         else:
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+            if is_cudf_type(df):
+                # See open issue here: https://github.com/rapidsai/cudf/issues/15211
+                # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
+                df.to_json(
+                    output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                    index=False,
+                )  # Only index=False is supported for orient="records"
+            else:
+                df.to_json(
+                    output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                    index=False,
+                )  # Only index=False is supported for orient="records"
     elif output_type == "parquet":
-        df.to_parquet(output_path, write_index=False)
+        df.to_parquet(output_path, write_index=False, partition_on=partition_on)
     else:
         raise ValueError(f"Unknown output type: {output_type}")

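To make the new JSONL branch easier to follow, here is a standalone sketch of the same partitioning approach using plain dask.dataframe with a pandas backend. It omits the cuDF `to_backend(backend="pandas")` hop and the surrounding NeMo Curator plumbing, and the helper name `write_jsonl_partitioned` is invented for illustration.

```python
import os

import dask.dataframe as dd
import pandas as pd


def write_jsonl_partitioned(ddf: dd.DataFrame, output_path: str, partition_on: str) -> None:
    """Write one JSONL directory per unique value of `partition_on` (pandas backend only)."""
    # Materialize the distinct partition keys on the client, as the patched code does.
    unique_values = ddf[partition_on].unique().compute().to_list()
    for value in unique_values:
        os.makedirs(output_path, exist_ok=True)
        partition_output_path = os.path.join(output_path, f"{partition_on}={value}")
        # Each filtered Dask DataFrame writes its own *.part files into the directory.
        ddf[ddf[partition_on] == value].to_json(
            partition_output_path,
            orient="records",
            lines=True,
            force_ascii=False,
            index=False,  # per the diff's comment: only index=False is supported for orient="records"
        )


ddf = dd.from_pandas(
    pd.DataFrame({"id": [1, 2, 3], "category": ["A", "B", "A"]}), npartitions=2
)
write_jsonl_partitioned(ddf, "out_jsonl", "category")
```

Each partition directory is named `<column>=<value>`, which is exactly the layout the tests below assert on.
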
tests/test_io.py

Lines changed: 124 additions & 0 deletions
@@ -293,3 +293,127 @@ def test_write_single_jsonl_file(self, tmp_path):
 
         result = DocumentDataset.read_json(output_path)
         assert json_df.equals(result.df.compute())
+
+
+class TestPartitionOn:
+    def test_partition_on_and_write_to_filename_error(self, tmp_path):
+        """Verify that using partition_on and write_to_filename together raises an error."""
+        df = pd.DataFrame(
+            {
+                "id": [1, 2, 3],
+                "file_name": ["f1", "f1", "f1"],
+                "category": ["A", "B", "A"],
+            }
+        )
+        ddf = dd.from_pandas(df, npartitions=1)
+        dataset = DocumentDataset(ddf)
+        with pytest.raises(
+            ValueError,
+            match="Cannot use both partition_on and write_to_filename parameters simultaneously.",
+        ):
+            dataset.to_json(
+                output_path=str(tmp_path / "output"),
+                write_to_filename=True,  # Intentionally provided to trigger the error
+                partition_on="category",
+            )
+
+    @pytest.mark.parametrize(
+        "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
+    )
+    @pytest.mark.parametrize(
+        "category_values",
+        [
+            ["A", "B", "A", "B"],
+            [10, 20, 10, 20],
+            [1.0, 2.0, 1.0, 2.0],
+        ],
+    )
+    def test_write_to_disk_with_partition_on_jsonl(
+        self, tmp_path, backend, category_values
+    ):
+        """
+        Test writing a partitioned JSONL dataset.
+
+        The function is expected to create subdirectories in the output directory
+        with names of the form 'category=<value>' for each unique partition column value.
+        """
+        df = pd.DataFrame(
+            {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
+        )
+        ddf = dd.from_pandas(df, npartitions=2)
+        ddf = ddf.to_backend(backend)
+        output_dir = tmp_path / "output_jsonl"
+        dataset = DocumentDataset(ddf)
+        dataset.to_json(output_path=str(output_dir), partition_on="category")
+        # Check that the output directory contains subdirectories for each partition.
+        # Unique partition values (as strings) to be used in the directory names.
+        unique_partitions = {str(x) for x in category_values}
+        for part in unique_partitions:
+            expected_dir = output_dir / f"category={part}"
+            assert expected_dir.exists(), f"Expected directory {expected_dir} not found"
+
+        # For each partition directory, load the JSONL files and verify that all records have the correct partition value.
+        # (Here we assume the files are written with extension ".part")
+        for part_dir in output_dir.glob("category=*"):
+            # The partition value is taken from the directory name.
+            partition_value = part_dir.name.split("=")[-1]
+            jsonl_files = list(part_dir.glob("*.part"))
+            assert (
+                jsonl_files
+            ), f"No JSONL files found in partition directory {part_dir}"
+            for file in jsonl_files:
+                with open(file, "r") as f:
+                    for line in f:
+                        record = json.loads(line)
+                        if "category" in record:
+                            # Compare as strings, to work with both integer and string partition values.
+                            assert (
+                                str(record["category"]) == partition_value
+                            ), f"Record partition value {record['category']} does not match directory {partition_value}"
+
+    @pytest.mark.parametrize(
+        "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
+    )
+    @pytest.mark.parametrize(
+        "category_values",
+        [
+            ["A", "B", "A", "B"],
+            [10, 20, 10, 20],
+            [1.0, 2.0, 1.0, 2.0],
+        ],
+    )
+    def test_write_to_disk_with_partition_on_parquet(
+        self, tmp_path, backend, category_values
+    ):
+        """
+        Test writing a partitioned Parquet dataset.
+
+        The test writes a DataFrame partitioned on the 'category' column and then reads it back
+        using dd.read_parquet. The output is compared (after sorting) to the original DataFrame.
+        """
+
+        df = pd.DataFrame(
+            {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
+        )
+        ddf = dd.from_pandas(df, npartitions=2)
+        ddf = ddf.to_backend(backend)
+        output_dir = tmp_path / "output_parquet"
+        dataset = DocumentDataset(ddf)
+        dataset.to_parquet(output_path=str(output_dir), partition_on="category")
+
+        # Check that the output directory contains subdirectories for each partition.
+        # Unique partition values (as strings) to be used in the directory names.
+        unique_partitions = {str(x) for x in category_values}
+        for part in unique_partitions:
+            expected_dir = output_dir / f"category={part}"
+            assert expected_dir.exists(), f"Expected directory {expected_dir} not found"
+
+        ddf_loaded = dd.read_parquet(str(output_dir))
+        df_loaded = ddf_loaded.compute().reset_index(drop=True)
+        df_loaded["category"] = df_loaded["category"].astype(df["category"].dtype)
+        # To ensure a fair comparison, sort the dataframes by 'id' and reindex.
+        pd.testing.assert_frame_equal(
+            df.sort_values("id").reset_index(drop=True),
+            df_loaded.sort_values("id").reset_index(drop=True)[df.columns],
+            check_dtype=False,
+        )

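A brief read-back note (only the Parquet side is covered by the tests above), assuming the illustrative `out_jsonl`/`out_parquet` directories from the earlier sketch:

```python
import dask.dataframe as dd

# Parquet: dd.read_parquet rebuilds the "category" column from the
# category=<value> directory names (the test casts it back to the original dtype).
parquet_ddf = dd.read_parquet("out_parquet")

# JSONL: the patched code keeps the partition column inside each record, so a
# glob across the partition directories restores the full dataset.
jsonl_ddf = dd.read_json("out_jsonl/*/*.part", lines=True)
```
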