diff --git a/clean.py b/clean.py index f2be952..c7b99cc 100644 --- a/clean.py +++ b/clean.py @@ -10,9 +10,10 @@ from numpy.random import default_rng from clean_helpers import build_small_docs_filter, filter_wiki_non_text_type, filter_wiki_user_titles, \ - replace_newline_with_space, build_dedup_template, dedup_document, build_line_with_substring_remover, \ - en_wiktionary_stripper, build_small_docs_bytes_filter, dedup_document_on_url, filter_remove_empty_docs,\ - build_reference_remover + replace_newline_with_space, build_dedup_template, build_line_with_substring_remover, \ + en_wiktionary_stripper, build_small_docs_bytes_filter, filter_remove_empty_docs,\ + build_reference_remover, build_dedup_document +from clean_helpers.deduplication import split_text_with_new_lines, compute_text_hash, compute_url_hash from clean_helpers.stopwords import stopwords set_verbosity_info() @@ -47,13 +48,19 @@ "dedup_template_soft": build_dedup_template( min_template_line_size=15, min_template_line_occurence=5, + split_text=split_text_with_new_lines ), "dedup_pseudocrawl_newspapers": build_dedup_template( min_template_line_size=0, min_template_line_occurence=2, + split_text=split_text_with_new_lines ), - "dedup_document": dedup_document, - "dedup_document_on_url": dedup_document_on_url + "dedup_document": build_dedup_document( + compute_hash=compute_text_hash + ), + "dedup_document_on_url": build_dedup_document( + compute_hash=compute_url_hash + ) } MAPS_KEYS = set(MAPS.keys()) diff --git a/clean_helpers/__init__.py b/clean_helpers/__init__.py index aa4183e..cc4aca6 100644 --- a/clean_helpers/__init__.py +++ b/clean_helpers/__init__.py @@ -4,4 +4,4 @@ from .map_strip_substring import en_wiktionary_stripper from .map_remove_references import build_reference_remover from .clean_lines import build_line_with_substring_remover -from .deduplication import build_dedup_template, dedup_document, dedup_document_on_url \ No newline at end of file +from .deduplication import build_dedup_template, build_dedup_document diff --git a/clean_helpers/deduplication.py b/clean_helpers/deduplication.py index d7eadfc..e4a9ada 100644 --- a/clean_helpers/deduplication.py +++ b/clean_helpers/deduplication.py @@ -1,6 +1,6 @@ from collections import defaultdict from functools import partial -from typing import List, Set, Tuple, Dict +from typing import List, Set, Tuple, Dict, Callable import hashlib import re import string @@ -12,12 +12,16 @@ from clean_helpers.utils import parse_meta -def build_dedup_template(min_template_line_size: int, min_template_line_occurence: int): +def build_dedup_template( + min_template_line_size: int, + min_template_line_occurence: int, + split_text: Callable[[str], List[str]] +): def dedup_template(ds: Dataset, num_proc: int, batch_size: int) -> Dataset: """Computes and remove templates lines""" # Compute the hash of each lines split_into_lines_and_hashes = ds.map( - split_text_to_lines_and_hash, + split_text, num_proc=num_proc, batched=True, batch_size=batch_size, @@ -53,68 +57,40 @@ def dedup_template(ds: Dataset, num_proc: int, batch_size: int) -> Dataset: return dedup_template +def build_dedup_document( + compute_hash: Callable[[Dict], List[str]], +): + def dedup_document( + ds: Dataset, + num_proc: int, + batch_size: int + ) -> Dataset: + hashed_documents = ds.map( + lambda batch: {**batch, "hash": compute_hash(batch)}, + num_proc=num_proc, + batched=True, + batch_size=batch_size, + ) -def dedup_document(ds: Dataset, num_proc: int, batch_size: int) -> Dataset: - hashed_documents = ds.map( - lambda 
batch: {**batch, "hash": get_hash_stripped(batch["text"])}, - num_proc=num_proc, - batched=True, - batch_size=batch_size, - ) - - hashes = set() - - return hashed_documents.map( - partial(delete_text_from_duplicates, hashes=hashes), - num_proc=1, # VERY IMPORTANT: hashes will be updated, and is not thread safe. - batched=True, - batch_size=batch_size, - remove_columns=hashed_documents.column_names - ) - - # # TODO: This version is memory efficient and runs faster, but we lose alignment - # return hashed_documents.filter( - # lambda hashes_: [is_new_hash(hash_, hashes) for hash_ in hashes_], - # num_proc=1, # VERY IMPORTANT: hashes will be updated, and is not thread safe. - # input_columns=["hash"], - # batched=True, - # batch_size=batch_size, - # ).remove_columns("hash") + hashes = set() -url_regex = re.compile(r"^(.[^?]*)") -def dedup_document_on_url(ds: Dataset, num_proc: int, batch_size: int) -> Dataset: - """Deduplication on meta['url']""" - hashed_documents = ds.map( - lambda batch: { - **batch, - "hash": get_hash([ - url_regex.match(parse_meta(meta)["url"]).group(1) - for meta in batch["meta"] - ])} - , - num_proc=num_proc, - batched=True, - batch_size=batch_size, - ) - - hashes = set() - - return hashed_documents.map( - partial(delete_text_from_duplicates, hashes=hashes), - num_proc=1, # VERY IMPORTANT: hashes will be updated, and is not thread safe. - batched=True, - batch_size=batch_size, - remove_columns=hashed_documents.column_names - ) - - # return hashed_documents.filter( - # lambda hashes_: [is_new_hash(hash_, hashes) for hash_ in hashes_], - # num_proc=1, # VERY IMPORTANT: hashes will be updated, and is not thread safe. - # input_columns=["hash"], - # batched=True, - # batch_size=batch_size, - # ).remove_columns("hash") + return hashed_documents.map( + partial(delete_text_from_duplicates, hashes=hashes), + num_proc=1, # VERY IMPORTANT: hashes will be updated, and is not thread safe. + batched=True, + batch_size=batch_size, + remove_columns=hashed_documents.column_names + ) + # # TODO: This version is memory efficient and runs faster, but we lose alignment + # return hashed_documents.filter( + # lambda hashes_: [is_new_hash(hash_, hashes) for hash_ in hashes_], + # num_proc=1, # VERY IMPORTANT: hashes will be updated, and is not thread safe. 
+ # input_columns=["hash"], + # batched=True, + # batch_size=batch_size, + # ).remove_columns("hash") + return dedup_document # =========== HELPERS =============== @@ -131,11 +107,7 @@ def get_hash(texts: List[str]) -> List[str]: return [hashlib.md5(text.strip().encode("utf-8")).hexdigest() for text in texts] -def split_text_in_lines(text: str) -> List[str]: - return [line.strip() for line in text.split("\n")] - - -def split_text_to_lines_and_hash(batch: Dict[str, List]): +def split_text_and_hash(batch: Dict[str, List], split_text_in_lines: Callable[[str], List[str]]): lines_per_texts = [split_text_in_lines(text) for text in batch["text"]] return { **{k: v for k, v in batch.items() if k != "text"}, @@ -181,4 +153,20 @@ def delete_text_from_duplicates(batch: Dict[str, List], hashes: Set[str]) -> Dic return { **{k: v for k, v in batch.items() if k != "hash"}, "text": [text if is_new_hash(hash_, hashes) else "" for text, hash_ in zip(batch["text"], batch["hash"])] - } \ No newline at end of file + } + + +def compute_text_hash(batch: Dict) -> List[str]: + return get_hash_stripped(batch["text"]) + + +url_regex = re.compile(r"^(.[^?]*)") +def compute_url_hash(batch: Dict) -> List[str]: + return get_hash([ + url_regex.match(parse_meta(meta)["url"]).group(1) + for meta in batch["meta"] + ]) + + +def split_text_with_new_lines(text: str) -> List[str]: + return [line.strip() for line in text.split("\n")]
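
For reference, a minimal usage sketch of the refactored document-level deduplication (assuming this patch is applied and the `datasets` library is installed; the toy dataset and the `num_proc`/`batch_size` values are illustrative only, not taken from the patch):

# Minimal sketch, not part of the patch: exercises build_dedup_document as it
# is now wired into clean.py's MAPS registry.
from datasets import Dataset

from clean_helpers.deduplication import build_dedup_document, compute_text_hash

# Same callable that clean.py registers under "dedup_document": documents whose
# text hashes identically (via the existing get_hash_stripped helper) keep their
# row but have "text" set to "", preserving alignment instead of dropping rows.
dedup_document = build_dedup_document(compute_hash=compute_text_hash)

ds = Dataset.from_dict({"text": ["same doc", "same doc", "unique doc"]})

deduped = dedup_document(ds, num_proc=1, batch_size=2)
print(deduped["text"])  # expected: ['same doc', '', 'unique doc']

The URL-based variant composes the same way, swapping in compute_url_hash, which hashes the query-stripped meta['url'] instead of the document text.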