2 changes: 2 additions & 0 deletions pyproject.toml
@@ -19,6 +19,8 @@ dependencies = [
"aif360",
"hypothesis>=6.136.2",
"pytest>=8.4.1",
"nltk>=3.8",
"rouge-score>=0.1.2",
]

[project.optional-dependencies]
1 change: 1 addition & 0 deletions src/core/metrics/language/Levenshtein/__init__.py
@@ -0,0 +1 @@
from .levenshtein import Levenshtein
93 changes: 93 additions & 0 deletions src/core/metrics/language/Levenshtein/levenshtein.py
@@ -0,0 +1,93 @@
from typing import Callable, List, Union
from dataclasses import dataclass
from nltk.metrics.distance import edit_distance, edit_distance_align
from src.core.metrics.language.utils import clean_text



@dataclass
class LevenshteinResult:
    distance: int
    insertions: int
    deletions: int
    substitutions: int
    reference_length: int

    def normalized_distance(self) -> float:
        if self.reference_length == 0:
            return 0.0
        return self.distance / self.reference_length


class Levenshtein:

    @staticmethod
    def compute_(
        reference: str,
        hypothesis: str,
        tokenizer: Callable[[str], List[str]] = None
    ) -> LevenshteinResult:
        """
        Compute Levenshtein distance at the character or token level.

        :param reference: Ground truth string.
        :param hypothesis: Predicted string.
        :param tokenizer: Optional function to split input into tokens. If None, character-level is used.
        """

        clean_ref = clean_text(reference)
        clean_hyp = clean_text(hypothesis)

        return edit_distance(clean_ref, clean_hyp)

suggestion: compute_ returns raw edit_distance instead of LevenshteinResult.

Please update compute_ to return a LevenshteinResult object instead of a raw integer for consistency with the rest of the codebase.

Suggested implementation:

        clean_ref = clean_text(reference)
        clean_hyp = clean_text(hypothesis)

        distance, insertions, deletions, substitutions = edit_distance(clean_ref, clean_hyp, return_operations=True)
        reference_length = len(clean_ref)
        return LevenshteinResult(
            distance=distance,
            insertions=insertions,
            deletions=deletions,
            substitutions=substitutions,
            reference_length=reference_length
        )
  • The edit_distance function must support a return_operations=True argument and return a tuple: (distance, insertions, deletions, substitutions). If it does not, you will need to update edit_distance accordingly.
  • If edit_distance only returns an integer, you must refactor it to also compute and return insertions, deletions, and substitutions.
  • Ensure that LevenshteinResult is imported or defined in the scope of compute_.
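As the reviewer notes, nltk's edit_distance does not expose a return_operations flag, so adopting this suggestion would mean computing the operation counts another way. One option is a small dynamic-programming helper; a minimal sketch follows (the name count_edit_operations is hypothetical, not part of this PR):

from typing import Sequence, Tuple

def count_edit_operations(ref: Sequence, hyp: Sequence) -> Tuple[int, int, int]:
    """Return (insertions, deletions, substitutions) needed to turn ref into hyp."""
    rows, cols = len(ref) + 1, len(hyp) + 1
    # dist[i][j] = edit distance between ref[:i] and hyp[:j]
    dist = [[0] * cols for _ in range(rows)]
    for i in range(rows):
        dist[i][0] = i
    for j in range(cols):
        dist[0][j] = j
    for i in range(1, rows):
        for j in range(1, cols):
            cost = 0 if ref[i - 1] == hyp[j - 1] else 1
            dist[i][j] = min(
                dist[i - 1][j] + 1,         # deletion
                dist[i][j - 1] + 1,         # insertion
                dist[i - 1][j - 1] + cost,  # match / substitution
            )
    # Backtrace through the matrix to classify each edit operation.
    insertions = deletions = substitutions = 0
    i, j = len(ref), len(hyp)
    while i > 0 or j > 0:
        if i > 0 and j > 0 and ref[i - 1] == hyp[j - 1] and dist[i][j] == dist[i - 1][j - 1]:
            i, j = i - 1, j - 1                       # match, no edit
        elif i > 0 and j > 0 and dist[i][j] == dist[i - 1][j - 1] + 1:
            substitutions += 1
            i, j = i - 1, j - 1
        elif j > 0 and dist[i][j] == dist[i][j - 1] + 1:
            insertions += 1
            j -= 1
        else:
            deletions += 1
            i -= 1
    return insertions, deletions, substitutions

compute_ could then sum the three counts for the distance and wrap everything in a LevenshteinResult, as the suggestion asks.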



    @staticmethod
    def compute_with_counter(
        reference: str,
        hypothesis: str,
        tokenizer: Callable[[str], List[str]] = None
    ) -> LevenshteinResult:
        """
        Compute Levenshtein distance at the character or token level.

        :param reference: Ground truth string.
        :param hypothesis: Predicted string.
        :param tokenizer: Optional function to split input into tokens. If None, character-level is used.
        """
        clean_ref = clean_text(reference)
        clean_hyp = clean_text(hypothesis)

        if tokenizer:
            ref_seq = tokenizer(clean_ref)
            hyp_seq = tokenizer(clean_hyp)
        else:
            ref_seq = list(clean_ref)
            hyp_seq = list(clean_hyp)

        alignment = edit_distance_align(ref_seq, hyp_seq)

        aligned_ref = []
        aligned_hyp = []
        for i, j in alignment:
            aligned_ref.append(ref_seq[i] if i < len(ref_seq) else '*')
            aligned_hyp.append(hyp_seq[j] if j < len(hyp_seq) else '*')

        insertions = deletions = substitutions = 0
        for r, h in zip(aligned_ref, aligned_hyp):
            if r == h:
                continue
            elif r == '*':
                insertions += 1
            elif h == '*':
                deletions += 1
            else:
                substitutions += 1

        total_distance = insertions + deletions + substitutions
        return LevenshteinResult(
            distance=total_distance,
            insertions=insertions,
            deletions=deletions,
            substitutions=substitutions,
            reference_length=len(ref_seq)
        )
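A short usage sketch for the class above (assuming the lowercase import path src.core.metrics.language.levenshtein that the error-rate modules in this PR use; output values are illustrative, not asserted):

from src.core.metrics.language.levenshtein import Levenshtein

result = Levenshtein.compute_with_counter(
    reference="the quick brown fox",
    hypothesis="the quick fox",
    tokenizer=str.split,  # word-level; omit the tokenizer for character-level
)
print(result.distance, result.insertions, result.deletions, result.substitutions)
print(result.normalized_distance())  # distance divided by the reference length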
Empty file.
1 change: 1 addition & 0 deletions src/core/metrics/language/bleu/__init__.py
@@ -0,0 +1 @@
from .bleu import BLEUMetric
69 changes: 69 additions & 0 deletions src/core/metrics/language/bleu/bleu.py
@@ -0,0 +1,69 @@
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction
from typing import List, Optional, Callable


class BLEUMetric:
    def __init__(
        self,
        smoothing_method: Optional[int] = None,
        tokenizer: Optional[Callable[[str], List[str]]] = None
    ):
        """
        :param smoothing_method: Smoothing method number (1-7) from nltk's SmoothingFunction.
            None means no smoothing.
        :param tokenizer: A callable that tokenizes a string into a list of tokens. Defaults to str.split.
        """
        if isinstance(smoothing_method, int):
            self.smoothing_function = getattr(SmoothingFunction(), f"method{smoothing_method}")
        elif callable(smoothing_method):
            self.smoothing_function = smoothing_method
        else:
            self.smoothing_function = None
Comment on lines +16 to +21

suggestion (bug_risk): No validation for smoothing_method range.

Invalid values for smoothing_method will cause AttributeError. Please validate that smoothing_method is within the allowed range (1-7) and handle errors appropriately.

Suggested change
-        if isinstance(smoothing_method, int):
-            self.smoothing_function = getattr(SmoothingFunction(), f"method{smoothing_method}")
-        elif callable(smoothing_method):
-            self.smoothing_function = smoothing_method
-        else:
-            self.smoothing_function = None
+        if isinstance(smoothing_method, int):
+            if 1 <= smoothing_method <= 7:
+                self.smoothing_function = getattr(SmoothingFunction(), f"method{smoothing_method}")
+            else:
+                raise ValueError(
+                    f"smoothing_method must be an integer between 1 and 7 (inclusive), got {smoothing_method}."
+                )
+        elif callable(smoothing_method):
+            self.smoothing_function = smoothing_method
+        else:
+            self.smoothing_function = None


        self.tokenizer = tokenizer or (lambda x: x.split())


    @staticmethod
    def create_uniform_weights(max_ngram: int) -> List[float]:
        """
        Create uniform weights for BLEU-N scoring.
        """
        if not isinstance(max_ngram, int):
            max_ngram = int(max_ngram)

suggestion (code-quality): Remove unnecessary casts to int, str, float or bool (remove-unnecessary-cast)

Suggested change
-            max_ngram = int(max_ngram)
+            max_ngram = max_ngram

        return [1.0 / max_ngram] * max_ngram
Comment on lines +31 to +33

suggestion (bug_risk): Implicit conversion of max_ngram may mask errors.

Instead of silently casting, raise an error or warning when max_ngram is not an integer to prevent hidden bugs.

Suggested change
-        if not isinstance(max_ngram, int):
-            max_ngram = int(max_ngram)
-        return [1.0 / max_ngram] * max_ngram
+        if not isinstance(max_ngram, int):
+            raise TypeError(f"max_ngram must be an integer, got {type(max_ngram).__name__}")
+        return [1.0 / max_ngram] * max_ngram


    def calculate(self, references: List[str], hypothesis: str, max_ngram: int = 4, weights: Optional[List[float]] = None) -> float:
        """
        Calculate sentence-level BLEU score.
        """
        if weights is None:
            weights = self.create_uniform_weights(max_ngram)

        tokenized_refs = [self.tokenizer(ref) for ref in references]
        tokenized_hyp = self.tokenizer(hypothesis)

        return sentence_bleu(
            tokenized_refs,
            tokenized_hyp,
            weights=weights,
            smoothing_function=self.smoothing_function
        )

    def calculate_corpus(self, references: List[List[str]], hypotheses: List[str], max_ngram: int = 4, weights: Optional[List[float]] = None) -> float:
        """Calculate corpus-level BLEU score.

        :param references: List of lists of reference strings. One list of references per hypothesis.
        :param hypotheses: List of hypothesis strings.
        """
        if weights is None:
            weights = self.create_uniform_weights(max_ngram)

        tokenized_refs = [[self.tokenizer(ref) for ref in ref_group] for ref_group in references]
        tokenized_hyps = [self.tokenizer(hyp) for hyp in hypotheses]

        return corpus_bleu(
            tokenized_refs,
            tokenized_hyps,
            weights=weights,
            smoothing_function=self.smoothing_function
        )
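A short usage sketch for BLEUMetric as defined above (scores depend on nltk's implementation and are not asserted here):

metric = BLEUMetric(smoothing_method=1)  # uses SmoothingFunction().method1

# Sentence-level: several references for one hypothesis.
sentence_score = metric.calculate(
    references=["the cat is on the mat"],
    hypothesis="the cat sat on the mat",
    max_ngram=4,
)

# Corpus-level: one list of references per hypothesis.
corpus_score = metric.calculate_corpus(
    references=[["the cat is on the mat"], ["there is a dog"]],
    hypotheses=["the cat sat on the mat", "there is a dog"],
)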
Empty file.
10 changes: 10 additions & 0 deletions src/core/metrics/language/error_rates/base_result.py
@@ -0,0 +1,10 @@
from dataclasses import dataclass

@dataclass
class ErrorRateResult:
    value: float
    insertions: int
    deletions: int
    substitutions: int
    correct: int
    reference_length: int
16 changes: 16 additions & 0 deletions src/core/metrics/language/error_rates/match_error_rate.py
@@ -0,0 +1,16 @@
from typing import Callable, Optional
from src.core.metrics.language.levenshtein import Levenshtein
from .base_result import ErrorRateResult

class MatchErrorRate:
    def __init__(self, tokenizer: Optional[Callable] = None):
        self.tokenizer = tokenizer or (lambda x: x.split())

    def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult:
        counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer)
        S, D, I = counters.substitutions, counters.deletions, counters.insertions
        N = counters.reference_length
        C = N - S - D
        denom = S + I + D + C
        value = (S + I) / denom if denom else 1.0
        return ErrorRateResult(value, I, D, S, C, N)
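Worked arithmetic for the rate as implemented above, with counts chosen by hand for illustration:

# Hypothetical counts: S=1 substitution, D=0 deletions, I=1 insertion, N=4 reference tokens.
S, D, I, N = 1, 0, 1, 4
C = N - S - D               # 3 tokens counted as correct
denom = S + I + D + C       # 5
value = (S + I) / denom if denom else 1.0
print(value)                # 0.4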
14 changes: 14 additions & 0 deletions src/core/metrics/language/error_rates/word_error_rate.py
@@ -0,0 +1,14 @@
from typing import Callable, Optional
from src.core.metrics.language.levenshtein import Levenshtein
from .base_result import ErrorRateResult

class WordErrorRate:
    def __init__(self, tokenizer: Optional[Callable] = None):
        self.tokenizer = tokenizer or (lambda x: x.split())

    def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult:
        counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer)
        S, D, I = counters.substitutions, counters.deletions, counters.insertions
        N = counters.reference_length
        value = (S + D + I) / N if N else 1.0
        return ErrorRateResult(value, I, D, S, 0, N)
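Worked arithmetic for the WER formula above, again with hand-picked counts:

# Hypothetical counts: reference "a b c d" vs hypothesis "a x c d e"
# gives S=1 (b -> x), D=0, I=1 (e), N=4 under a word-level alignment.
S, D, I, N = 1, 0, 1, 4
value = (S + D + I) / N if N else 1.0
print(value)  # 0.5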
13 changes: 13 additions & 0 deletions src/core/metrics/language/error_rates/word_information_lost.py
@@ -0,0 +1,13 @@
from typing import Callable, Optional
from src.core.metrics.language.levenshtein import Levenshtein
from .word_information_preserved import WordInformationPreserved
from .base_result import ErrorRateResult

class WordInformationLost:
    def __init__(self, tokenizer: Optional[Callable] = None):
        self.tokenizer = tokenizer or (lambda x: x.split())

    def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult:
        wip = WordInformationPreserved(self.tokenizer).calculate(reference, hypothesis).value
        value = 1.0 - wip
        return ErrorRateResult(value, 0, 0, 0, 0, 0)
25 changes: 25 additions & 0 deletions src/core/metrics/language/error_rates/word_information_preserved.py
@@ -0,0 +1,25 @@
from typing import Callable, Optional
from src.core.metrics.language.levenshtein import Levenshtein
from .base_result import ErrorRateResult

class WordInformationPreserved:
    def __init__(self, tokenizer: Optional[Callable] = None):
        self.tokenizer = tokenizer or (lambda x: x.split())

    def calculate(self, reference: str, hypothesis: str) -> ErrorRateResult:
        tokens_ref = self.tokenizer(reference)
        tokens_hyp = self.tokenizer(hypothesis)

        counters = Levenshtein.compute_with_counter(reference, hypothesis, tokenizer=self.tokenizer)
        S, D, I = counters.substitutions, counters.deletions, counters.insertions
        H = counters.reference_length - S - D  # Correct words = reference length - (S + D)

        N_ref = len(tokens_ref)
        N_hyp = len(tokens_hyp)

        # Approximate WIP: (H / N_ref) * (H / N_hyp)
        wip = 0.0
        if N_ref > 0 and N_hyp > 0:
            wip = (H / N_ref) * (H / N_hyp)

Comment on lines +20 to +24

suggestion (code-quality): We've found these issues:

Suggested change
-        # Approximate WIP: (H / N_ref) * (H / N_hyp)
-        wip = 0.0
-        if N_ref > 0 and N_hyp > 0:
-            wip = (H / N_ref) * (H / N_hyp)
+        wip = (H / N_ref) * (H / N_hyp) if N_ref > 0 and N_hyp > 0 else 0.0

        return ErrorRateResult(wip, I, D, S, H, N_ref)
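Worked arithmetic for the approximate WIP formula above and for the WIL = 1 - WIP relation used by WordInformationLost (numbers chosen by hand):

# Hypothetical counts: H=3 correct words, 4 reference tokens, 5 hypothesis tokens.
H, N_ref, N_hyp = 3, 4, 5
wip = (H / N_ref) * (H / N_hyp) if N_ref > 0 and N_hyp > 0 else 0.0
wil = 1.0 - wip
print(wip, wil)  # 0.45 0.55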
72 changes: 72 additions & 0 deletions src/core/metrics/language/match/fuzzymatch.py
@@ -0,0 +1,72 @@
from typing import List, Optional, Callable
from src.core.metrics.language.bleu.bleu import BLEUMetric as BLEU
from src.core.metrics.language.error_rates.word_error_rate import WordErrorRate
from difflib import SequenceMatcher


class FuzzyMatch:
    def __init__(self, tokenizer: Optional[Callable[[str], List[str]]] = None):
        """
        :param tokenizer: A callable that tokenizes a string into a list of tokens. Defaults to str.split.
        """
        self.tokenizer = tokenizer or (lambda x: x.split())

    def calculate(self, reference: str, input_str: str) -> bool:
        """
        Compare two strings for exact equality.
        """
        return reference == input_str

    def calculate_wer(self, reference: str, input_str: str, threshold: float) -> bool:
        """
        Return True if the WER between reference and input is less than the threshold.
        """
        wer_metric = WordErrorRate(tokenizer=self.tokenizer)
        wer = wer_metric.calculate(reference, input_str).value
        print(f"[calculate_wer] reference: {reference}\ninput_str: {input_str}\nWER: {wer}\nthreshold: {threshold}")
        return wer < threshold

    def calculate_bleu(self, references: List[str], input_str: str, threshold: float) -> bool:
        """
        Return True if the BLEU score is above the threshold.
        """
        bleu_metric = BLEU(tokenizer=self.tokenizer)
        score = bleu_metric.calculate(references, input_str)
        return score > threshold

    def calculate_bleu_ngram(self, references: List[str], input_str: str, threshold: float, max_ngram: int) -> bool:
        """
        BLEU score with custom max n-gram.
        """
        bleu_metric = BLEU(tokenizer=self.tokenizer)
        score = bleu_metric.calculate(references, input_str, max_ngram=max_ngram)
        return score > threshold

    def calculate_bleu_ngram_weights(
        self,
        references: List[str],
        input_str: str,
        threshold: float,
        max_ngram: int,
        weights: List[float]
    ) -> bool:
        """
        BLEU score with custom max n-gram and weights.
        """
        bleu_metric = BLEU(tokenizer=self.tokenizer)
        score = bleu_metric.calculate(references, input_str, max_ngram=max_ngram, weights=weights)
        return score > threshold

    def calculate_similarity(self, reference: str, input_str: str) -> float:
        """
        Return a similarity score between 0 and 1 using difflib's SequenceMatcher.
        """
        return SequenceMatcher(None, reference, input_str).ratio()

    def is_similar(self, reference: str, input_str: str, threshold: float = 0.85) -> bool:
        """
        Return True if the SequenceMatcher similarity exceeds a threshold.
        """
        return self.calculate_similarity(reference, input_str) >= threshold
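A short usage sketch for FuzzyMatch as defined above (threshold values are illustrative):

matcher = FuzzyMatch()

matcher.calculate("hello world", "hello world")            # exact match -> True
matcher.calculate_wer("hello world", "hello there", 0.6)   # True if WER < 0.6
matcher.calculate_bleu(["the cat is on the mat"],
                       "the cat sat on the mat", 0.3)      # True if BLEU > 0.3
matcher.is_similar("colour", "color", threshold=0.8)       # True if SequenceMatcher ratio >= 0.8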
1 change: 1 addition & 0 deletions src/core/metrics/language/rogue/__init__.py
@@ -0,0 +1 @@
from .rogue import ROUGEMetric