From 98d12b0a2051c5ae5a8e22aa4074fba11737b238 Mon Sep 17 00:00:00 2001 From: Neri Carcasci Date: Wed, 23 Jul 2025 15:35:46 +0100 Subject: [PATCH 1/2] metrics created --- .../language/Levenshtein/levenshtein.py | 86 +++++++++++++++++++ src/core/metrics/language/__init__.py | 0 src/core/metrics/language/bleu/bleu.py | 65 ++++++++++++++ .../metrics/language/error_rates/__init__.py | 0 .../language/error_rates/base_result.py | 10 +++ .../language/error_rates/match_error_rate.py | 16 ++++ .../language/error_rates/word_error_rate.py | 14 +++ .../error_rates/word_information_lost.py | 16 ++++ .../error_rates/word_information_preserved.py | 16 ++++ .../metrics/language/fuzzymatch/fuzzymatch.py | 71 +++++++++++++++ src/core/metrics/language/rogue/rogue.py | 55 ++++++++++++ src/core/metrics/language/utils.py | 8 ++ 12 files changed, 357 insertions(+) create mode 100644 src/core/metrics/language/Levenshtein/levenshtein.py create mode 100644 src/core/metrics/language/__init__.py create mode 100644 src/core/metrics/language/bleu/bleu.py create mode 100644 src/core/metrics/language/error_rates/__init__.py create mode 100644 src/core/metrics/language/error_rates/base_result.py create mode 100644 src/core/metrics/language/error_rates/match_error_rate.py create mode 100644 src/core/metrics/language/error_rates/word_error_rate.py create mode 100644 src/core/metrics/language/error_rates/word_information_lost.py create mode 100644 src/core/metrics/language/error_rates/word_information_preserved.py create mode 100644 src/core/metrics/language/fuzzymatch/fuzzymatch.py create mode 100644 src/core/metrics/language/rogue/rogue.py create mode 100644 src/core/metrics/language/utils.py diff --git a/src/core/metrics/language/Levenshtein/levenshtein.py b/src/core/metrics/language/Levenshtein/levenshtein.py new file mode 100644 index 0000000..1ff2bc4 --- /dev/null +++ b/src/core/metrics/language/Levenshtein/levenshtein.py @@ -0,0 +1,86 @@ +from typing import Callable, List, Union +from dataclasses import dataclass +from nltk.metrics.distance import edit_distance_align +from utils import clean_text + + + +@dataclass +class LevenshteinResult: + distance: int + insertions: int + deletions: int + substitutions: int + reference_length: int + + def normalized_distance(self) -> float: + if self.reference_length == 0: + return 0.0 + return self.distance / self.reference_length + +class Levenshtein: + + @staticmethod + def compute_( + reference: str, + hypothesis: str, + tokenizer: Callable[[str], List[str]] = None + ) -> LevenshteinResult: + """ + Compute Levenshtein distance at the character or token level. + + :param reference: Ground truth string. + :param hypothesis: Predicted string. + :param tokenizer: Optional function to split input into tokens. If None, character-level is used. + """ + + clean_ref = clean_text(reference) + clean_hyp = clean_text(hypothesis) + + return edit_distance(clean_ref, clean_hyp) + + + @staticmethod + def compute_with_counter( + reference: str, + hypothesis: str, + tokenizer: Callable[[str], List[str]] = None + ) -> LevenshteinResult: + """ + Compute Levenshtein distance at the character or token level. + + :param reference: Ground truth string. + :param hypothesis: Predicted string. + :param tokenizer: Optional function to split input into tokens. If None, character-level is used. 
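+        :return: LevenshteinResult with the total edit distance and counts of
+            insertions, deletions, and substitutions against the reference.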
+ """ + clean_ref = clean_text(reference) + clean_hyp = clean_text(hypothesis) + + if tokenizer: + ref_seq = tokenizer(clean_ref) + hyp_seq = tokenizer(clean_hyp) + else: + ref_seq = list(clean_ref) + hyp_seq = list(clean_hyp) + + _, aligned_ref, aligned_hyp = edit_distance_align(ref_seq, hyp_seq) + + insertions = deletions = substitutions = 0 + for r, h in zip(aligned_ref, aligned_hyp): + if r == h: + continue + elif r == '*': + insertions += 1 + elif h == '*': + deletions += 1 + else: + substitutions += 1 + + total_distance = insertions + deletions + substitutions + return LevenshteinResult( + distance=total_distance, + insertions=insertions, + deletions=deletions, + substitutions=substitutions, + reference_length=len(ref_seq) + ) \ No newline at end of file diff --git a/src/core/metrics/language/__init__.py b/src/core/metrics/language/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/metrics/language/bleu/bleu.py b/src/core/metrics/language/bleu/bleu.py new file mode 100644 index 0000000..04de872 --- /dev/null +++ b/src/core/metrics/language/bleu/bleu.py @@ -0,0 +1,65 @@ +from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction +from nltk.tokenize import word_tokenize +from typing import List, Optional, Callable + + +class BLEUMetric: + def __init__( + self, + smoothing_method: Optional[int] = None, + tokenizer: Optional[Callable[[str], List[str]]] = None + ): + """ + :param smoothing_method: Smoothing method number (1-7) from nltk's SmoothingFunction. + None means no smoothing. + :param tokenizer: A callable that tokenizes a string into a list of tokens. Defaults to str.split. + """ + self.smoothing_function = ( + getattr(SmoothingFunction(), f"method{smoothing_method}") + if smoothing_method is not None else None + ) + self.tokenizer = tokenizer or (lambda x: x.split()) + + + @staticmethod + def create_uniform_weights(max_ngram: int) -> List[float]: + """ + Create uniform weights for BLEU-N scoring. + """ + return [1.0 / max_ngram] * max_ngram + + def calculate(self, references: List[str], hypothesis: str, max_ngram: int = 4, weights: Optional[List[float]] = None) -> float: + """ + Calculate sentence-level BLEU score. + """ + if weights is None: + weights = self.create_uniform_weights(max_ngram) + + tokenized_refs = [word_tokenize(ref) for ref in references] + tokenized_hyp = word_tokenize(hypothesis) + + return sentence_bleu( + tokenized_refs, + tokenized_hyp, + weights=weights, + smoothing_function=self.smoothing_function + ) + + def calculate_corpus(self, references: List[List[str]], hypotheses: List[str], max_ngram: int = 4, weights: Optional[List[float]] = None) -> float: + """Calculate corpus-level BLEU score. + + :param references: List of lists of reference strings. One list of references per hypothesis. + :param hypotheses: List of hypothesis strings. 
+ """ + if weights is None: + weights = self.create_uniform_weights(max_ngram) + + tokenized_refs = [[word_tokenize(ref) for ref in ref_group] for ref_group in references] + tokenized_hyps = [word_tokenize(hyp) for hyp in hypotheses] + + return corpus_bleu( + tokenized_refs, + tokenized_hyps, + weights=weights, + smoothing_function=self.smoothing_function + ) \ No newline at end of file diff --git a/src/core/metrics/language/error_rates/__init__.py b/src/core/metrics/language/error_rates/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/metrics/language/error_rates/base_result.py b/src/core/metrics/language/error_rates/base_result.py new file mode 100644 index 0000000..1e21d62 --- /dev/null +++ b/src/core/metrics/language/error_rates/base_result.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + +@dataclass +class ErrorRateResult: + value: float + insertions: int + deletions: int + substitutions: int + correct: int + reference_length: int diff --git a/src/core/metrics/language/error_rates/match_error_rate.py b/src/core/metrics/language/error_rates/match_error_rate.py new file mode 100644 index 0000000..f52a8e5 --- /dev/null +++ b/src/core/metrics/language/error_rates/match_error_rate.py @@ -0,0 +1,16 @@ +from typing import Callable, Optional +from levenshtein import Levenshtein +from .base_result import ErrorRateResult + +class MatchErrorRate: + def __init__(self, tokenizer: Optional[Callable] = None): + self.tokenizer = tokenizer or (lambda x: x.split()) + + def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult: + counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer) + S, D, I = counters.substitutions, counters.deletions, counters.insertions + N = counters.reference_length + C = N - S - D + denom = S + I + D + C + value = (S + I) / denom if denom else 1.0 + return ErrorRateResult(value, I, D, S, C, N) diff --git a/src/core/metrics/language/error_rates/word_error_rate.py b/src/core/metrics/language/error_rates/word_error_rate.py new file mode 100644 index 0000000..4b3328f --- /dev/null +++ b/src/core/metrics/language/error_rates/word_error_rate.py @@ -0,0 +1,14 @@ +from typing import Callable, Optional +from levenshtein import Levenshtein +from .base_result import ErrorRateResult + +class WordErrorRate: + def __init__(self, tokenizer: Optional[Callable] = None): + self.tokenizer = tokenizer or (lambda x: x.split()) + + def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult: + counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer) + S, D, I = counters.substitutions, counters.deletions, counters.insertions + N = counters.reference_length + value = (S + D + I) / N if N else 1.0 + return ErrorRateResult(value, I, D, S, 0, N) diff --git a/src/core/metrics/language/error_rates/word_information_lost.py b/src/core/metrics/language/error_rates/word_information_lost.py new file mode 100644 index 0000000..5242509 --- /dev/null +++ b/src/core/metrics/language/error_rates/word_information_lost.py @@ -0,0 +1,16 @@ +from typing import Callable, Optional +from levenshtein import Levenshtein +from .base_result import ErrorRateResult + +class WordInformationLost: + def __init__(self, tokenizer: Optional[Callable] = None): + self.tokenizer = tokenizer or (lambda x: x.split()) + + def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult: + counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer) + S, D, I = 
counters.substitutions, counters.deletions, counters.insertions + N = counters.reference_length + C = N - S - D + denom = N + C + value = (S + D + I) / denom if denom else 1.0 + return ErrorRateResult(value, I, D, S, C, N) diff --git a/src/core/metrics/language/error_rates/word_information_preserved.py b/src/core/metrics/language/error_rates/word_information_preserved.py new file mode 100644 index 0000000..69d9dd5 --- /dev/null +++ b/src/core/metrics/language/error_rates/word_information_preserved.py @@ -0,0 +1,16 @@ +from typing import Callable, Optional +from levenshtein import Levenshtein +from .base_result import ErrorRateResult + +class WordInformationPreserved: + def __init__(self, tokenizer: Optional[Callable] = None): + self.tokenizer = tokenizer or (lambda x: x.split()) + + def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult: + counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer) + S, D, I = counters.substitutions, counters.deletions, counters.insertions + N = counters.reference_length + C = N - S - D + denom = N + C + value = C / denom if denom else 0.0 + return ErrorRateResult(value, I, D, S, C, N) diff --git a/src/core/metrics/language/fuzzymatch/fuzzymatch.py b/src/core/metrics/language/fuzzymatch/fuzzymatch.py new file mode 100644 index 0000000..40f396a --- /dev/null +++ b/src/core/metrics/language/fuzzymatch/fuzzymatch.py @@ -0,0 +1,71 @@ +from typing import List, Optional, Callable +from bleu import BLEU +from levenshtein import WordErrorRate +from difflib import SequenceMatcher + + +class FuzzyMatch: + def __init__(self, tokenizer: Optional[Callable[[str], List[str]]] = None): + """ + :param tokenizer: A callable that tokenizes a string into a list of tokens. Defaults to str.split. + """ + self.tokenizer = tokenizer or (lambda x: x.split()) + + def calculate(self, reference: str, input_str: str) -> bool: + """ + Compare two strings for exact equality. + """ + return reference == input_str + + def calculate_wer(self, reference: str, input_str: str, threshold: float) -> bool: + """ + Return True if the WER between reference and input is less than the threshold. + """ + wer_metric = WordErrorRate(tokenizer=self.tokenizer) + wer = wer_metric.calculate(reference, input_str).value + return wer < threshold + + def calculate_bleu(self, references: List[str], input_str: str, threshold: float) -> bool: + """ + Return True if the BLEU score is above the threshold. + """ + bleu_metric = BLEU(tokenizer=self.tokenizer) + score = bleu_metric.calculate(references, input_str) + return score > threshold + + def calculate_bleu_ngram(self, references: List[str], input_str: str, threshold: float, max_ngram: int) -> bool: + """ + BLEU score with custom max n-gram. + """ + bleu_metric = BLEU(tokenizer=self.tokenizer) + score = bleu_metric.calculate(references, input_str, max_ngram=max_ngram) + return score > threshold + + def calculate_bleu_ngram_weights( + self, + references: List[str], + input_str: str, + threshold: float, + max_ngram: int, + weights: List[float] + ) -> bool: + """ + BLEU score with custom max n-gram and weights. + """ + bleu_metric = BLEU(tokenizer=self.tokenizer) + score = bleu_metric.calculate(references, input_str, max_ngram=max_ngram, weights=weights) + return score > threshold + + + def calculate_similarity(self, reference: str, input_str: str) -> float: + """ + Return a similarity score between 0 and 1 using difflib's SequenceMatcher. 
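+        :return: Similarity ratio between 0.0 and 1.0, where 1.0 means the strings are identical.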
+ """ + return SequenceMatcher(None, reference, input_str).ratio() + + + def is_similar(self, reference: str, input_str: str, threshold: float = 0.85) -> bool: + """ + Return True if the SequenceMatcher similarity exceeds a threshold. + """ + return self.calculate_similarity(reference, input_str) >= threshold \ No newline at end of file diff --git a/src/core/metrics/language/rogue/rogue.py b/src/core/metrics/language/rogue/rogue.py new file mode 100644 index 0000000..dbbc222 --- /dev/null +++ b/src/core/metrics/language/rogue/rogue.py @@ -0,0 +1,55 @@ +import re +from rouge_score import rouge_scorer +from nltk.tokenize import sent_tokenize, word_tokenize +from typing import Literal, List +from utils import clean_text + +class ROUGEMetric: + def __init__(self, rouge_type: Literal["rouge1", "rouge2", "rougeL", "rougeLsum"] = "rougeL"): + """ + :param rouge_type: The type of ROUGE score to compute. + Supported values: + - "rouge1" : unigram overlap + - "rouge2" : bigram overlap + - "rougeL" : longest common subsequence (LCS) + - "rougeLsum" : sentence-level LCS averaged over pairs + """ + self.rouge_type = rouge_type + self.scorer = rouge_scorer.RougeScorer([rouge_type], use_stemmer=True) + + def calculate(self, reference: str, hypothesis: str) -> float: + """ + Calculates the ROUGE score between a single reference and hypothesis. + + : param reference: The ground truth string. + : param hypothesis: The generated string. + : return: The ROUGE F1 score as a float. + """ + if self.rouge_type == "rougeLsum": + return self._rouge_lsum(reference, hypothesis) + else: + reference = clean_text(reference) + hypothesis = clean_text(hypothesis) + score = self.scorer.score(reference, hypothesis) + return score[self.rouge_type].fmeasure + + def _rouge_lsum(self, reference: str, hypothesis: str) -> float: + """ + Calculates the ROUGE-Lsum score by averaging sentence-level ROUGE-L scores. + + :param reference: A reference paragraph consisting of multiple sentences. + :param hypothesis: A hypothesis paragraph consisting of multiple sentences. + :return: The average ROUGE-L F1 score over aligned sentence pairs. 
+ """ + reference = clean_text(reference) + hypothesis = clean_text(hypothesis) + ref_sents = sent_tokenize(reference) + hyp_sents = sent_tokenize(hypothesis) + + total_score = 0.0 + count = min(len(ref_sents), len(hyp_sents)) + for i in range(count): + score = self.scorer.score(ref_sents[i], hyp_sents[i]) + total_score += score["rougeLsum"].fmeasure + + return total_score / count if count > 0 else 0.0 \ No newline at end of file diff --git a/src/core/metrics/language/utils.py b/src/core/metrics/language/utils.py new file mode 100644 index 0000000..02e0f8b --- /dev/null +++ b/src/core/metrics/language/utils.py @@ -0,0 +1,8 @@ +import string +import re + +def clean_text(text: str) -> str: + # Remove punctuation and extra whitespace, lowercasing + text = text.lower() + text = re.sub(f"[{re.escape(string.punctuation)}]", "", text) + return text.strip() \ No newline at end of file From f2eec94c6ed6313cd8329d9a3cc1069360801226 Mon Sep 17 00:00:00 2001 From: Neri Carcasci Date: Thu, 28 Aug 2025 15:42:41 +0100 Subject: [PATCH 2/2] updated metrics & unit tests --- pyproject.toml | 2 + .../metrics/language/Levenshtein/__init__.py | 1 + .../language/Levenshtein/levenshtein.py | 13 +- src/core/metrics/language/bleu/__init__.py | 1 + src/core/metrics/language/bleu/bleu.py | 22 ++-- .../language/error_rates/match_error_rate.py | 2 +- .../language/error_rates/word_error_rate.py | 2 +- .../error_rates/word_information_lost.py | 13 +- .../error_rates/word_information_preserved.py | 21 +++- .../{fuzzymatch => match}/fuzzymatch.py | 7 +- src/core/metrics/language/rogue/__init__.py | 1 + src/core/metrics/language/rogue/rogue.py | 22 ++-- tests/metrics/language/test_bleu.py | 112 ++++++++++++++++++ tests/metrics/language/test_fuzzyMatch.py | 51 ++++++++ .../language/test_levenshteinCommon.py | 41 +++++++ tests/metrics/language/test_rogue.py | 38 ++++++ tests/metrics/language/test_wordErrorRate.py | 59 +++++++++ 17 files changed, 369 insertions(+), 39 deletions(-) create mode 100644 src/core/metrics/language/Levenshtein/__init__.py create mode 100644 src/core/metrics/language/bleu/__init__.py rename src/core/metrics/language/{fuzzymatch => match}/fuzzymatch.py (87%) create mode 100644 src/core/metrics/language/rogue/__init__.py create mode 100644 tests/metrics/language/test_bleu.py create mode 100644 tests/metrics/language/test_fuzzyMatch.py create mode 100644 tests/metrics/language/test_levenshteinCommon.py create mode 100644 tests/metrics/language/test_rogue.py create mode 100644 tests/metrics/language/test_wordErrorRate.py diff --git a/pyproject.toml b/pyproject.toml index 37e76bd..052b331 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,8 @@ dependencies = [ "aif360", "hypothesis>=6.136.2", "pytest>=8.4.1", + "nltk>=3.8", + "rouge-score>=0.1.2", ] [project.optional-dependencies] diff --git a/src/core/metrics/language/Levenshtein/__init__.py b/src/core/metrics/language/Levenshtein/__init__.py new file mode 100644 index 0000000..8af22a7 --- /dev/null +++ b/src/core/metrics/language/Levenshtein/__init__.py @@ -0,0 +1 @@ +from .levenshtein import Levenshtein \ No newline at end of file diff --git a/src/core/metrics/language/Levenshtein/levenshtein.py b/src/core/metrics/language/Levenshtein/levenshtein.py index 1ff2bc4..4dd6634 100644 --- a/src/core/metrics/language/Levenshtein/levenshtein.py +++ b/src/core/metrics/language/Levenshtein/levenshtein.py @@ -1,7 +1,7 @@ from typing import Callable, List, Union from dataclasses import dataclass -from nltk.metrics.distance import edit_distance_align 
-from utils import clean_text +from nltk.metrics.distance import edit_distance, edit_distance_align +from src.core.metrics.language.utils import clean_text @@ -63,7 +63,14 @@ def compute_with_counter( ref_seq = list(clean_ref) hyp_seq = list(clean_hyp) - _, aligned_ref, aligned_hyp = edit_distance_align(ref_seq, hyp_seq) + alignment = edit_distance_align(ref_seq, hyp_seq) + + aligned_ref = [] + aligned_hyp = [] + for i, j in alignment: + aligned_ref.append(ref_seq[i] if i < len(ref_seq) else '*') + aligned_hyp.append(hyp_seq[j] if j < len(hyp_seq) else '*') + insertions = deletions = substitutions = 0 for r, h in zip(aligned_ref, aligned_hyp): diff --git a/src/core/metrics/language/bleu/__init__.py b/src/core/metrics/language/bleu/__init__.py new file mode 100644 index 0000000..a836f70 --- /dev/null +++ b/src/core/metrics/language/bleu/__init__.py @@ -0,0 +1 @@ +from .bleu import BLEUMetric \ No newline at end of file diff --git a/src/core/metrics/language/bleu/bleu.py b/src/core/metrics/language/bleu/bleu.py index 04de872..c974aa7 100644 --- a/src/core/metrics/language/bleu/bleu.py +++ b/src/core/metrics/language/bleu/bleu.py @@ -1,5 +1,4 @@ from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction -from nltk.tokenize import word_tokenize from typing import List, Optional, Callable @@ -14,10 +13,13 @@ def __init__( None means no smoothing. :param tokenizer: A callable that tokenizes a string into a list of tokens. Defaults to str.split. """ - self.smoothing_function = ( - getattr(SmoothingFunction(), f"method{smoothing_method}") - if smoothing_method is not None else None - ) + if isinstance(smoothing_method, int): + self.smoothing_function = getattr(SmoothingFunction(), f"method{smoothing_method}") + elif callable(smoothing_method): + self.smoothing_function = smoothing_method + else: + self.smoothing_function = None + self.tokenizer = tokenizer or (lambda x: x.split()) @@ -26,6 +28,8 @@ def create_uniform_weights(max_ngram: int) -> List[float]: """ Create uniform weights for BLEU-N scoring. 
""" + if not isinstance(max_ngram, int): + max_ngram = int(max_ngram) return [1.0 / max_ngram] * max_ngram def calculate(self, references: List[str], hypothesis: str, max_ngram: int = 4, weights: Optional[List[float]] = None) -> float: @@ -35,8 +39,8 @@ def calculate(self, references: List[str], hypothesis: str, max_ngram: int = 4, if weights is None: weights = self.create_uniform_weights(max_ngram) - tokenized_refs = [word_tokenize(ref) for ref in references] - tokenized_hyp = word_tokenize(hypothesis) + tokenized_refs = [self.tokenizer(ref) for ref in references] + tokenized_hyp = self.tokenizer(hypothesis) return sentence_bleu( tokenized_refs, @@ -54,8 +58,8 @@ def calculate_corpus(self, references: List[List[str]], hypotheses: List[str], m if weights is None: weights = self.create_uniform_weights(max_ngram) - tokenized_refs = [[word_tokenize(ref) for ref in ref_group] for ref_group in references] - tokenized_hyps = [word_tokenize(hyp) for hyp in hypotheses] + tokenized_refs = [[self.tokenizer(ref) for ref in ref_group] for ref_group in references] + tokenized_hyps = [self.tokenizer(hyp) for hyp in hypotheses] return corpus_bleu( tokenized_refs, diff --git a/src/core/metrics/language/error_rates/match_error_rate.py b/src/core/metrics/language/error_rates/match_error_rate.py index f52a8e5..40dffe9 100644 --- a/src/core/metrics/language/error_rates/match_error_rate.py +++ b/src/core/metrics/language/error_rates/match_error_rate.py @@ -1,5 +1,5 @@ from typing import Callable, Optional -from levenshtein import Levenshtein +from src.core.metrics.language.levenshtein import Levenshtein from .base_result import ErrorRateResult class MatchErrorRate: diff --git a/src/core/metrics/language/error_rates/word_error_rate.py b/src/core/metrics/language/error_rates/word_error_rate.py index 4b3328f..f8f202c 100644 --- a/src/core/metrics/language/error_rates/word_error_rate.py +++ b/src/core/metrics/language/error_rates/word_error_rate.py @@ -1,5 +1,5 @@ from typing import Callable, Optional -from levenshtein import Levenshtein +from src.core.metrics.language.levenshtein import Levenshtein from .base_result import ErrorRateResult class WordErrorRate: diff --git a/src/core/metrics/language/error_rates/word_information_lost.py b/src/core/metrics/language/error_rates/word_information_lost.py index 5242509..09421f5 100644 --- a/src/core/metrics/language/error_rates/word_information_lost.py +++ b/src/core/metrics/language/error_rates/word_information_lost.py @@ -1,5 +1,6 @@ from typing import Callable, Optional -from levenshtein import Levenshtein +from src.core.metrics.language.levenshtein import Levenshtein +from .word_information_preserved import WordInformationPreserved from .base_result import ErrorRateResult class WordInformationLost: @@ -7,10 +8,6 @@ def __init__(self, tokenizer: Optional[Callable] = None): self.tokenizer = tokenizer or (lambda x: x.split()) def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult: - counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer) - S, D, I = counters.substitutions, counters.deletions, counters.insertions - N = counters.reference_length - C = N - S - D - denom = N + C - value = (S + D + I) / denom if denom else 1.0 - return ErrorRateResult(value, I, D, S, C, N) + wip = WordInformationPreserved(self.tokenizer).calculate(reference, hypothesis).value + value = 1.0 - wip + return ErrorRateResult(value, 0, 0, 0, 0, 0) diff --git a/src/core/metrics/language/error_rates/word_information_preserved.py 
b/src/core/metrics/language/error_rates/word_information_preserved.py index 69d9dd5..4f1727c 100644 --- a/src/core/metrics/language/error_rates/word_information_preserved.py +++ b/src/core/metrics/language/error_rates/word_information_preserved.py @@ -1,5 +1,5 @@ from typing import Callable, Optional -from levenshtein import Levenshtein +from src.core.metrics.language.levenshtein import Levenshtein from .base_result import ErrorRateResult class WordInformationPreserved: @@ -7,10 +7,19 @@ def __init__(self, tokenizer: Optional[Callable] = None): self.tokenizer = tokenizer or (lambda x: x.split()) def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult: + tokens_ref = self.tokenizer(reference) + tokens_hyp = self.tokenizer(hypothesis) + counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer) S, D, I = counters.substitutions, counters.deletions, counters.insertions - N = counters.reference_length - C = N - S - D - denom = N + C - value = C / denom if denom else 0.0 - return ErrorRateResult(value, I, D, S, C, N) + H = counters.reference_length - S - D # Correct words = reference length - (S + D) + + N_ref = len(tokens_ref) + N_hyp = len(tokens_hyp) + + # Approximate WIP: (H / N_ref) * (H / N_hyp) + wip = 0.0 + if N_ref > 0 and N_hyp > 0: + wip = (H / N_ref) * (H / N_hyp) + + return ErrorRateResult(wip, I, D, S, H, N_ref) \ No newline at end of file diff --git a/src/core/metrics/language/fuzzymatch/fuzzymatch.py b/src/core/metrics/language/match/fuzzymatch.py similarity index 87% rename from src/core/metrics/language/fuzzymatch/fuzzymatch.py rename to src/core/metrics/language/match/fuzzymatch.py index 40f396a..b55c9ba 100644 --- a/src/core/metrics/language/fuzzymatch/fuzzymatch.py +++ b/src/core/metrics/language/match/fuzzymatch.py @@ -1,6 +1,6 @@ from typing import List, Optional, Callable -from bleu import BLEU -from levenshtein import WordErrorRate +from src.core.metrics.language.bleu.bleu import BLEUMetric as BLEU +from src.core.metrics.language.error_rates.word_error_rate import WordErrorRate from difflib import SequenceMatcher @@ -23,6 +23,7 @@ def calculate_wer(self, reference: str, input_str: str, threshold: float) -> boo """ wer_metric = WordErrorRate(tokenizer=self.tokenizer) wer = wer_metric.calculate(reference, input_str).value + print(f"[calculate_wer] reference: {reference}\ninput_str: {input_str}\nWER: {wer}\nthreshold: {threshold}") return wer < threshold def calculate_bleu(self, references: List[str], input_str: str, threshold: float) -> bool: @@ -57,7 +58,7 @@ def calculate_bleu_ngram_weights( return score > threshold - def calculate_similarity(self, reference: str, input_str: str) -> float: + def calculate_similarity(self, reference: str, input_str: str) -> float: """ Return a similarity score between 0 and 1 using difflib's SequenceMatcher. 
""" diff --git a/src/core/metrics/language/rogue/__init__.py b/src/core/metrics/language/rogue/__init__.py new file mode 100644 index 0000000..21d8537 --- /dev/null +++ b/src/core/metrics/language/rogue/__init__.py @@ -0,0 +1 @@ +from .rogue import ROUGEMetric \ No newline at end of file diff --git a/src/core/metrics/language/rogue/rogue.py b/src/core/metrics/language/rogue/rogue.py index dbbc222..362a993 100644 --- a/src/core/metrics/language/rogue/rogue.py +++ b/src/core/metrics/language/rogue/rogue.py @@ -1,8 +1,8 @@ import re from rouge_score import rouge_scorer -from nltk.tokenize import sent_tokenize, word_tokenize +from nltk.tokenize import sent_tokenize from typing import Literal, List -from utils import clean_text +from src.core.metrics.language.utils import clean_text class ROUGEMetric: def __init__(self, rouge_type: Literal["rouge1", "rouge2", "rougeL", "rougeLsum"] = "rougeL"): @@ -17,6 +17,12 @@ def __init__(self, rouge_type: Literal["rouge1", "rouge2", "rougeL", "rougeLsum" self.rouge_type = rouge_type self.scorer = rouge_scorer.RougeScorer([rouge_type], use_stemmer=True) + @staticmethod + def simple_sent_tokenize(text: str) -> list[str]: + # Split on sentence-ending punctuation followed by a space and a capital letter + pattern = re.compile(r'(?<=[.!?])\s+(?=[A-Z])') + return re.split(pattern, text) + def calculate(self, reference: str, hypothesis: str) -> float: """ Calculates the ROUGE score between a single reference and hypothesis. @@ -41,15 +47,15 @@ def _rouge_lsum(self, reference: str, hypothesis: str) -> float: :param hypothesis: A hypothesis paragraph consisting of multiple sentences. :return: The average ROUGE-L F1 score over aligned sentence pairs. """ - reference = clean_text(reference) - hypothesis = clean_text(hypothesis) - ref_sents = sent_tokenize(reference) - hyp_sents = sent_tokenize(hypothesis) + ref_sents = self.simple_sent_tokenize(reference) + hyp_sents = self.simple_sent_tokenize(hypothesis) + ref_sents_cleaned = [clean_text(s) for s in ref_sents] + hyp_sents_cleaned = [clean_text(s) for s in hyp_sents] total_score = 0.0 - count = min(len(ref_sents), len(hyp_sents)) + count = min(len(ref_sents_cleaned), len(hyp_sents_cleaned)) for i in range(count): - score = self.scorer.score(ref_sents[i], hyp_sents[i]) + score = self.scorer.score(ref_sents_cleaned[i], hyp_sents_cleaned[i]) total_score += score["rougeLsum"].fmeasure return total_score / count if count > 0 else 0.0 \ No newline at end of file diff --git a/tests/metrics/language/test_bleu.py b/tests/metrics/language/test_bleu.py new file mode 100644 index 0000000..e3449b1 --- /dev/null +++ b/tests/metrics/language/test_bleu.py @@ -0,0 +1,112 @@ +import pytest +from src.core.metrics.language.bleu import BLEUMetric as BLEU +from nltk.translate.bleu_score import SmoothingFunction + + +common_hypothesis = "the cat the cat on mat" +common = "the cat is on the mat" +uncommon = "The candidate has no alignment to any of the references" +common_references = [common, common] + +validation_reference = [ + "It is a guide to action that ensures that the military will forever heed Party commands", + "It is the guiding principle which guarantees the military forces always being under the command of the Party", + "It is the practical guide for the army always to heed the directions of the party" +] + +validation_hypothesis_a = "It is a guide to action which ensures that the military always obeys the commands of the party" +validation_hypothesis_b = "It is to insure the troops forever hearing the activity guidebook that 
party direct" + +hyp1 = "It is a guide to action which ensures that the military always obeys the commands of the party" +hyp2 = "he read the book because he was interested in world history" + +ref1a = "It is a guide to action that ensures that the military will forever heed Party commands" +ref1b = "It is the guiding principle which guarantees the military forces always being under the command of the Party" +ref1c = "It is the practical guide for the army always to heed the directions of the party" +ref2a = "he was interested in world history because he read the book" + +def test_sentence_epsilon_smoothing_weights(): + smoothing = SmoothingFunction().method1 + bleu = BLEU(smoothing_method=smoothing) + score = bleu.calculate(common_references, common_hypothesis, max_ngram=2, weights=[0.3, 0.7]) + assert pytest.approx(score, 0.05) == 0.3 + +def test_sentence_no_smoothing_weights(): + bleu = BLEU() + score = bleu.calculate(common_references, common_hypothesis, max_ngram=2, weights=[0.3, 0.7]) + assert pytest.approx(score, 0.05) == 0.3 + +def test_sentence_no_smoothing_no_weights(): + bleu = BLEU() + score = bleu.calculate(common_references, common_hypothesis, max_ngram=2) + assert pytest.approx(score, abs=0.01) == 0.4082 + +def test_zero_matches(): + references = [uncommon] + hypothesis = "John loves Mary" + bleu = BLEU() + for n in range(1, 6): + assert bleu.calculate(references, hypothesis, max_ngram=n) == 0.0 + +def test_full_matches(): + references = [uncommon] * 4 + bleu = BLEU() + for n in range(1, 11): + assert bleu.calculate(references, uncommon, max_ngram=n) == 1.0 + + + +def test_validation_bleu_2(): + bleu = BLEU() + assert pytest.approx(bleu.calculate(validation_reference, validation_hypothesis_a, max_ngram=2), 0.05) == 0.7453 + +def test_validation_bleu_3(): + bleu = BLEU() + assert pytest.approx(bleu.calculate(validation_reference, validation_hypothesis_a, max_ngram=3), 0.05) == 0.6240 + +def test_validation_bleu_4(): + bleu = BLEU() + assert pytest.approx(bleu.calculate(validation_reference, validation_hypothesis_a, max_ngram=4), 0.02) == 0.5045 + +def test_validation_bleu_5(): + bleu = BLEU() + assert pytest.approx(bleu.calculate(validation_reference, validation_hypothesis_a, max_ngram=5), 0.02) == 0.3920 + +def test_bleu_corpus(): + bleu = BLEU() + references = [[ref1a, ref1b, ref1c], [ref2a]] + hypotheses = [hyp1, hyp2] + weights = [0.25] * 4 + score = bleu.calculate_corpus(references, hypotheses, max_ngram=4, weights=weights) + assert pytest.approx(score, 0.01) == 0.5920 + +def test_bleu_corpus_individual_consistency(): + bleu = BLEU() + score1 = bleu.calculate([ref1a, ref1b, ref1c], hyp1) + score2 = bleu.calculate([ref2a], hyp2) + average = (score1 + score2) / 2.0 + assert pytest.approx(average, 0.05) == 0.6223 + +def test_bleu_corpus_custom_weights(): + bleu = BLEU() + weights = [0.1, 0.3, 0.5, 0.1] + references = [[ref1a, ref1b, ref1c], [ref2a]] + hypotheses = [hyp1, hyp2] + assert pytest.approx(bleu.calculate_corpus(references, hypotheses, max_ngram=4, weights=weights), 0.01) == 0.5818 + +def test_bleu_corpus_multiple_weight_sets(): + bleu = BLEU() + weight_sets = [ + [0.5, 0.5], + [0.333, 0.333, 0.334], + [0.25, 0.25, 0.25, 0.25], + [0.2, 0.2, 0.2, 0.2, 0.2] + ] + expected_scores = [0.8242, 0.7067, 0.5920, 0.4719] + references = [[ref1a, ref1b, ref1c], [ref2a]] + hypotheses = [hyp1, hyp2] + + for weights, expected in zip(weight_sets, expected_scores): + ngram = len(weights) + score = bleu.calculate_corpus(references, hypotheses, max_ngram=ngram, weights=weights) + 
assert pytest.approx(score, 0.02) == expected \ No newline at end of file diff --git a/tests/metrics/language/test_fuzzyMatch.py b/tests/metrics/language/test_fuzzyMatch.py new file mode 100644 index 0000000..ba9cbfc --- /dev/null +++ b/tests/metrics/language/test_fuzzyMatch.py @@ -0,0 +1,51 @@ +import pytest +from src.core.metrics.language.match.fuzzymatch import FuzzyMatch + + +references = [ + "This is the test reference, to which I will compare alignment against.", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur condimentum velit id velit posuere dictum. Fusce euismod tortor massa, nec euismod sapien laoreet non. Donec vulputate mi velit, eu ultricies nibh iaculis vel. Aenean posuere urna nec sapien consectetur, vitae porttitor sapien finibus. Duis nec libero convallis lectus pharetra blandit ut ac odio. Vivamus nec dui quis sem convallis pulvinar. Maecenas sodales sollicitudin leo a faucibus.", + "The quick red fox jumped over the lazy brown dog" +] + +inputs = [ + "I'm a hypothesis reference, from which the aligner will compare against.", + "Lorem ipsum sit amet, consectetur adipiscing elit. Curabitur condimentum velit id velit posuere dictum. Fusce blandit euismod tortor massa, nec euismod sapien blandit laoreet non. Donec vulputate mi velit, eu ultricies nibh iaculis vel. Aenean posuere urna nec sapien consectetur, vitae porttitor sapien finibus. Duis nec libero convallis lectus pharetra blandit ut ac odio. Vivamus nec dui quis sem convallis pulvinar. Maecenas sodales sollicitudin leo a faucibus.", + "The quick red fox jumped over the lazy brown dog" +] + + +def commons_tokenizer(text: str): + return [token for token in text.split() if token.strip()] + + +def test_exact_match(): + expected = [False, False, True] + fm = FuzzyMatch() + for i, (ref, hyp) in enumerate(zip(references, inputs)): + assert fm.calculate(ref, hyp) == expected[i] + +def test_wer_match_default_tokenizer(): + expected = [True, True, True] + fm = FuzzyMatch() + for i, (ref, hyp) in enumerate(zip(references, inputs)): + actual = fm.calculate_wer(ref, hyp, threshold=0.67) # 0.65 is too low for this test + assert actual == expected[i] + +def test_wer_match_commons_tokenizer(): + expected = [False, True, True] + fm = FuzzyMatch(tokenizer=commons_tokenizer) + for i, (ref, hyp) in enumerate(zip(references, inputs)): + assert fm.calculate_wer(ref, hyp, threshold=0.65) == expected[i] + +def test_bleu_match_default_tokenizer(): + expected = [False, True, True] + fm = FuzzyMatch() + for i, (ref, hyp) in enumerate(zip(references, inputs)): + assert fm.calculate_bleu([ref], hyp, threshold=0.8) == expected[i] + +def test_bleu_match_commons_tokenizer(): + expected = [False, True, True] + fm = FuzzyMatch(tokenizer=commons_tokenizer) + for i, (ref, hyp) in enumerate(zip(references, inputs)): + assert fm.calculate_bleu([ref], hyp, threshold=0.8) == expected[i] \ No newline at end of file diff --git a/tests/metrics/language/test_levenshteinCommon.py b/tests/metrics/language/test_levenshteinCommon.py new file mode 100644 index 0000000..369d754 --- /dev/null +++ b/tests/metrics/language/test_levenshteinCommon.py @@ -0,0 +1,41 @@ +import pytest +from src.core.metrics.language.error_rates.word_error_rate import WordErrorRate +from src.core.metrics.language.error_rates.match_error_rate import MatchErrorRate +from src.core.metrics.language.error_rates.word_information_lost import WordInformationLost +from src.core.metrics.language.error_rates.word_information_preserved import WordInformationPreserved + +TOLERANCE 
= 1e-5 + +def run_test_all(reference_tokens, hypothesis_tokens, expected_wer, expected_mer, expected_wil, expected_wip, tol=TOLERANCE): + reference = " ".join(reference_tokens) + hypothesis = " ".join(hypothesis_tokens) + + wer = WordErrorRate().calculate(reference, hypothesis).value + mer = MatchErrorRate().calculate(reference, hypothesis).value + wil = WordInformationLost().calculate(reference, hypothesis).value + wip = WordInformationPreserved().calculate(reference, hypothesis).value + + print(f"[test_all] ref: {reference}\nhyp: {hypothesis}\nWER: {wer} (expected {expected_wer})\nMER: {mer} (expected {expected_mer})\nWIL: {wil} (expected {expected_wil})\nWIP: {wip} (expected {expected_wip})") + + assert pytest.approx(wer, abs=tol) == expected_wer, f"Expected WER: {expected_wer}, got {wer}" + assert pytest.approx(mer, abs=tol) == expected_mer, f"Expected MER: {expected_mer}, got {mer}" + assert pytest.approx(wil, abs=tol) == expected_wil, f"Expected WIL: {expected_wil}, got {wil}" + assert pytest.approx(wip, abs=tol) == expected_wip, f"Expected WIP: {expected_wip}, got {wip}" + + +def test_equal_reference_hypothesis(): + ref = ["X"] + hyp = ["X"] + run_test_all(ref, hyp, expected_wer=0.0, expected_mer=0.0, expected_wil=0.0, expected_wip=1.0) + + +def test_repeated_hypothesis(): + ref = ["X"] + hyp = ["X", "X", "Y", "Y"] + run_test_all(ref, hyp, expected_wer=3.0, expected_mer=0.75, expected_wil=0.75, expected_wip=0.25) + + +def test_overlap(): + ref = ["X", "Y", "Z"] + hyp = ["X", "Z"] + run_test_all(ref, hyp, expected_wer=1/3, expected_mer=1/3, expected_wil=1/3, expected_wip=2/3) \ No newline at end of file diff --git a/tests/metrics/language/test_rogue.py b/tests/metrics/language/test_rogue.py new file mode 100644 index 0000000..1d958df --- /dev/null +++ b/tests/metrics/language/test_rogue.py @@ -0,0 +1,38 @@ +import pytest +from src.core.metrics.language.rogue import ROUGEMetric as ROUGE + +def test_rouge1(): + rouge = ROUGE(rouge_type="rouge1") + score = rouge.calculate("testing one two", "testing") + assert score == 0.5 + +def test_rouge_scores_empty(): + for rouge_type in ["rouge1", "rouge2", "rougeL", "rougeLsum"]: + rouge = ROUGE(rouge_type=rouge_type) + score = rouge.calculate("testing one two", "") + assert score == 0 + +def test_rouge2(): + rouge = ROUGE(rouge_type="rouge2") + score = rouge.calculate("testing one two", "testing one") + assert pytest.approx(score, abs=0.05) == 0.66 + +def test_rougel_consecutive(): + rouge = ROUGE(rouge_type="rougeL") + score = rouge.calculate("testing one two", "testing one") + assert pytest.approx(score, abs=0.05) == 0.8 + +def test_rougel_non_consecutive(): + rouge = ROUGE(rouge_type="rougeL") + score = rouge.calculate("testing one two", "testing two") + assert pytest.approx(score, abs=0.05) == 0.8 + +def test_rougel_sum(): + rouge = ROUGE(rouge_type="rougeLsum") + score = rouge.calculate("w1 w2 w3 w4 w5", "w1 w2 w6 w7 w8\nw1 w3 w8 w9 w5") + assert pytest.approx(score, abs=0.05) == 0.5 + +def test_rougel_sum_non_word(): + rouge = ROUGE(rouge_type="rougeLsum") + score = rouge.calculate("w1 w2 w3 w4 w5", "/") + assert score == 0 \ No newline at end of file diff --git a/tests/metrics/language/test_wordErrorRate.py b/tests/metrics/language/test_wordErrorRate.py new file mode 100644 index 0000000..eb5c43b --- /dev/null +++ b/tests/metrics/language/test_wordErrorRate.py @@ -0,0 +1,59 @@ +import pytest +from src.core.metrics.language.error_rates.word_error_rate import WordErrorRate + + +references = [ + "This is the test reference, to which I will 
compare alignment against.", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur condimentum velit id velit posuere dictum. Fusce euismod tortor massa, nec euismod sapien laoreet non. Donec vulputate mi velit, eu ultricies nibh iaculis vel. Aenean posuere urna nec sapien consectetur, vitae porttitor sapien finibus. Duis nec libero convallis lectus pharetra blandit ut ac odio. Vivamus nec dui quis sem convallis pulvinar. Maecenas sodales sollicitudin leo a faucibus.", + "The quick red fox jumped over the lazy brown dog", + "i love cold pizza" +] + +inputs = [ + "I'm a hypothesis reference, from which the aligner will compare against.", + "Lorem ipsum sit amet, consectetur adipiscing elit. Curabitur condimentum velit id velit posuere dictum. Fusce blandit euismod tortor massa, nec euismod sapien blandit laoreet non. Donec vulputate mi velit, eu ultricies nibh iaculis vel. Aenean posuere urna nec sapien consectetur, vitae porttitor sapien finibus. Duis nec libero convallis lectus pharetra blandit ut ac odio. Vivamus nec dui quis sem convallis pulvinar. Maecenas sodales sollicitudin leo a faucibus.", + "dog brown lazy the over jumped fox red quick The", + "i love pizza" +] + + +ground_truth_commons = [8 / 12., 3 / 66., 1.0, 0.25] +ground_truth_nlp = [9 / 14., 3 / 78., 1.0, 0.25] +ground_truth_whitespace = [8 / 14., 3 / 78., 10 / 10., 0.25] + +TOLERANCE = 0.1 + + + +def commons_tokenizer(text: str): + """Simulates Apache StringTokenizer (split on whitespace, drop empty tokens).""" + return [token for token in text.split() if token.strip()] + +def whitespace_tokenizer(text: str): + """Simulates Java-style default whitespace split.""" + return text.split() + +def simulated_open_nlp_tokenizer(text: str): + """Rough simulation of OpenNLP's SimpleTokenizer with basic punctuation split.""" + import re + return re.findall(r"\w+|[^\w\s]", text, re.UNICODE) + + + +@pytest.mark.parametrize("ref, hyp, expected", zip(references, inputs, ground_truth_commons)) +def test_commons_tokenizer(ref, hyp, expected): + wer = WordErrorRate(tokenizer=commons_tokenizer) + result = wer.calculate(ref, hyp) + assert pytest.approx(result.value, abs=TOLERANCE) == expected + +@pytest.mark.parametrize("ref, hyp, expected", zip(references, inputs, ground_truth_whitespace)) +def test_whitespace_tokenizer_default(ref, hyp, expected): + wer = WordErrorRate(tokenizer=whitespace_tokenizer) + result = wer.calculate(ref, hyp) + assert pytest.approx(result.value, abs=TOLERANCE) == expected + +@pytest.mark.parametrize("ref, hyp, expected", zip(references, inputs, ground_truth_nlp)) +def test_simulated_open_nlp_tokenizer(ref, hyp, expected): + wer = WordErrorRate(tokenizer=simulated_open_nlp_tokenizer) + result = wer.calculate(ref, hyp) + assert pytest.approx(result.value, abs=TOLERANCE) == expected \ No newline at end of file
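Below is a minimal usage sketch (not part of the diffs above) showing how the metrics introduced in these two patches are intended to fit together. It assumes the package layout created by the patches and that the nltk and rouge-score dependencies added to pyproject.toml are installed; the sample strings are illustrative only.

    from src.core.metrics.language.bleu import BLEUMetric
    from src.core.metrics.language.rogue import ROUGEMetric
    from src.core.metrics.language.error_rates.word_error_rate import WordErrorRate
    from src.core.metrics.language.match.fuzzymatch import FuzzyMatch

    reference = "the cat is on the mat"
    hypothesis = "the cat sat on the mat"

    # Word error rate: (S + D + I) / reference length, built on the Levenshtein counters.
    wer = WordErrorRate().calculate(reference, hypothesis)
    print(wer.value, wer.substitutions, wer.deletions, wer.insertions)

    # Sentence-level BLEU-2 with method1 smoothing and the default whitespace tokenizer.
    bleu = BLEUMetric(smoothing_method=1)
    print(bleu.calculate([reference], hypothesis, max_ngram=2))

    # ROUGE-L F1 between the cleaned reference and hypothesis.
    print(ROUGEMetric(rouge_type="rougeL").calculate(reference, hypothesis))

    # Thresholded fuzzy matching on top of WER and SequenceMatcher similarity.
    fm = FuzzyMatch()
    print(fm.calculate_wer(reference, hypothesis, threshold=0.5))
    print(fm.is_similar(reference, hypothesis, threshold=0.85))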