diff --git a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py
new file mode 100644
index 000000000..fd9c825ce
--- /dev/null
+++ b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py
@@ -0,0 +1,15 @@
+"""Text chunking strategies for handling long texts."""
+
+from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk
+from presidio_analyzer.chunkers.character_based_text_chunker import (
+    CharacterBasedTextChunker,
+)
+from presidio_analyzer.chunkers.text_chunker_provider import TextChunkerProvider
+
+__all__ = [
+    "BaseTextChunker",
+    "TextChunk",
+    "CharacterBasedTextChunker",
+    "TextChunkerProvider",
+]
+
diff --git a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py
new file mode 100644
index 000000000..5547b8788
--- /dev/null
+++ b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py
@@ -0,0 +1,146 @@
+"""Abstract base class for text chunking strategies."""
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Callable, List
+
+if TYPE_CHECKING:
+    from presidio_analyzer import RecognizerResult
+
+
+@dataclass
+class TextChunk:
+    """Represents a chunk of text with its position in the original text.
+
+    :param text: The chunk content
+    :param start: Start position in the original text (inclusive)
+    :param end: End position in the original text (exclusive)
+    """
+
+    text: str
+    start: int
+    end: int
+
+
+class BaseTextChunker(ABC):
+    """Abstract base class for text chunking strategies.
+
+    Subclasses must implement the chunk() method to split text into
+    TextChunk objects that include both content and position information.
+
+    Provides methods for processing predictions across chunks and
+    deduplicating overlapping entities.
+    """
+
+    @abstractmethod
+    def chunk(self, text: str) -> List[TextChunk]:
+        """Split text into chunks with position information.
+
+        :param text: The input text to split
+        :return: List of TextChunk objects with text and position data
+        """
+        pass
+
+    def predict_with_chunking(
+        self,
+        text: str,
+        predict_func: Callable[[str], List["RecognizerResult"]],
+    ) -> List["RecognizerResult"]:
+        """Process text with automatic chunking for long texts.
+
+        For short text, calls predict_func directly.
+        For long text, chunks it and merges predictions with deduplication.
+
+        :param text: Input text to process
+        :param predict_func: Function that takes text and returns
+            RecognizerResult objects
+        :return: List of RecognizerResult with correct offsets
+        """
+        chunks = self.chunk(text)
+        if not chunks:
+            return []
+        if len(chunks) == 1:
+            return predict_func(text)
+
+        predictions = self._process_chunks(chunks, predict_func)
+        return self.deduplicate_overlapping_entities(predictions)
+
+    def _process_chunks(
+        self,
+        chunks: List[TextChunk],
+        process_func: Callable[[str], List["RecognizerResult"]],
+    ) -> List["RecognizerResult"]:
+        """Process text chunks and adjust entity offsets.
+
+        :param chunks: List of TextChunk objects with text and position information
+        :param process_func: Function that takes chunk text and returns
+            RecognizerResult objects
+        :return: List of RecognizerResult with adjusted offsets
+        """
+        from presidio_analyzer import RecognizerResult
+
+        all_predictions = []
+
+        for chunk in chunks:
+            chunk_predictions = process_func(chunk.text)
+
+            # Create new RecognizerResult objects with adjusted offsets
+            # to avoid mutating the original predictions
+            for pred in chunk_predictions:
+                adjusted_pred = RecognizerResult(
+                    entity_type=pred.entity_type,
+                    start=pred.start + chunk.start,
+                    end=pred.end + chunk.start,
+                    score=pred.score,
+                    analysis_explanation=pred.analysis_explanation,
+                    recognition_metadata=pred.recognition_metadata,
+                )
+                all_predictions.append(adjusted_pred)
+
+        return all_predictions
+
+    def deduplicate_overlapping_entities(
+        self,
+        predictions: List["RecognizerResult"],
+        overlap_threshold: float = 0.5,
+    ) -> List["RecognizerResult"]:
+        """Remove duplicate entities from overlapping chunks.
+
+        :param predictions: List of RecognizerResult objects
+        :param overlap_threshold: Overlap ratio threshold to consider duplicates
+            (default: 0.5)
+        :return: Deduplicated list of RecognizerResult sorted by position
+        """
+        if not predictions:
+            return predictions
+
+        # Sort by score descending to keep highest scoring entities
+        sorted_preds = sorted(predictions, key=lambda p: p.score, reverse=True)
+        unique = []
+
+        for pred in sorted_preds:
+            is_duplicate = False
+            for kept in unique:
+                # Check if same entity type and overlapping positions
+                if pred.entity_type == kept.entity_type:
+                    overlap_start = max(pred.start, kept.start)
+                    overlap_end = min(pred.end, kept.end)
+
+                    if overlap_start < overlap_end:
+                        # Calculate overlap ratio
+                        overlap_len = overlap_end - overlap_start
+                        pred_len = pred.end - pred.start
+                        kept_len = kept.end - kept.start
+
+                        if pred_len <= 0 or kept_len <= 0:
+                            continue
+
+                        # Check if overlap exceeds threshold
+                        if overlap_len / min(pred_len, kept_len) > overlap_threshold:
+                            is_duplicate = True
+                            break
+
+            if not is_duplicate:
+                unique.append(pred)
+
+        # Sort by position for consistent output
+        return sorted(unique, key=lambda p: p.start)
diff --git a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py
new file mode 100644
index 000000000..52a739089
--- /dev/null
+++ b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py
@@ -0,0 +1,123 @@
+"""Character-based text chunker with word boundary preservation.
+
+Based on gliner-spacy implementation:
+https://github.com/theirstory/gliner-spacy/blob/main/gliner_spacy/pipeline.py#L60-L96
+"""
+import logging
+from typing import Iterable, List, Optional, Tuple
+
+from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk
+
+logger = logging.getLogger("presidio-analyzer")
+
+
+WORD_BOUNDARY_CHARS: Tuple[str, ...] = (" ", "\n")
+
+
+class CharacterBasedTextChunker(BaseTextChunker):
+    """Character-based text chunker with word boundary preservation."""
+
+    def __init__(
+        self,
+        chunk_size: int = 250,
+        chunk_overlap: int = 50,
+        boundary_chars: Optional[Iterable[str]] = None,
+    ):
+        """Initialize the character-based text chunker.
+
+        Note: Chunks may slightly exceed chunk_size to preserve complete words.
+        When this occurs, the actual overlap may vary from the specified value.
+
+        :param chunk_size: Target maximum characters per chunk (must be > 0)
+        :param chunk_overlap: Target characters to overlap between chunks
+            (must be >= 0 and < chunk_size)
+        :param boundary_chars: Characters that count as word boundaries.
+            Defaults to space/newline to keep current behavior.
+        """
+        if chunk_size <= 0:
+            logger.error("Invalid chunk_size: %d. Must be greater than 0.", chunk_size)
+            raise ValueError("chunk_size must be greater than 0")
+        if chunk_overlap < 0 or chunk_overlap >= chunk_size:
+            logger.error(
+                "Invalid chunk_overlap. Must be non-negative and less than chunk_size"
+            )
+            raise ValueError(
+                "chunk_overlap must be non-negative and less than chunk_size"
+            )
+
+        self._chunk_size = chunk_size
+        self._chunk_overlap = chunk_overlap
+        # Allow callers to tune boundaries
+        # (e.g., punctuation, tabs) without changing defaults.
+        self._boundary_chars: Tuple[str, ...] = (
+            tuple(boundary_chars) if boundary_chars is not None else WORD_BOUNDARY_CHARS
+        )
+
+    @property
+    def chunk_size(self) -> int:
+        """Get the chunk size.
+
+        :return: The chunk size
+        """
+        return self._chunk_size
+
+    @property
+    def chunk_overlap(self) -> int:
+        """Get the chunk overlap.
+
+        :return: The chunk overlap
+        """
+        return self._chunk_overlap
+
+    @property
+    def boundary_chars(self) -> Tuple[str, ...]:
+        """Characters treated as word boundaries when extending chunks."""
+
+        return self._boundary_chars
+
+    def chunk(self, text: str) -> List[TextChunk]:
+        """Split text into overlapping chunks at word boundaries.
+
+        Chunks are extended to the nearest word boundary (space or newline)
+        to avoid splitting words. This means chunks may slightly exceed
+        chunk_size. For texts without spaces (e.g., CJK languages), chunks
+        may extend to end of text.
+
+        :param text: The input text to chunk
+        :return: List of TextChunk objects with text and position information
+        """
+        if not text:
+            logger.debug("Empty text provided, returning empty chunk list")
+            return []
+
+        logger.debug(
+            "Chunking text: length=%d, chunk_size=%d, overlap=%d",
+            len(text),
+            self._chunk_size,
+            self._chunk_overlap,
+        )
+
+        chunks = []
+        start = 0
+
+        while start < len(text):
+            # Calculate end position
+            end = (
+                start + self._chunk_size
+                if start + self._chunk_size < len(text)
+                else len(text)
+            )
+
+            # Extend to complete word boundary (space or newline by default)
+            while end < len(text) and text[end] not in self._boundary_chars:
+                end += 1
+
+            chunks.append(TextChunk(text=text[start:end], start=start, end=end))
+
+            # Move start position with overlap (stop if we've covered all text)
+            if end >= len(text):
+                break
+            start = end - self._chunk_overlap
+
+        logger.debug("Created %d chunks from text", len(chunks))
+        return chunks
diff --git a/presidio-analyzer/presidio_analyzer/chunkers/text_chunker_provider.py b/presidio-analyzer/presidio_analyzer/chunkers/text_chunker_provider.py
new file mode 100644
index 000000000..cb668265b
--- /dev/null
+++ b/presidio-analyzer/presidio_analyzer/chunkers/text_chunker_provider.py
@@ -0,0 +1,60 @@
+"""Factory provider for creating text chunkers from configuration."""
+
+import logging
+from typing import Any, Dict, Optional, Type
+
+from presidio_analyzer.chunkers.base_chunker import BaseTextChunker
+from presidio_analyzer.chunkers.character_based_text_chunker import (
+    CharacterBasedTextChunker,
+)
+
+logger = logging.getLogger("presidio-analyzer")
+
+# Registry mapping chunker type names to classes
+_CHUNKER_REGISTRY: Dict[str, Type[BaseTextChunker]] = {
+    "character": CharacterBasedTextChunker,
+}
+
+
+class TextChunkerProvider:
+    """Create text chunkers from configuration.
+
+    :param chunker_configuration: Dict with chunker_type and optional params.
+        Example::
+
+            {"chunker_type": "character", "chunk_size": 300, "chunk_overlap": 75}
+
+    If no configuration provided, uses character-based chunker with default params
+    tuned for boundary coverage (chunk_size=250, chunk_overlap=50).
+    """
+
+    def __init__(
+        self,
+        chunker_configuration: Optional[Dict[str, Any]] = None,
+    ):
+        # Default to a safe overlap to avoid boundary losses for cross-chunk entities.
+        self.chunker_configuration = chunker_configuration or {
+            "chunker_type": "character",
+            "chunk_size": 250,
+            "chunk_overlap": 50,
+        }
+
+    def create_chunker(self) -> BaseTextChunker:
+        """Create a text chunker instance from configuration."""
+        config = self.chunker_configuration.copy()
+        chunker_type = config.pop("chunker_type", "character")
+
+        if chunker_type not in _CHUNKER_REGISTRY:
+            raise ValueError(
+                f"Unknown chunker_type '{chunker_type}'. "
+                f"Available: {list(_CHUNKER_REGISTRY.keys())}"
+            )
+
+        chunker_class = _CHUNKER_REGISTRY[chunker_type]
+        try:
+            return chunker_class(**config)
+        except TypeError as exc:
+            raise ValueError(
+                f"Invalid configuration for chunker_type '{chunker_type}': {config}"
+            ) from exc
+
diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py
index de83e1173..4e451666c 100644
--- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py
+++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py
@@ -7,6 +7,7 @@
     LocalRecognizer,
     RecognizerResult,
 )
+from presidio_analyzer.chunkers import BaseTextChunker
 from presidio_analyzer.nlp_engine import (
     NerModelConfiguration,
     NlpArtifacts,
@@ -19,7 +20,6 @@
     GLiNER = None
     GLiNERConfig = None
 
-
 logger = logging.getLogger("presidio-analyzer")
 
 
@@ -39,6 +39,7 @@ def __init__(
         multi_label: bool = False,
         threshold: float = 0.30,
         map_location: Optional[str] = None,
+        text_chunker: Optional[BaseTextChunker] = None,
     ):
         """GLiNER model based entity recognizer.
 
@@ -58,7 +59,10 @@
         :param threshold: The threshold for the model's output
            (see GLiNER's documentation)
         :param map_location: The device to use for the model.
-        If None, will auto-detect GPU or use CPU.
+            If None, will auto-detect GPU or use CPU.
+        :param text_chunker: Custom text chunking strategy. If None, uses
+            CharacterBasedTextChunker with default settings (chunk_size=250,
+            chunk_overlap=50)
         """
@@ -98,6 +102,17 @@
         self.multi_label = multi_label
         self.threshold = threshold
 
+        # Use provided chunker or default to in-house character-based chunker
+        if text_chunker is not None:
+            self.text_chunker = text_chunker
+        else:
+            from presidio_analyzer.chunkers import CharacterBasedTextChunker
+
+            self.text_chunker = CharacterBasedTextChunker(
+                chunk_size=250,
+                chunk_overlap=50,
+            )
+
         self.gliner = None
 
         super().__init__(
@@ -135,42 +150,55 @@ def analyze(
         # combine the input labels as this model allows for ad-hoc labels
         labels = self.__create_input_labels(entities)
 
-        predictions = self.gliner.predict_entities(
-            text=text,
-            labels=labels,
-            flat_ner=self.flat_ner,
-            threshold=self.threshold,
-            multi_label=self.multi_label,
-        )
-        recognizer_results = []
-        for prediction in predictions:
-            presidio_entity = self.model_to_presidio_entity_mapping.get(
-                prediction["label"], prediction["label"]
-            )
-            if entities and presidio_entity not in entities:
-                continue
-
-            analysis_explanation = AnalysisExplanation(
-                recognizer=self.name,
-                original_score=prediction["score"],
-                textual_explanation=f"Identified as {presidio_entity} by GLiNER",
-            )
-
-            recognizer_results.append(
-                RecognizerResult(
-                    entity_type=presidio_entity,
-                    start=prediction["start"],
-                    end=prediction["end"],
-                    score=prediction["score"],
-                    analysis_explanation=analysis_explanation,
-                )
-            )
-
-        return recognizer_results
+        # Process text with automatic chunking
+        def predict_func(text: str) -> List[RecognizerResult]:
+            # Get predictions from GLiNER (returns dicts)
+            gliner_predictions = self.gliner.predict_entities(
+                text=text,
+                labels=labels,
+                flat_ner=self.flat_ner,
+                threshold=self.threshold,
+                multi_label=self.multi_label,
+            )
+
+            # Convert dicts to RecognizerResult objects
+            results = []
+            for pred in gliner_predictions:
+                presidio_entity = self.model_to_presidio_entity_mapping.get(
+                    pred["label"], pred["label"]
+                )
+
+                # Filter by requested entities
+                if entities and presidio_entity not in entities:
+                    continue
+
+                analysis_explanation = AnalysisExplanation(
+                    recognizer=self.name,
+                    original_score=pred["score"],
+                    textual_explanation=f"Identified as {presidio_entity} by GLiNER",
+                )
+
+                results.append(
+                    RecognizerResult(
+                        entity_type=presidio_entity,
+                        start=pred["start"],
+                        end=pred["end"],
+                        score=pred["score"],
+                        analysis_explanation=analysis_explanation,
+                    )
+                )
+            return results
+
+        predictions = self.text_chunker.predict_with_chunking(
+            text=text,
+            predict_func=predict_func,
+        )
+
+        return predictions
 
     def __create_input_labels(self, entities):
         """Append the entities requested by the user to the list of labels if it's not there."""  # noqa: E501
-        labels = self.gliner_labels
+        labels = list(self.gliner_labels)
         for entity in entities:
             if (
                 entity not in self.model_to_presidio_entity_mapping.values()
diff --git a/presidio-analyzer/tests/test_base_chunker.py b/presidio-analyzer/tests/test_base_chunker.py
new file mode 100644
index 000000000..189fff113
--- /dev/null
+++ b/presidio-analyzer/tests/test_base_chunker.py
@@ -0,0 +1,125 @@
+"""Tests for BaseTextChunker methods."""
+import pytest
+
+from presidio_analyzer import RecognizerResult
+from presidio_analyzer.chunkers import CharacterBasedTextChunker
+
+
+class TestPredictWithChunking:
+    """Test predict_with_chunking orchestration."""
+
+    def test_short_text_not_chunked(self):
+        """Short text bypasses chunking."""
+        chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20)
+        predict_func = lambda t: [
+            RecognizerResult(entity_type="PERSON", start=0, end=5, score=0.9)
+        ]
+
+        result = chunker.predict_with_chunking("Short text", predict_func)
+
+        assert len(result) == 1
+        assert result[0].start == 0
+
+    def test_long_text_offsets_adjusted(self):
+        """Entity offsets are adjusted to original text positions."""
+        chunker = CharacterBasedTextChunker(chunk_size=20, chunk_overlap=5)
+        text = "John Smith lives in New York City with Jane Doe"
+
+        def predict_func(chunk):
+            if "Jane" in chunk:
+                idx = chunk.index("Jane")
+                return [
+                    RecognizerResult(entity_type="PERSON", start=idx, end=idx + 4, score=0.9)
+                ]
+            return []
+
+        result = chunker.predict_with_chunking(text, predict_func)
+
+        # Jane appears at position 39 in original text
+        assert len(result) == 1
+        assert result[0].start == text.index("Jane")
+
+
+class TestDeduplicateOverlappingEntities:
+    """Test deduplication of overlapping entities from chunk boundaries."""
+
+    def test_exact_duplicates_keeps_highest_score(self):
+        """Same entity from overlapping chunks keeps higher score."""
+        chunker = CharacterBasedTextChunker()
+        predictions = [
+            RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9),
+            RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.7),
+        ]
+
+        result = chunker.deduplicate_overlapping_entities(predictions)
+
+        assert len(result) == 1
+        assert result[0].score == 0.9
+
+    def test_overlapping_same_type_deduplicated(self):
+        """Overlapping entities of same type are deduplicated."""
+        chunker = CharacterBasedTextChunker()
+        predictions = [
+            RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9),
+            RecognizerResult(entity_type="PERSON", start=3, end=13, score=0.8),
+        ]
+
+        result = chunker.deduplicate_overlapping_entities(predictions)
+
+        assert len(result) == 1
+
+    def test_different_types_not_deduplicated(self):
+        """Overlapping entities of different types are kept."""
+        chunker = CharacterBasedTextChunker()
+        predictions = [
+            RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9),
+            RecognizerResult(entity_type="LOCATION", start=5, end=15, score=0.8),
+        ]
+
+        result = chunker.deduplicate_overlapping_entities(predictions)
+
+        assert len(result) == 2
+
+    def test_results_sorted_by_position(self):
+        """Results are sorted by start position."""
+        chunker = CharacterBasedTextChunker()
+        predictions = [
+            RecognizerResult(entity_type="PERSON", start=20, end=30, score=0.9),
+            RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.8),
+        ]
+
+        result = chunker.deduplicate_overlapping_entities(predictions)
+
+        assert result[0].start == 0
+        assert result[1].start == 20
+
+    def test_zero_length_span_does_not_raise(self):
+        """Zero-length spans should not cause ZeroDivisionError."""
+        chunker = CharacterBasedTextChunker()
+        predictions = [
+            RecognizerResult(entity_type="PERSON", start=5, end=5, score=0.9),
+            RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.8),
+        ]
+
+        # Should not raise ZeroDivisionError
+        result = chunker.deduplicate_overlapping_entities(predictions)
+        assert len(result) == 2
+
+
+class TestPredictWithChunkingEdgeCases:
+    """Test edge cases in predict_with_chunking."""
+
+    def test_empty_text_returns_empty_without_calling_predict(self):
+        """Empty text should return [] without invoking predict_func."""
+        chunker = CharacterBasedTextChunker(chunk_size=100)
+        call_count = 0
+
+        def predict_func(t):
+            nonlocal call_count
+            call_count += 1
+            return []
+
+        result = chunker.predict_with_chunking("", predict_func)
+
+        assert result == []
+        assert call_count == 0, "predict_func should not be called for empty text"
diff --git a/presidio-analyzer/tests/test_character_based_text_chunker.py b/presidio-analyzer/tests/test_character_based_text_chunker.py
new file mode 100644
index 000000000..5b0012eb5
--- /dev/null
+++ b/presidio-analyzer/tests/test_character_based_text_chunker.py
@@ -0,0 +1,148 @@
+"""Tests for CharacterBasedTextChunker."""
+
+import pytest
+
+from presidio_analyzer.chunkers import CharacterBasedTextChunker, TextChunk
+
+
+class TestCharacterBasedTextChunkerInit:
+    """Tests for CharacterBasedTextChunker initialization."""
+
+    def test_default_values(self):
+        """Test default initialization values."""
+        chunker = CharacterBasedTextChunker()
+        assert chunker.chunk_size == 250
+        assert chunker.chunk_overlap == 50
+
+    def test_custom_boundary_chars(self):
+        """Test custom boundary characters."""
+        chunker = CharacterBasedTextChunker(boundary_chars=[" ", "\n", "\t"])
+        assert chunker.boundary_chars == (" ", "\n", "\t")
+
+    def test_invalid_chunk_size_raises_error(self):
+        """Test that invalid chunk_size raises ValueError."""
+        with pytest.raises(ValueError, match="chunk_size must be greater than 0"):
+            CharacterBasedTextChunker(chunk_size=0)
+        with pytest.raises(ValueError, match="chunk_size must be greater than 0"):
+            CharacterBasedTextChunker(chunk_size=-10)
+
+    def test_invalid_chunk_overlap_raises_error(self):
+        """Test that invalid chunk_overlap raises ValueError."""
+        with pytest.raises(ValueError, match="chunk_overlap must be non-negative"):
+            CharacterBasedTextChunker(chunk_size=100, chunk_overlap=-5)
+
+        with pytest.raises(ValueError, match="chunk_overlap must be non-negative"):
+            CharacterBasedTextChunker(chunk_size=100, chunk_overlap=100)
+
+
+class TestCharacterBasedTextChunkerChunk:
+    """Tests for CharacterBasedTextChunker.chunk() method."""
+
+    def test_empty_text_returns_empty_list(self):
+        """Test chunking empty text returns empty list."""
+        chunker = CharacterBasedTextChunker(chunk_size=50, chunk_overlap=10)
+        assert chunker.chunk("") == []
+
+    def test_short_text_returns_single_chunk(self):
+        """Test text shorter than chunk_size returns single chunk."""
+        chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=10)
+        text = "Hello world"
+        chunks = chunker.chunk(text)
+
+        assert len(chunks) == 1
+        assert isinstance(chunks[0], TextChunk)
+        assert chunks[0].text == text
+        assert chunks[0].start == 0
+        assert chunks[0].end == len(text)
+
+    def test_chunks_extend_to_word_boundary(self):
+        """Test that chunks extend to word boundaries (space/newline)."""
+        chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=2)
+        text = "Hello world foo bar"
+        chunks = chunker.chunk(text)
+
+        # Every chunk must end on a word boundary (or at the end of the text)
+        # and must map exactly onto its slice of the original text
+        for chunk in chunks:
+            assert chunk.end == len(text) or text[chunk.end] in (" ", "\n")
+            assert text[chunk.start:chunk.end] == chunk.text
+
+    def test_offset_calculation_is_correct(self):
+        """Test that chunk offsets map correctly to original text."""
+        chunker = CharacterBasedTextChunker(chunk_size=20, chunk_overlap=5)
+        text = "This is a test string for chunking purposes"
+        chunks = chunker.chunk(text)
+
+        # Critical: offsets must point to correct positions
+        for chunk in chunks:
+            assert text[chunk.start:chunk.end] == chunk.text
+
+
+class TestCharacterBasedTextChunkerEdgeCases:
+    """Edge case tests for CharacterBasedTextChunker."""
+
+    def test_whitespace_only_text(self):
+        """Test chunking whitespace-only text."""
+        chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=2)
+        text = " "
+        chunks = chunker.chunk(text)
+
+        assert len(chunks) == 1
+        assert chunks[0].text == text
+
+    def test_newline_boundary(self):
+        """Test that newlines are treated as word boundaries."""
+        chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=2)
+        text = "Hello\nworld foo"
+        chunks = chunker.chunk(text)
+
+        # "Hello\nworld" is 11 chars, extends past chunk_size=10 until space at position 11
+        # The chunk stops AT the boundary (space), not including it
+        assert chunks[0].text == "Hello\nworld"
+        assert chunks[0].end == 11  # Position of space
+        assert text[chunks[0].start:chunks[0].end] == chunks[0].text
+
+    def test_text_without_spaces_cjk(self):
+        """Test chunking CJK text without spaces extends to end."""
+        chunker = CharacterBasedTextChunker(chunk_size=5, chunk_overlap=1)
+        text = "这是中文文本"  # Chinese: 6 chars, no spaces
+        chunks = chunker.chunk(text)
+
+        # Without word boundaries, should extend to end
+        assert len(chunks) == 1
+        assert chunks[0].text == text
+
+    def test_very_long_word_extends_to_boundary(self):
+        """Test words longer than chunk_size extend to next boundary."""
+        chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=2)
+        text = "supercalifragilisticexpialidocious end"
+        chunks = chunker.chunk(text)
+
+        # Long word should extend until space is found
+        assert len(chunks) >= 1
+        assert "supercalifragilisticexpialidocious" in chunks[0].text
+
+
+class TestCharacterBasedTextChunkerIntegration:
+    """Integration tests for CharacterBasedTextChunker."""
+
+    def test_long_text_produces_multiple_chunks(self):
+        """Test chunking longer text produces multiple chunks with correct offsets."""
+        chunker = CharacterBasedTextChunker(chunk_size=50, chunk_overlap=10)
+        text = "John Smith works at Microsoft. Jane Doe lives in Seattle. Bob Johnson studies at MIT."
+
+        chunks = chunker.chunk(text)
+
+        assert len(chunks) > 1
+        # Verify all offsets are correct
+        for chunk in chunks:
+            assert text[chunk.start:chunk.end] == chunk.text
+
+    def test_overlap_captures_entity_at_boundary(self):
+        """Test that overlap prevents missing entities at chunk boundaries."""
+        # This is the core purpose of overlap
+        chunker = CharacterBasedTextChunker(chunk_size=25, chunk_overlap=10)
+        text = "Some prefix text. John Smith is here. Some suffix."
+        chunks = chunker.chunk(text)
+
+        # "John Smith" should appear complete in at least one chunk
+        assert any("John Smith" in chunk.text for chunk in chunks)
diff --git a/presidio-analyzer/tests/test_gliner_recognizer.py b/presidio-analyzer/tests/test_gliner_recognizer.py
index 5fcea527b..920fb482b 100644
--- a/presidio-analyzer/tests/test_gliner_recognizer.py
+++ b/presidio-analyzer/tests/test_gliner_recognizer.py
@@ -4,6 +4,7 @@
 from unittest.mock import MagicMock, patch
 
 from presidio_analyzer.predefined_recognizers import GLiNERRecognizer
+from presidio_analyzer.chunkers import CharacterBasedTextChunker
 
 
 @pytest.fixture
@@ -87,6 +88,8 @@ def test_analyze_with_unsupported_entity(mock_gliner):
         supported_entities=entities,
     )
 
+    gliner_recognizer.gliner = mock_gliner
+
     results = gliner_recognizer.analyze(text, entities)
 
     # Should filter out unsupported entities
@@ -106,6 +109,8 @@ def test_analyze_with_entity_mapping(mock_gliner):
         entity_mapping=entity_mapping,
     )
 
+    gliner_recognizer.gliner = mock_gliner
+
     results = gliner_recognizer.analyze(text, ["ORG"])
 
     # Check mapping from 'organization' to 'ORG'
@@ -132,3 +137,120 @@ def test_analyze_with_no_entities(mock_gliner):
 
     # Should return no results
     assert len(results) == 0
+
+
+def test_gliner_handles_long_text_with_chunking(mock_gliner):
+    """Test that GLiNER chunks long text and adjusts entity offsets correctly."""
+    if sys.version_info < (3, 10):
+        pytest.skip("gliner requires Python >= 3.10")
+
+    text = "John Smith lives here. " + ("x " * 120) + "Jane Doe works there."
+
+    # Mock returns entities with positions relative to each chunk
+    def mock_predict_entities(text, labels, flat_ner, threshold, multi_label):
+        entities = []
+        if "John Smith" in text:
+            start = text.find("John Smith")
+            entities.append({"label": "person", "start": start, "end": start + 10, "score": 0.95})
+        if "Jane Doe" in text:
+            start = text.find("Jane Doe")
+            entities.append({"label": "person", "start": start, "end": start + 8, "score": 0.93})
+        return entities
+
+    mock_gliner.predict_entities.side_effect = mock_predict_entities
+
+    gliner_recognizer = GLiNERRecognizer(
+        entity_mapping={"person": "PERSON"},
+        text_chunker=CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50),
+    )
+    gliner_recognizer.gliner = mock_gliner
+
+    results = gliner_recognizer.analyze(text, ["PERSON"])
+
+    # Verify chunking occurred (predict_entities called multiple times)
+    assert mock_gliner.predict_entities.call_count == 2, f"Expected 2 chunks, got {mock_gliner.predict_entities.call_count}"
+
+    # Verify exactly 2 entities were detected
+    assert len(results) == 2, f"Expected 2 entities, found {len(results)}"
+
+    # Verify both entities have correct offsets in original text
+    assert text[results[0].start:results[0].end] == "John Smith"
+    assert results[0].entity_type == "PERSON"
+    assert results[0].score == 0.95
+
+    assert text[results[1].start:results[1].end] == "Jane Doe"
+    assert results[1].entity_type == "PERSON"
+    assert results[1].score == 0.93
+
+
+def test_gliner_detects_entity_split_across_chunk_boundary(mock_gliner):
+    """Test that overlap catches entities split at chunk boundaries."""
+    if sys.version_info < (3, 10):
+        pytest.skip("gliner requires Python >= 3.10")
+
+    # "Amanda Williams" straddles the naive 250-char cut point; word-boundary
+    # extension plus the 50-char overlap must keep it intact in at least one chunk
+    text = ("x " * 120) + "Amanda Williams" + (" x" * 100)
+
+    def mock_predict_entities(text, labels, flat_ner, threshold, multi_label):
+        entities = []
+        if "Amanda Williams" in text:
+            start = text.find("Amanda Williams")
+            entities.append({"label": "person", "start": start, "end": start + 15, "score": 0.92})
+        return entities
+
+    mock_gliner.predict_entities.side_effect = mock_predict_entities
+
+    gliner_recognizer = GLiNERRecognizer(
+        entity_mapping={"person": "PERSON"},
+        text_chunker=CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50),
+    )
+    gliner_recognizer.gliner = mock_gliner
+
+    results = gliner_recognizer.analyze(text, ["PERSON"])
+
+    # Verify entity at boundary was detected
+    assert len(results) == 1, f"Expected 1 entity, found {len(results)}"
+    assert text[results[0].start:results[0].end] == "Amanda Williams"
+    assert results[0].entity_type == "PERSON"
+
+
+def test_gliner_deduplicates_entities_in_overlap_region(mock_gliner):
+    """Test that duplicate entities from overlapping chunks are removed."""
+    if sys.version_info < (3, 10):
+        pytest.skip("gliner requires Python >= 3.10")
+
+    # Create text where entity appears in overlap region of both chunks
+    text = ("x " * 105) + "Dr. Smith" + (" x" * 100)
+
+    call_count = 0
+    def mock_predict_entities(text, labels, flat_ner, threshold, multi_label):
+        nonlocal call_count
+        call_count += 1
+        entities = []
+        if "Dr. Smith" in text:
+            start = text.find("Dr. Smith")
+            # Return slightly different scores to test that highest is kept
+            score = 0.95 if call_count == 1 else 0.90
+            entities.append({"label": "person", "start": start, "end": start + 9, "score": score})
+        return entities
+
+    mock_gliner.predict_entities.side_effect = mock_predict_entities
+
+    gliner_recognizer = GLiNERRecognizer(
+        entity_mapping={"person": "PERSON"},
+        text_chunker=CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50),
+    )
+    gliner_recognizer.gliner = mock_gliner
+
+    results = gliner_recognizer.analyze(text, ["PERSON"])
+
+    # Verify: Called multiple times due to overlap
+    assert mock_gliner.predict_entities.call_count >= 2, "Should process multiple chunks"
+
+    # Verify: Only 1 result after deduplication (not 2)
+    assert len(results) == 1, f"Expected 1 deduplicated entity, found {len(results)}"
+
+    # Verify: Kept the one with highest score (0.95 from first chunk)
+    assert results[0].score == 0.95
+    assert text[results[0].start:results[0].end] == "Dr. Smith"
diff --git a/presidio-analyzer/tests/test_text_chunker_provider.py b/presidio-analyzer/tests/test_text_chunker_provider.py
new file mode 100644
index 000000000..fcb1b6464
--- /dev/null
+++ b/presidio-analyzer/tests/test_text_chunker_provider.py
@@ -0,0 +1,48 @@
+"""Tests for TextChunkerProvider factory pattern."""
+
+import pytest
+
+from presidio_analyzer.chunkers import (
+    TextChunkerProvider,
+    CharacterBasedTextChunker,
+)
+
+
+class TestTextChunkerProvider:
+    """Test TextChunkerProvider."""
+
+    def test_default_creates_character_chunker(self):
+        """Default provider creates CharacterBasedTextChunker."""
+        provider = TextChunkerProvider()
+        chunker = provider.create_chunker()
+        assert isinstance(chunker, CharacterBasedTextChunker)
+
+    def test_custom_params_passed_to_chunker(self):
+        """Custom parameters are passed to chunker."""
+        provider = TextChunkerProvider(chunker_configuration={
+            "chunker_type": "character",
+            "chunk_size": 500,
+            "chunk_overlap": 100,
+        })
+        chunker = provider.create_chunker()
+        assert chunker._chunk_size == 500
+        assert chunker._chunk_overlap == 100
+
+    def test_unknown_chunker_type_raises_error(self):
+        """Unknown chunker_type raises ValueError."""
+        provider = TextChunkerProvider(chunker_configuration={
+            "chunker_type": "unknown"
+        })
+        with pytest.raises(ValueError, match="Unknown chunker_type"):
+            provider.create_chunker()
+
+    def test_character_chunker_type(self):
+        """Provider creates CharacterBasedTextChunker when type is 'character'."""
+        provider = TextChunkerProvider(chunker_configuration={
+            "chunker_type": "character",
+            "chunk_size": 300,
+        })
+        chunker = provider.create_chunker()
+        assert isinstance(chunker, CharacterBasedTextChunker)
+        assert chunker.chunk_size == 300
+
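The diff only wires the chunker into GLiNERRecognizer. As a hedged usage sketch (not part of the change set itself), the pieces introduced above fit together as shown below; the entity_mapping mirrors the one used in the tests, the configuration keys come from TextChunkerProvider, and actually loading the model requires the optional gliner dependency on Python 3.10+.

    from presidio_analyzer.chunkers import TextChunkerProvider
    from presidio_analyzer.predefined_recognizers import GLiNERRecognizer

    # Build a chunker from configuration; omitting the dict falls back to the
    # defaults (chunker_type="character", chunk_size=250, chunk_overlap=50).
    chunker = TextChunkerProvider(
        chunker_configuration={
            "chunker_type": "character",
            "chunk_size": 300,
            "chunk_overlap": 75,
        }
    ).create_chunker()

    # Inject the chunker; leaving text_chunker=None yields the same default chunker.
    recognizer = GLiNERRecognizer(
        entity_mapping={"person": "PERSON"},
        text_chunker=chunker,
    )

    # Long inputs are split at word boundaries, offsets are mapped back to the
    # original text, and duplicates from overlapping chunks are removed.
    long_text = "John Smith lives in Seattle. " * 40  # ~1,200 chars, forces chunking
    results = recognizer.analyze(long_text, ["PERSON"])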