
Commit 3c3cc98

Adding fuzzy and semantic dedupe (#428)
Signed-off-by: Rucha Apte <[email protected]>
1 parent 079d46f commit 3c3cc98

File tree: 5 files changed (+185, -23 lines)


tutorials/dapt-curation/README.md

Lines changed: 6 additions & 3 deletions

```diff
@@ -37,6 +37,7 @@ The tutorial follows the steps below:<br>
 - Heuristic-based quality filtering (Number of lines, word count, top N-grams, etc.)
 - Fix unicode errors via ftfy
 - PII redaction
+- GPU-accelerated fuzzy and semantic deduplication
 - Step 6: Save the filtered and curated data <br>
 - Step 7: Blend datasets and shuffle

@@ -45,8 +46,10 @@ The tutorial follows the steps below:<br>

 After installing the NeMo Curator package, install the dependencies and run:

-`pip install -r code/requirements.txt`
-
-`python code/main.py`
+```bash
+pip install -r code/requirements.txt
+cd code
+python main.py
+```

 This will download chip-design related datasets and begin the data curation pipeline.
```
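
To make the new deduplication step above concrete, here is a condensed, illustrative sketch of how the two GPU dedup stages are chained for the text split. It simply mirrors the changes to `code/main.py` shown further below and assumes the `fuzzy_dedupe`, `semantic_dedupe`, and `rm_dir` helpers this commit adds to `utils.py`; `dataset_text` stands for the already filtered, pandas-backed `DocumentDataset`.

```python
# Condensed sketch of the GPU dedup flow added to code/main.py (illustrative;
# assumes the helpers added to utils.py in this commit).
import os

from nemo_curator.datasets import DocumentDataset
from utils import fuzzy_dedupe, rm_dir, semantic_dedupe


def gpu_dedupe_text(dataset_text: DocumentDataset, script_dir: str) -> DocumentDataset:
    # Move the filtered dataset to a cuDF backend so both stages run on the GPU.
    gpu_dataset_text = DocumentDataset(dataset_text.df.to_backend("cudf"))

    # Stage 1: semantic dedup, driven by configs/text_semantic_dedupe_config.yaml.
    config_path = os.path.join(script_dir, "configs", "text_semantic_dedupe_config.yaml")
    cache_dir = os.path.join(script_dir, "cache", "semantic_dedupe", "text")
    rm_dir(cache_dir)
    kept = semantic_dedupe(
        dataset=gpu_dataset_text,
        sem_dedupe_config_yaml_path=config_path,
        cache_dir=cache_dir,
    )
    kept_ids = kept.df.to_backend("pandas").compute()["id"]
    semantic_dataset_text = DocumentDataset(
        gpu_dataset_text.df[gpu_dataset_text.df.id.isin(kept_ids)]
    )

    # Stage 2: fuzzy (MinHash + LSH) dedup on the semantically deduped documents.
    cache_dir = os.path.join(script_dir, "cache", "fuzzy_dedupe", "text")
    rm_dir(cache_dir)
    fuzzy_dataset_text = fuzzy_dedupe(dataset=semantic_dataset_text, cache=cache_dir)

    # Return to a pandas backend for the downstream CPU steps (writing, blending).
    return DocumentDataset(fuzzy_dataset_text.df.to_backend("pandas"))
```

Semantic dedupe runs first to drop embedding-level near-duplicates, and fuzzy dedupe then removes the remaining lexical near-duplicates before the datasets are written out and blended.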

tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml

Lines changed: 28 additions & 0 deletions

```diff
@@ -0,0 +1,28 @@
+# Configuration file for semantic dedup
+cache_dir: "workspace/semdedup_cache/text"
+num_files: 16
+
+# Embeddings configuration
+embeddings_save_loc: "embeddings"
+embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
+embedding_batch_size: 128
+
+# Clustering configuration
+clustering_save_loc: "clustering_results"
+n_clusters: 20
+seed: 1234
+max_iter: 100
+kmeans_with_cos_dist: false
+
+# Semdedup configuration
+which_to_keep: "hard"
+largest_cluster_size_to_process: 100000
+sim_metric: "cosine"
+
+# Extract dedup configuration
+eps_thresholds:
+  - 0.1
+  - 0.01
+
+# Which threshold to use for extracting deduped data
+eps_to_extract: 0.1
```
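
For reference, here is a minimal sketch of how a config like this is consumed, mirroring the `semantic_dedupe` helper this commit adds to `utils.py` (the wrapper name `run_semantic_dedupe` is illustrative):

```python
# Minimal sketch: how the YAML above drives NeMo Curator's semantic dedup,
# mirroring the semantic_dedupe helper added to utils.py in this commit.
from nemo_curator import SemDedup, SemDedupConfig
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir


def run_semantic_dedupe(dataset: DocumentDataset, config_yaml: str) -> DocumentDataset:
    # Every knob (cache_dir, embedding model, k-means settings, eps thresholds)
    # is read from the YAML file shown above.
    config = SemDedupConfig.from_yaml(config_yaml)
    expand_outdir_and_mkdir(config.cache_dir)

    # SemDedup embeds the documents, clusters the embeddings, and returns the
    # records kept at the configured threshold; main.py filters the original
    # dataset down to these ids.
    semdedup = SemDedup(config=config, id_column_type="str")
    return semdedup(dataset)
```

Per the comments in the config, `eps_thresholds` lists the candidate thresholds to compute and `eps_to_extract` selects the one used when extracting the deduplicated data.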

tutorials/dapt-curation/code/main.py

Lines changed: 61 additions & 15 deletions

```diff
@@ -27,10 +27,13 @@
     CodeLineCountFilter,
     TextLineCountFilter,
     clean_and_unify,
-    dedupe,
+    exact_dedupe,
     filter_code,
     filter_text,
+    fuzzy_dedupe,
     redact_code,
+    rm_dir,
+    semantic_dedupe,
 )

 import nemo_curator as nc
@@ -48,6 +51,7 @@

 SCRIPT_DIR_PATH = os.path.dirname(os.path.abspath(__file__))
 DATA_DIR = os.path.join(SCRIPT_DIR_PATH, "data")
+CONFIG_DIR = os.path.join(SCRIPT_DIR_PATH, "configs")


 def download_sources(
@@ -117,7 +121,6 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
         args (Any): Command-line arguments.
         jsonl_dir (str): Directory path where the JSONL files are stored.
     """
-    print("Running the curation pipeline...")
     # Initialize the Dask cluster.
     client = get_client(**ArgumentHelper.parse_client_args(args))

@@ -129,7 +132,7 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
                 TextLineCountFilter(), text_field="file_type_count", score_type=bool
             ),
             filter_text,
-            dedupe,
+            exact_dedupe,
         ]
     )

@@ -141,7 +144,7 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
                 CodeLineCountFilter(), text_field="file_type_count", score_type=bool
            ),
             filter_code,
-            dedupe,
+            exact_dedupe,
             redact_code,
         ]
     )
@@ -167,17 +170,54 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
         + orig_dataset_code.df["line_count"].astype(str)
     )

+    print("Executing the curation pipeline...")
     dataset_text = curation_steps_text(orig_dataset_text)
-    dataset_text = dataset_text.persist()
-
-    print(f"Original dataset length for text files: {len(orig_dataset_text.df)}")
-    print(f"After dataprep: {len(dataset_text.df)}")
-
     dataset_code = curation_steps_code(orig_dataset_code)
-    dataset_code = dataset_code.persist()

+    print(f"Original dataset length for text files: {len(orig_dataset_text.df)}")
+    print(f"After dataprep for text files: {len(dataset_text.df)}")
     print(f"Original dataset length for code files: {len(orig_dataset_code.df)}")
-    print(f"After dataprep: {len(dataset_code.df)}")
+    print(f"After dataprep for code files: {len(dataset_code.df)}")
+
+    if args.device == "gpu":
+        print("Executing the semantic dedupe pipeline...")
+        gpu_dataset_text = DocumentDataset(dataset_text.df.to_backend("cudf"))
+        gpu_dataset_code = DocumentDataset(dataset_code.df.to_backend("cudf"))
+        sem_dedupe_config_yaml_path = os.path.join(
+            CONFIG_DIR, "text_semantic_dedupe_config.yaml"
+        )
+        CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "semantic_dedupe", "text")
+        rm_dir(CACHE_DIR)
+        duplicates = semantic_dedupe(
+            dataset=gpu_dataset_text,
+            sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path,
+            cache_dir=CACHE_DIR,
+        )
+        unique_ids = duplicates.df.to_backend("pandas").compute()["id"]
+        semantic_dataset_text = DocumentDataset(
+            gpu_dataset_text.df[gpu_dataset_text.df.id.isin(unique_ids)]
+        )
+        print(f"After semantic dedupe for text files: {len(semantic_dataset_text.df)}")
+
+        print("Executing the fuzzy dedupe pipeline...")
+        CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
+        rm_dir(CACHE_DIR)
+        fuzzy_dataset_text = fuzzy_dedupe(
+            dataset=semantic_dataset_text, cache=CACHE_DIR
+        )
+        CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code")
+        rm_dir(CACHE_DIR)
+        fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR)
+
+        dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas")
+        dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas")
+        print(f"After fuzzy dedupe for text files: {len(dataset_text.df)}")
+        print(f"After fuzzy dedupe for code files: {len(dataset_code.df)}")
+
+    final_dataset_text = dataset_text.persist()
+    final_dataset_code = dataset_code.persist()
+
+    print("Writing the results to disk...")

     # Overwrite existing files in the curated directory.
     out_path = os.path.join(DATA_DIR, "curated")
@@ -186,15 +226,18 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
         shutil.rmtree(out_path)

     os.makedirs(out_path)
-    dataset_text.to_json(out_path, write_to_filename=True)
-    dataset_code.to_json(out_path, write_to_filename=True)
+    final_dataset_text.to_json(out_path, write_to_filename=True)
+    final_dataset_code.to_json(out_path, write_to_filename=True)
+
+    print("Writing results to disk completed")

     # Split the dataset by file category and save curated files (optional - to create blended datasets)
+    print("Split dataset by metadata")
     separated_data_text = separate_by_metadata(
-        dataset_text.df, out_path, "category"
+        final_dataset_text.df, out_path, "category"
     ).compute()
     separated_data_code = separate_by_metadata(
-        dataset_code.df, out_path, "category"
+        final_dataset_code.df, out_path, "category"
     ).compute()

     client.close()
@@ -239,6 +282,7 @@ def main():
     # Download all the sources and get the list of text and code files.
     text_files, code_files = download_sources(100, 100, 100)
     run_curation_pipeline(args, text_files, code_files)
+    print("Data Curation completed")

     # blend and shuffle datasets
     root_path = os.path.join(DATA_DIR, "curated")
@@ -250,7 +294,9 @@ def main():
     ]
     dataset_weights = [1.0, 4.0, 4.0, 1.0]
     target_size = 20
+
     blend_and_shuffle(args, dataset_paths, dataset_weights, target_size)
+    print("Data Blending completed")


 if __name__ == "__main__":
```

tutorials/dapt-curation/code/requirements.txt

Lines changed: 2 additions & 1 deletion

```diff
@@ -1,6 +1,7 @@
-arxiv
+arxiv==2.1.0
 arxiv-downloader
 cchardet
+nltk==3.8.1
 poppler-utils
 unstructured[all-docs]==0.14.5
 unstructured[pdf]
```

tutorials/dapt-curation/code/utils.py

Lines changed: 88 additions & 4 deletions

```diff
@@ -13,12 +13,23 @@
 # limitations under the License.

 import json
+import os
 import re

 import dask.dataframe as dd
 import pandas as pd
-
-from nemo_curator import ExactDuplicates, Modify, ScoreFilter, Sequential
+import yaml
+
+from nemo_curator import (
+    ExactDuplicates,
+    FuzzyDuplicates,
+    FuzzyDuplicatesConfig,
+    Modify,
+    ScoreFilter,
+    SemDedup,
+    SemDedupConfig,
+    Sequential,
+)
 from nemo_curator.datasets import DocumentDataset
 from nemo_curator.filters import (
     DocumentFilter,
@@ -37,7 +48,10 @@
 from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter
 from nemo_curator.pii.constants import DEFAULT_LANGUAGE, DEFAULT_MAX_DOC_SIZE
 from nemo_curator.utils.distributed_utils import get_client
-from nemo_curator.utils.file_utils import get_all_files_paths_under
+from nemo_curator.utils.file_utils import (
+    expand_outdir_and_mkdir,
+    get_all_files_paths_under,
+)


 class QuotationUnifier(DocumentModifier):
@@ -259,7 +273,7 @@ def func2(row):
     return redacted_dataset


-def dedupe(dataset: DocumentDataset) -> DocumentDataset:
+def exact_dedupe(dataset: DocumentDataset) -> DocumentDataset:
     """
     Remove exact duplicates from the given DocumentDataset.

@@ -282,6 +296,71 @@ def dedupe(dataset: DocumentDataset) -> DocumentDataset:
     return DocumentDataset(deduped)


+def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:
+    """
+    Remove near-duplicate documents and code lines.
+
+    Args:
+        dataset (DocumentDataset): The dataset containing documents.
+        cache (str): Directory used to store deduplication intermediates.
+
+    Returns:
+        DocumentDataset: The deduplicated dataset.
+    """
+    fuzzy_dedup_config = FuzzyDuplicatesConfig(
+        cache_dir=cache,
+        id_field="id",
+        text_field="text",
+        seed=42,
+        char_ngrams=20,
+        num_buckets=20,
+        hashes_per_bucket=13,
+        use_64_bit_hash=False,
+        buckets_per_shuffle=5,
+        false_positive_check=False,
+        num_anchors=2,
+        jaccard_threshold=0.8,
+    )
+    fuzzy_dup = FuzzyDuplicates(config=fuzzy_dedup_config)
+    duplicates = fuzzy_dup(dataset)
+
+    docs_to_remove = duplicates.df.map_partitions(
+        lambda x: x[x.group.duplicated(keep="first")]
+    )
+
+    # When there are few duplicates we can compute the results to a list and use `isin`.
+    duplicate_ids = docs_to_remove.compute().id.to_arrow().to_pylist()
+    dataset_df = dataset.df
+    deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
+    return DocumentDataset(deduped)
+
+
+def semantic_dedupe(
+    dataset: DocumentDataset, sem_dedupe_config_yaml_path: str, cache_dir: str
+):
+    """
+    Perform semantic deduplication on the given dataset.
+
+    Args:
+        dataset (DocumentDataset): The dataset containing documents.
+        sem_dedupe_config_yaml_path (str): Path to the semantic dedup YAML config.
+
+    Returns:
+        The deduplicated DocumentDataset.
+    """
+    partition_lengths = dataset.df.map_partitions(len).compute()
+    non_empty_partitions = [
+        i for i, length in enumerate(partition_lengths) if length > 0
+    ]
+    dataset.df = dataset.df.partitions[non_empty_partitions]
+
+    semdedup_config = SemDedupConfig.from_yaml(sem_dedupe_config_yaml_path)
+    expand_outdir_and_mkdir(semdedup_config.cache_dir)
+    semdup = SemDedup(config=semdedup_config, id_column_type="str")
+    duplicates = semdup(dataset)
+    return duplicates
+
+
 class TextLineCountFilter(DocumentFilter):
     """
     Discard text files based on number of lines.
@@ -323,3 +402,8 @@ def score_document(self, text: str) -> bool:

     def keep_document(self, score) -> bool:
         return score
+
+
+def rm_dir(cache_dir):
+    if os.path.isdir(cache_dir):
+        os.system(f"rm -rf {cache_dir}")
```
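
One side note on the new `rm_dir` helper above: it shells out to `rm -rf`. A pure-Python equivalent (illustrative, not part of this commit) avoids spawning a shell and handles paths containing spaces:

```python
# Illustrative alternative to the rm_dir helper above (not part of the commit):
# shutil.rmtree removes the directory tree without shelling out to `rm -rf`.
import os
import shutil


def rm_dir(cache_dir: str) -> None:
    if os.path.isdir(cache_dir):
        shutil.rmtree(cache_dir, ignore_errors=True)
```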
