
Commit f642628

Removal logic for fuzzy / exact (no class abstraction) (#509)
1 parent b318d61 commit f642628

File tree

10 files changed: +496 -77 lines changed


docs/user-guide/gpudeduplication.rst

Lines changed: 27 additions & 9 deletions
@@ -63,14 +63,19 @@ After ensuring your dataset has a unique ID field (or creating one with the code
     from nemo_curator.datasets import DocumentDataset

     # Initialize the deduplication object
-    ExactDups = ExactDuplicates(id_field="my_id", text_field="text")
+    exact_duplicates = ExactDuplicates(
+        id_field="my_id",
+        text_field="text",
+        perform_removal=True,
+        cache_dir="/path/to/dedup_outputs",  # Recommended to specify a cache_dir if perform_removal=True
+    )

     dataset = DocumentDataset.read_parquet(
         input_files="/path/to/parquet/data",
         backend="cudf",  # or "pandas" for CPU
     )
-
-    duplicate_docs = ExactDups(dataset)
+    # Users who have specified perform_removal=False can split the steps as follows
+    duplicate_docs = exact_duplicates.identify_duplicates(dataset)

     """
     Sample output:
@@ -82,9 +87,14 @@ After ensuring your dataset has a unique ID field (or creating one with the code
     107  doc_prefix-52271  0f763a2937d57b9d96bf9f220e55f2bd
     """

+    deduplicated_dataset = exact_duplicates.remove(dataset, duplicate_docs)
+
+    # Users who have specified perform_removal=True can get the deduplicated dataset directly as follows
+    # deduplicated_dataset = exact_duplicates(dataset)
+
+
 .. tip::
-    A more comprehensive example, including how to remove documents from a corpus using the list of
-    duplicate IDs generated from the exact deduplication step above, can be found in `examples/exact_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/exact_deduplication.py>`_.
+    A more comprehensive example can be found in `examples/exact_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/exact_deduplication.py>`_.

 """"""""""""
 CLI Utility
@@ -187,6 +197,7 @@ Python API
         cache_dir="/path/to/dedup_outputs",  # must be cleared between runs
         id_field="my_id",
         text_field="text",
+        perform_removal=False,  # dictates whether the deduplicated dataset or the IDs of duplicates are returned
         seed=42,
         char_ngrams=24,
         num_buckets=20,
@@ -203,6 +214,7 @@ Python API
     cache_dir: /path/to/dedup_outputs
     id_field: my_id
     text_field: text
+    perform_removal: False
     seed: 42
     char_ngrams: 24
     num_buckets: 20
@@ -226,14 +238,15 @@ Python API
     from nemo_curator.datasets import DocumentDataset

     # Initialize the deduplication object
-    FuzzyDups = FuzzyDuplicates(config=config, logger="./")
+    fuzzy_duplicates = FuzzyDuplicates(config=config, logger="./")

     dataset = DocumentDataset.read_json(
         input_files="/path/to/jsonl/data",
         backend="cudf",  # FuzzyDuplicates only supports datasets with the cuDF backend.
     )

-    duplicate_docs = FuzzyDups(dataset)
+    # Users who have specified perform_removal=False can split the steps as follows
+    duplicate_docs = fuzzy_duplicates.identify_duplicates(dataset)
     """
     Sample output:
         my_id  group
@@ -244,10 +257,15 @@ Python API
     4  doc_prefix-42050    154
     """

+    deduplicated_dataset = fuzzy_duplicates.remove(dataset, duplicate_docs)
+
+    # Users who have specified perform_removal=True can get the deduplicated dataset directly as follows
+    # deduplicated_dataset = fuzzy_duplicates(dataset)
+
+
 .. tip::

-    - A more comprehensive example for the above, including how to remove documents from a corpus using the list of
-      duplicate IDs generated from fuzzy deduplication, can be found in `examples/fuzzy_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/fuzzy_deduplication.py>`_.
+    - A comprehensive example can be found in `examples/fuzzy_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/fuzzy_deduplication.py>`_.
    - The default values of ``num_buckets`` and ``hashes_per_bucket`` are set to find documents with an approximate Jaccard similarity of 0.8 or above.
    - Higher ``buckets_per_shuffle`` values can lead to better performance but might lead to out of memory errors.
    - Setting the ``false_positive_check`` flag to ``False`` is ideal for optimal performance.

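The updated guide now describes two equivalent workflows: identify duplicates and remove them as separate steps, or set perform_removal=True and let a single call do both. A minimal end-to-end sketch of the two paths, assuming the ExactDuplicates API shown in this diff (paths are placeholders):

    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.modules import ExactDuplicates

    dataset = DocumentDataset.read_parquet(
        input_files="/path/to/parquet/data",  # placeholder path
        backend="cudf",
    )

    # Path 1: identify first, then remove explicitly (perform_removal=False)
    exact = ExactDuplicates(id_field="my_id", text_field="text", perform_removal=False)
    duplicate_ids = exact.identify_duplicates(dataset)
    deduplicated = exact.remove(dataset, duplicate_ids)

    # Path 2: a single call identifies and removes (perform_removal=True)
    exact_one_call = ExactDuplicates(
        id_field="my_id",
        text_field="text",
        perform_removal=True,
        cache_dir="/path/to/dedup_outputs",  # recommended when perform_removal=True
    )
    deduplicated = exact_one_call(dataset)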
examples/exact_deduplication.py

Lines changed: 14 additions & 18 deletions
@@ -17,8 +17,7 @@

 from nemo_curator.datasets import DocumentDataset
 from nemo_curator.modules import ExactDuplicates
-from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
-from nemo_curator.utils.file_utils import get_all_files_paths_under
+from nemo_curator.utils.distributed_utils import get_client, write_to_disk
 from nemo_curator.utils.script_utils import ArgumentHelper


@@ -40,36 +39,33 @@ def main(args):
     client.run(pre_imports)

     t0 = time.time()
-    input_dataset = DocumentDataset.read_json(dataset_dir, backend=backend)
+    input_dataset = DocumentDataset.read_json(
+        dataset_dir, backend=backend, blocksize="1GiB", files_per_partition=None
+    )

     exact_dup = ExactDuplicates(
         logger=log_dir,
         id_field=dataset_id_field,
         text_field=dataset_text_field,
+        # Decides whether the output of the module is the deduplicated dataset or the duplicates
+        # If True, you should set cache_dir for a performance improvement
+        perform_removal=False,
         # cache_dir=output_dir  # Optionally write the output to disk
     )

-    duplicates = exact_dup(dataset=input_dataset)
+    # When perform_removal=False, the call only runs .identify_duplicates() and returns the list of duplicate IDs.
+    # When perform_removal=True, exact_dup outputs the dataset with the duplicates removed.
+    # It does so by calling .identify_duplicates() and .remove() in sequence.
+    duplicates = exact_dup(
+        dataset=input_dataset
+    )  # or exact_dup.identify_duplicates(input_dataset)

     # If caching, result is a path to the output dataset.
     if isinstance(duplicates, str):
         duplicates = DocumentDataset.read_parquet(duplicates, backend=backend)

     # It's easy to apply dataframe operations to the dataset by using the underlying df.
-
-    # By default all duplicate id's are included in the result
-    # keep 1 document from each group of duplcates and mark the others to remove
-    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
-    docs_to_remove = duplicates.df.map_partitions(
-        lambda x: x[x._hashes.duplicated(keep="first")]
-    )
-
-    # When there are few duplicates we can compute the results to a list and use `isin`.
-    result = input_dataset.df[
-        ~input_dataset.df[dataset_id_field].isin(
-            docs_to_remove[dataset_id_field].compute()
-        )
-    ]
+    result = exact_dup.remove(input_dataset, duplicates)
     write_to_disk(result, output_dir, output_type="parquet")
     print(time.time() - t0)


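The example above keeps perform_removal=False and calls .remove() explicitly. For comparison, a sketch (not part of this commit) of the same script with perform_removal=True, reusing the example's variable names:

    exact_dup = ExactDuplicates(
        logger=log_dir,
        id_field=dataset_id_field,
        text_field=dataset_text_field,
        perform_removal=True,
        cache_dir=output_dir,  # recommended so intermediate results are written to disk
    )
    # identify_duplicates() and remove() run in sequence; the result is the deduplicated dataset
    result = exact_dup(dataset=input_dataset)
    write_to_disk(result, output_dir, output_type="parquet")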
examples/fuzzy_deduplication.py

Lines changed: 10 additions & 14 deletions
@@ -68,6 +68,8 @@ def main(args):
         cache_dir=cache_dir,
         id_field=dataset_id_field,
         text_field=dataset_text_field,
+        # Decides whether the output of the module is a deduplicated dataset or the IDs of the duplicates
+        perform_removal=False,
         seed=42,
         char_ngrams=24,
         num_buckets=20,
@@ -77,26 +79,20 @@ def main(args):
         false_positive_check=False,
     )
     fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config)
-    duplicates = fuzzy_dup(dataset=input_dataset)
+
+    # When perform_removal=False, the call only runs .identify_duplicates() and returns the list of duplicate IDs.
+    # When perform_removal=True, fuzzy_dup outputs the dataset with the duplicates removed.
+    # It does so by calling .identify_duplicates() and .remove() in sequence.
+    duplicates = fuzzy_dup(
+        dataset=input_dataset
+    )  # or fuzzy_dup.identify_duplicates(input_dataset)

     if duplicates is None:
         print("No duplicates found")
         print(f"Time taken:{time.time() - t0}s")
         return

-    # By default all duplicate id's and the group they belong to are included in the result
-    # keep 1 document from each group of duplcates and mark the others to remove
-    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
-    docs_to_remove = duplicates.df.map_partitions(
-        lambda x: x[x.group.duplicated(keep="first")]
-    )
-
-    # When there are few duplicates we can compute the results to a list and use `isin`.
-    result = input_dataset.df[
-        ~input_dataset.df[dataset_id_field].isin(
-            docs_to_remove[dataset_id_field].compute()
-        )
-    ]
+    result = fuzzy_dup.remove(input_dataset, duplicates)
     write_to_disk(result, output_dir, output_type=filetype)
     print(f"Time taken:{time.time() - t0}s")


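The duplicates returned by identify_duplicates() carry a group column (see the sample output in the docs diff above), and remove() replaces the keep-first-per-group logic deleted here. A short sketch of inspecting those groups before removal, assuming that column name:

    # Count how many documents fall into each fuzzy-duplicate group
    group_sizes = duplicates.df.groupby("group").size().compute()
    print(group_sizes.sort_values(ascending=False).head())

    # remove() keeps one document per group and drops the rest, as the deleted
    # map_partitions/isin code used to do by hand
    result = fuzzy_dup.remove(input_dataset, duplicates)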
nemo_curator/modules/config.py

Lines changed: 8 additions & 0 deletions
@@ -44,6 +44,8 @@ class FuzzyDuplicatesConfig(BaseConfig):
         but might lead to memory pressures and related errors.
     id_field: Column in the Dataset denoting document ID.
     text_field: Column in the Dataset denoting document content.
+    perform_removal: Boolean value to specify whether calling the module should remove the duplicates from
+        the original dataset, or return the list of IDs denoting duplicates.
     profile_dir: str, Default None
         If specified directory to write dask profile
     cache_dir: str, Default None
@@ -64,6 +66,7 @@ class FuzzyDuplicatesConfig(BaseConfig):
     profile_dir: Optional[str] = None
     id_field: str = "id"
     text_field: str = "text"
+    perform_removal: bool = False

     # Minhash + LSH Config
     seed: int = 42
@@ -131,6 +134,11 @@ def __post_init__(self):
         if not 1 <= self.buckets_per_shuffle <= self.num_buckets:
             raise ValueError("Buckets per shuffle must be between [1, num_buckets]")

+        if not self.perform_removal:
+            warnings.warn(
+                "In future releases (starting with 0.8.0) the default will be True."
+            )
+

 @dataclass
 class SemDedupConfig(BaseConfig):

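Because perform_removal defaults to False and now emits a warning, callers who want removal behavior can opt in when building the config. A sketch of constructing the config with removal enabled, using field names from the docs diff above (other fields keep their defaults):

    from nemo_curator.modules.config import FuzzyDuplicatesConfig

    config = FuzzyDuplicatesConfig(
        cache_dir="/path/to/dedup_outputs",
        id_field="my_id",
        text_field="text",
        perform_removal=True,  # opt in now; no warning is emitted
        false_positive_check=False,
    )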
nemo_curator/modules/exact_dedup.py

Lines changed: 45 additions & 8 deletions
@@ -18,7 +18,6 @@
 import time
 import warnings
 from contextlib import nullcontext
-from datetime import datetime
 from hashlib import md5
 from typing import Optional, Union

@@ -31,6 +30,7 @@
 from nemo_curator.log import create_logger
 from nemo_curator.modules.base import BaseModule
 from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
+from nemo_curator.utils.duplicates_removal import remove_duplicates
 from nemo_curator.utils.gpu_utils import is_cudf_type


@@ -45,6 +45,7 @@ def __init__(
         id_field: str = "id",
         text_field: str = "text",
         hash_method: str = "md5",
+        perform_removal: bool = False,
         profile_dir: Optional[str] = None,
         cache_dir: Optional[str] = None,
     ):
@@ -66,9 +67,17 @@ def __init__(
             raise ValueError(
                 f"{hash_method} not in supported hash_methods. Choose a hash_method from {self.SUPPORTED_HASHES}"
             )
+
         self.hash_method = hash_method
         self.id_field = id_field
         self.text_field = text_field
+        self.perform_removal = perform_removal
+        if not self.perform_removal:
+            warnings.warn(
+                "In future releases (starting with 0.8.0) the default will be True."
+            )
+        if self.perform_removal and cache_dir is None:
+            warnings.warn("cache_dir is recommended to remove duplicates.")
         if cache_dir is None and profile_dir is not None:
             warnings.warn(
                 "cache_dir for intermediate outputs is required to generate profiles"
@@ -137,7 +146,7 @@ def hash_documents(
         # TODO: Generalize ty using self.hash_method
         return df.apply(lambda x: md5(x.encode()).hexdigest())

-    def call(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
+    def identify_duplicates(self, dataset: DocumentDataset) -> DocumentDataset:
         """
         Find document ID's for exact duplicates in a given DocumentDataset
         Parameters
@@ -168,10 +177,38 @@ def call(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
             self._logger.info(
                 f"Time taken for Exact Dedup Computation = {time.time() - t0}s and output written at {write_path}"
             )
-        if is_cudf_type(result):
-            import dask_cudf
+        backend = "cudf" if is_cudf_type(result) else "pandas"
+        return DocumentDataset.read_parquet(
+            write_path,
+            backend=backend,
+            # We read with files_per_partition=1 so that groups are read whole (and are not split across partitions)
+            files_per_partition=1,
+            blocksize=None,
+        )

-            result_dataset = dask_cudf.read_parquet(write_path, split_row_groups=False)
-        else:
-            result_dataset = dd.read_parquet(write_path)
-        return DocumentDataset(result_dataset)
+    def remove(
+        self, dataset: DocumentDataset, duplicates_to_remove: Optional[DocumentDataset]
+    ) -> DocumentDataset:
+        """
+        Remove exact duplicates from a given DocumentDataset
+        Parameters
+        ----------
+        dataset: DocumentDataset
+            The input dataset from which to remove exact duplicates
+        Returns
+        -------
+            DocumentDataset containing only non-duplicate documents
+        """
+        result = remove_duplicates(
+            left=dataset.df,
+            duplicates=duplicates_to_remove.df,
+            id_field=self.id_field,
+            group_field="_hashes",
+        )
+        return DocumentDataset(result)
+
+    def call(self, dataset: DocumentDataset) -> DocumentDataset:
+        duplicates = self.identify_duplicates(dataset)
+        if self.perform_removal:
+            return self.remove(dataset, duplicates)
+        return duplicates

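ExactDuplicates.remove() delegates to remove_duplicates from nemo_curator.utils.duplicates_removal, one of the changed files not shown in this excerpt. Purely as an illustration of a keep-one-per-group anti-join with that signature, a rough sketch (the actual utility may differ):

    import dask.dataframe as dd

    def remove_duplicates(
        left: dd.DataFrame,
        duplicates: dd.DataFrame,
        id_field: str,
        group_field: str,
    ) -> dd.DataFrame:
        # Assumes each duplicate group sits within a single partition, which is why
        # identify_duplicates() above re-reads its output with files_per_partition=1.
        # Keep the first document of every group and collect the IDs of the rest.
        to_remove = duplicates.map_partitions(
            lambda df: df[df[group_field].duplicated(keep="first")]
        )[[id_field]]
        # Anti-join the IDs marked for removal against the full dataset.
        to_remove = to_remove.assign(_to_remove=True)
        merged = left.merge(to_remove, on=id_field, how="left")
        return merged[merged["_to_remove"].isna()].drop(columns="_to_remove")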