Commit e820b8b
Add blocksize to DocumentDataset.read_* that uses dask_cudf.read_* (#285)
* fc
* review comments
* make blocksize work with parquet
* filetype
* fix merge
* add test cases
* add test file
* failing test for select_columns
* rename func name
* add test case for different columns
* improve test for different_cols
* ..
* review comments + add warnings for inconsistent schemas
* Update nemo_curator/utils/distributed_utils.py (seven applied review suggestions, co-authored by Sarah Yurick)
* fix tests

Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com>
Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
1 parent c54826a commit e820b8b

7 files changed: +814 -56 lines changed

docs/user-guide/bestpractices.rst

Lines changed: 11 additions & 0 deletions
@@ -24,6 +24,16 @@ Handling GPU Out-of-Memory (OOM) Errors
 NeMo Curator is designed to be scalable with large amounts of text data, but OOM errors occur when the available GPU memory is insufficient for a given task.
 To help avoid these issues and ensure efficient processing, here are some strategies for managing memory usage and mitigating OOM challenges.
 
+Controlling Partition Sizes
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Consider using ``files_per_partition`` or ``blocksize`` when reading data. Both options reduce memory load by processing a large dataset in smaller chunks.
+
+#. The ``blocksize`` argument is available for ``jsonl`` and ``parquet`` files. For ``parquet`` files, however, it is currently only supported when ``add_filename=False``.
+
+#. A good starting point for ``blocksize`` is 1/32 of the total GPU memory. For example, on a GPU with 32GB of memory you can set ``blocksize="1GB"``.
+
+
 Utilize RMM Options
 ~~~~~~~~~~~~~~~~~~~
 `RAPIDS Memory Manager (RMM) <https://github.com/rapidsai/rmm>`_ is a package that enables you to allocate device memory in a highly configurable way.
@@ -59,6 +69,7 @@ Alternatively, you can set these flags while initializing your own Dask client,
 
     client = Client(cluster)
 
+
 Fuzzy Deduplication Guidelines
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 Fuzzy deduplication is one of the most computationally expensive algorithms within the NeMo Curator pipeline.
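As a quick illustration of the guidance added above, reading with blocksize might look like the following sketch (the input path is hypothetical; the reader API is the one this commit extends on DocumentDataset):

from nemo_curator.datasets import DocumentDataset

# Sketch: on a 32GB GPU, 1/32 of total memory gives blocksize="1gb".
# Block-based reads split the dataset into ~1GB partitions instead of
# mapping whole files to partitions via files_per_partition.
dataset = DocumentDataset.read_json(
    "data/jsonl_shards/",  # hypothetical input directory
    backend="cudf",
    blocksize="1gb",
)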

nemo_curator/_compat.py

Lines changed: 10 additions & 0 deletions
@@ -23,6 +23,15 @@
     # When mocking with autodoc the dask version is not there
     _dask_version = parse_version("2024.06.0")
 
+
+try:
+    import dask_cudf
+
+    _dask_cudf_version = parse_version(dask_cudf.__version__)
+except (ImportError, TypeError):
+    # When mocking with autodoc the dask version is not there
+    _dask_cudf_version = parse_version("2024.06.0")
+
 try:
     import cudf
 
@@ -40,6 +49,7 @@
 DASK_SHUFFLE_METHOD_ARG = _dask_version > parse_version("2024.1.0")
 DASK_P2P_ERROR = _dask_version < parse_version("2023.10.0")
 DASK_SHUFFLE_CAST_DTYPE = _dask_version > parse_version("2023.12.0")
+DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA = _dask_version > parse_version("2024.12")
 
 # Query-planning check (and cache)
 _DASK_QUERY_PLANNING_ENABLED = None
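The new flag follows the module's existing pattern: parse the installed version once at import time, then expose plain booleans for call sites to branch on. A self-contained sketch of that pattern, assuming the packaging library (the flag name below is illustrative, not from this diff):

from packaging.version import parse as parse_version

try:
    import dask_cudf

    _dask_cudf_version = parse_version(dask_cudf.__version__)
except (ImportError, TypeError):
    # Fall back to a pinned version when dask_cudf is absent or mocked
    # (e.g. during autodoc builds), mirroring the dask fallback above.
    _dask_cudf_version = parse_version("2024.06.0")

# Hypothetical flag: computed once, so call sites branch on a bool instead
# of re-parsing version strings at every call.
HAS_SOME_DASK_CUDF_FEATURE = _dask_cudf_version >= parse_version("2024.12")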

nemo_curator/datasets/doc_dataset.py

Lines changed: 14 additions & 10 deletions
@@ -50,7 +50,8 @@ def read_json(
         cls,
         input_files: Union[str, List[str]],
         backend: Literal["pandas", "cudf"] = "pandas",
-        files_per_partition: int = 1,
+        files_per_partition: Optional[int] = None,
+        blocksize: Optional[str] = "1gb",
         add_filename: bool = False,
         input_meta: Union[str, dict] = None,
         columns: Optional[List[str]] = None,
@@ -74,8 +75,9 @@ def read_json(
             input_files=input_files,
             file_type="jsonl",
             backend=backend,
-            files_per_partition=files_per_partition,
             add_filename=add_filename,
+            files_per_partition=files_per_partition,
+            blocksize=blocksize,
             input_meta=input_meta,
             columns=columns,
             **kwargs,
@@ -87,8 +89,9 @@ def read_parquet(
         cls,
         input_files: Union[str, List[str]],
         backend: Literal["pandas", "cudf"] = "pandas",
-        files_per_partition: int = 1,
-        add_filename: bool = False,
+        files_per_partition: Optional[int] = None,
+        blocksize: Optional[str] = "1gb",
+        add_filename=False,
         columns: Optional[List[str]] = None,
         **kwargs,
     ) -> "DocumentDataset":
@@ -109,8 +112,9 @@ def read_parquet(
             input_files=input_files,
             file_type="parquet",
             backend=backend,
-            files_per_partition=files_per_partition,
             add_filename=add_filename,
+            files_per_partition=files_per_partition,
+            blocksize=blocksize,
             columns=columns,
             **kwargs,
         )
@@ -121,8 +125,6 @@ def read_pickle(
         cls,
         input_files: Union[str, List[str]],
         backend: Literal["pandas", "cudf"] = "pandas",
-        files_per_partition: int = 1,
-        add_filename: bool = False,
         columns: Optional[List[str]] = None,
         **kwargs,
     ) -> "DocumentDataset":
@@ -142,8 +144,6 @@ def read_pickle(
             input_files=input_files,
             file_type="pickle",
             backend=backend,
-            files_per_partition=files_per_partition,
-            add_filename=add_filename,
             columns=columns,
             **kwargs,
         )
@@ -234,8 +234,9 @@ def _read_json_or_parquet(
     input_files: Union[str, List[str]],
     file_type: str,
     backend: Literal["cudf", "pandas"],
-    files_per_partition: int,
     add_filename: bool,
+    files_per_partition: Optional[int] = None,
+    blocksize: Optional[str] = None,
     input_meta: Union[str, dict] = None,
     columns: Optional[List[str]] = None,
     **kwargs,
@@ -267,6 +268,7 @@ def _read_json_or_parquet(
             file_type=file_type,
             backend=backend,
             files_per_partition=files_per_partition,
+            blocksize=blocksize,
             add_filename=add_filename,
             input_meta=input_meta,
             columns=columns,
@@ -286,6 +288,7 @@ def _read_json_or_parquet(
             file_type=file_type,
             backend=backend,
             files_per_partition=files_per_partition,
+            blocksize=blocksize,
             add_filename=add_filename,
             input_meta=input_meta,
             columns=columns,
@@ -311,6 +314,7 @@ def _read_json_or_parquet(
             file_type=file_type,
             backend=backend,
             files_per_partition=files_per_partition,
+            blocksize=blocksize,
             add_filename=add_filename,
             input_meta=input_meta,
             columns=columns,
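Taken together, the reworked signatures might be exercised as in this sketch (paths are hypothetical; the restriction that parquet blocksize needs add_filename=False comes from the docs change above, and passing blocksize=None to force file-based partitioning is an assumption about the new Optional defaults):

from nemo_curator.datasets import DocumentDataset

# blocksize now defaults to "1gb" for read_json/read_parquet, while
# files_per_partition defaults to None.
ds_json = DocumentDataset.read_json("data/jsonl_dir/", backend="cudf")

# Parquet accepts blocksize too, currently only with add_filename=False.
ds_parquet = DocumentDataset.read_parquet(
    "data/parquet_dir/",
    backend="cudf",
    blocksize="512mb",
    add_filename=False,
)

# Assumed opt-out: request file-based partitioning by setting
# files_per_partition and explicitly disabling the "1gb" blocksize default.
ds_legacy = DocumentDataset.read_json(
    "data/jsonl_dir/", files_per_partition=2, blocksize=None
)

Note that this commit removes files_per_partition and add_filename from read_pickle entirely, so neither knob applies to pickle reads.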
