17 changes: 12 additions & 5 deletions clean.py
@@ -10,9 +10,10 @@
from numpy.random import default_rng

from clean_helpers import build_small_docs_filter, filter_wiki_non_text_type, filter_wiki_user_titles, \
replace_newline_with_space, build_dedup_template, dedup_document, build_line_with_substring_remover, \
en_wiktionary_stripper, build_small_docs_bytes_filter, dedup_document_on_url, filter_remove_empty_docs,\
build_reference_remover
replace_newline_with_space, build_dedup_template, build_line_with_substring_remover, \
en_wiktionary_stripper, build_small_docs_bytes_filter, filter_remove_empty_docs,\
build_reference_remover, build_dedup_document
from clean_helpers.deduplication import split_text_with_new_lines, compute_text_hash, compute_url_hash
from clean_helpers.stopwords import stopwords

set_verbosity_info()
@@ -47,13 +48,19 @@
"dedup_template_soft": build_dedup_template(
min_template_line_size=15,
min_template_line_occurence=5,
split_text=split_text_with_new_lines
),
"dedup_pseudocrawl_newspapers": build_dedup_template(
min_template_line_size=0,
min_template_line_occurence=2,
split_text=split_text_with_new_lines
),
"dedup_document": dedup_document,
"dedup_document_on_url": dedup_document_on_url
"dedup_document": build_dedup_document(
compute_hash=compute_text_hash
),
"dedup_document_on_url": build_dedup_document(
compute_hash=compute_url_hash
)
}

MAPS_KEYS = set(MAPS.keys())
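
With this change both deduplication entries in MAPS are produced by the same build_dedup_document factory and keep the (ds, num_proc, batch_size) calling convention used by the other entries. A minimal usage sketch of one rebuilt entry; the toy dataset and the num_proc/batch_size values are illustrative, not taken from this PR:

from datasets import Dataset

# Illustrative toy dataset; the real pipeline runs on much larger corpora.
ds = Dataset.from_dict({"text": ["same doc", "same doc", "another doc"]})

dedup_fn = MAPS["dedup_document"]  # built via build_dedup_document(compute_hash=compute_text_hash)
deduped = dedup_fn(ds, num_proc=1, batch_size=2)
# Duplicate texts come back as "" rather than being dropped, so row alignment is preserved.
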
2 changes: 1 addition & 1 deletion clean_helpers/__init__.py
@@ -4,4 +4,4 @@
from .map_strip_substring import en_wiktionary_stripper
from .map_remove_references import build_reference_remover
from .clean_lines import build_line_with_substring_remover
from .deduplication import build_dedup_template, dedup_document, dedup_document_on_url
from .deduplication import build_dedup_template, build_dedup_document
124 changes: 56 additions & 68 deletions clean_helpers/deduplication.py
@@ -1,6 +1,6 @@
from collections import defaultdict
from functools import partial
from typing import List, Set, Tuple, Dict
from typing import List, Set, Tuple, Dict, Callable
import hashlib
import re
import string
@@ -12,12 +12,16 @@
from clean_helpers.utils import parse_meta


def build_dedup_template(min_template_line_size: int, min_template_line_occurence: int):
def build_dedup_template(
    min_template_line_size: int,
    min_template_line_occurence: int,
    split_text: Callable[[str], List[str]]
):
    def dedup_template(ds: Dataset, num_proc: int, batch_size: int) -> Dataset:
        """Computes and removes template lines"""
        # Compute the hash of each line
        split_into_lines_and_hashes = ds.map(
            split_text_to_lines_and_hash,
            partial(split_text_and_hash, split_text_in_lines=split_text),
            num_proc=num_proc,
            batched=True,
            batch_size=batch_size,
@@ -53,68 +57,40 @@ def dedup_template(ds: Dataset, num_proc: int, batch_size: int) -> Dataset:

    return dedup_template

def build_dedup_document(
    compute_hash: Callable[[Dict], List[str]],
):
    def dedup_document(
        ds: Dataset,
        num_proc: int,
        batch_size: int
    ) -> Dataset:
        hashed_documents = ds.map(
            lambda batch: {**batch, "hash": compute_hash(batch)},
            num_proc=num_proc,
            batched=True,
            batch_size=batch_size,
        )

def dedup_document(ds: Dataset, num_proc: int, batch_size: int) -> Dataset:
    hashed_documents = ds.map(
        lambda batch: {**batch, "hash": get_hash_stripped(batch["text"])},
        num_proc=num_proc,
        batched=True,
        batch_size=batch_size,
    )

    hashes = set()

    return hashed_documents.map(
        partial(delete_text_from_duplicates, hashes=hashes),
        num_proc=1,  # VERY IMPORTANT: hashes will be updated, and is not thread safe.
        batched=True,
        batch_size=batch_size,
        remove_columns=hashed_documents.column_names
    )

    # # TODO: This version is memory efficient and runs faster, but we lose alignment
    # return hashed_documents.filter(
    #     lambda hashes_: [is_new_hash(hash_, hashes) for hash_ in hashes_],
    #     num_proc=1,  # VERY IMPORTANT: hashes will be updated, and is not thread safe.
    #     input_columns=["hash"],
    #     batched=True,
    #     batch_size=batch_size,
    # ).remove_columns("hash")
        hashes = set()

url_regex = re.compile(r"^(.[^?]*)")
def dedup_document_on_url(ds: Dataset, num_proc: int, batch_size: int) -> Dataset:
    """Deduplication on meta['url']"""
    hashed_documents = ds.map(
        lambda batch: {
            **batch,
            "hash": get_hash([
                url_regex.match(parse_meta(meta)["url"]).group(1)
                for meta in batch["meta"]
            ])}
        ,
        num_proc=num_proc,
        batched=True,
        batch_size=batch_size,
    )

    hashes = set()

    return hashed_documents.map(
        partial(delete_text_from_duplicates, hashes=hashes),
        num_proc=1,  # VERY IMPORTANT: hashes will be updated, and is not thread safe.
        batched=True,
        batch_size=batch_size,
        remove_columns=hashed_documents.column_names
    )

    # return hashed_documents.filter(
    #     lambda hashes_: [is_new_hash(hash_, hashes) for hash_ in hashes_],
    #     num_proc=1,  # VERY IMPORTANT: hashes will be updated, and is not thread safe.
    #     input_columns=["hash"],
    #     batched=True,
    #     batch_size=batch_size,
    # ).remove_columns("hash")
        return hashed_documents.map(
            partial(delete_text_from_duplicates, hashes=hashes),
            num_proc=1,  # VERY IMPORTANT: hashes will be updated, and is not thread safe.
            batched=True,
            batch_size=batch_size,
            remove_columns=hashed_documents.column_names
        )

        # # TODO: This version is memory efficient and runs faster, but we lose alignment
        # return hashed_documents.filter(
        #     lambda hashes_: [is_new_hash(hash_, hashes) for hash_ in hashes_],
        #     num_proc=1,  # VERY IMPORTANT: hashes will be updated, and is not thread safe.
        #     input_columns=["hash"],
        #     batched=True,
        #     batch_size=batch_size,
        # ).remove_columns("hash")
    return dedup_document

# =========== HELPERS ===============

@@ -131,11 +107,7 @@ def get_hash(texts: List[str]) -> List[str]:
    return [hashlib.md5(text.strip().encode("utf-8")).hexdigest() for text in texts]


def split_text_in_lines(text: str) -> List[str]:
    return [line.strip() for line in text.split("\n")]


def split_text_to_lines_and_hash(batch: Dict[str, List]):
def split_text_and_hash(batch: Dict[str, List], split_text_in_lines: Callable[[str], List[str]]):
    lines_per_texts = [split_text_in_lines(text) for text in batch["text"]]
    return {
        **{k: v for k, v in batch.items() if k != "text"},
@@ -181,4 +153,20 @@ def delete_text_from_duplicates(batch: Dict[str, List], hashes: Set[str]) -> Dict:
    return {
        **{k: v for k, v in batch.items() if k != "hash"},
        "text": [text if is_new_hash(hash_, hashes) else "" for text, hash_ in zip(batch["text"], batch["hash"])]
    }
    }


def compute_text_hash(batch: Dict) -> List[str]:
    return get_hash_stripped(batch["text"])


url_regex = re.compile(r"^(.[^?]*)")
def compute_url_hash(batch: Dict) -> List[str]:
    return get_hash([
        url_regex.match(parse_meta(meta)["url"]).group(1)
        for meta in batch["meta"]
    ])


def split_text_with_new_lines(text: str) -> List[str]:
    return [line.strip() for line in text.split("\n")]
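
These two compute_hash helpers give build_dedup_document its two behaviours: compute_text_hash hashes the normalised document text, while compute_url_hash hashes the URL taken from parse_meta(meta)["url"], with url_regex keeping only the part before the first "?", so the same page fetched with different query strings collapses to one document. Below is a self-contained sketch of the underlying hash-and-blank pattern; the helper names are illustrative, and the simple strip() stands in for the repository's get_hash_stripped normalisation, which is defined elsewhere in this file:

import hashlib
from typing import List

def md5_of_stripped(text: str) -> str:
    # Mirrors get_hash above: MD5 over the whitespace-stripped text.
    return hashlib.md5(text.strip().encode("utf-8")).hexdigest()

def blank_duplicates(texts: List[str]) -> List[str]:
    seen = set()
    out = []
    for text in texts:
        h = md5_of_stripped(text)
        if h in seen:
            out.append("")  # duplicate: keep the row, drop the content
        else:
            seen.add(h)
            out.append(text)
    return out

print(blank_duplicates(["a doc ", "a doc", "b doc"]))
# ['a doc ', '', 'b doc']  -- the first two strip to the same string, so they collide

The shared seen set is mutated in place, which is why the maps above pin the second pass to num_proc=1: as the code comments note, the hash set is updated during mapping and is not safe to share across processes.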