|
1 | 1 | import pytest |
| 2 | +from pathlib import Path |
| 3 | +from unittest.mock import patch |
2 | 4 | from talkpipe.data.extraction import ( |
3 | | - ReadFile, readtxt, readdocx, readcsv, readjsonl, listFiles, |
4 | | - ExtractorRegistry, extract_text, extract_docx, extract_csv, extract_jsonl, skip_file, get_default_registry, |
| 5 | + ReadFile, readtxt, readdocx, readpdf, readcsv, readjsonl, listFiles, |
| 6 | + ExtractorRegistry, extract_text, extract_docx, extract_pdf, extract_csv, extract_jsonl, skip_file, get_default_registry, |
5 | 7 | global_extractor_registry, ExtractionResult |
6 | 8 | ) |
7 | 9 |
|
@@ -600,6 +602,113 @@ def test_readjsonl(tmp_path): |
600 | 602 | assert "products.jsonl:2" in results[1].title |
601 | 603 |
|
602 | 604 |
|
def _create_pdf_with_text(path, text: str = "Hello PDF") -> None:
    """Write a minimal single-page PDF whose page content is *text*.

    The file is assembled by hand (header, six indirect objects, a
    cross-reference table and a trailer) so that the tests do not need a
    PDF-writing dependency.

    Args:
        path: Destination file path (``str`` or ``Path``). The file is
            created or overwritten.
        text: Text shown on the page with a single ``Tj`` operator, using
            the Helvetica font at size 12.
    """
    # Backslash and parentheses delimit PDF literal strings; unescaped they
    # would terminate the string early and corrupt the content stream.
    escaped = text.replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)")
    content = f"BT\n/F1 12 Tf\n100 700 Td\n({escaped}) Tj\nET\n".encode()

    header = b"%PDF-1.4\n"
    objects = [
        b"1 0 obj\n<< /Type /Catalog /Pages 2 0 R >>\nendobj\n",
        b"2 0 obj\n<< /Type /Pages /Kids [3 0 R] /Count 1 >>\nendobj\n",
        b"3 0 obj\n<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792]"
        b" /Contents 4 0 R /Resources 5 0 R >>\nendobj\n",
        b"4 0 obj\n<< /Length " + str(len(content)).encode("ascii")
        + b" >>\nstream\n" + content + b"\nendstream\nendobj\n",
        b"5 0 obj\n<< /Font << /F1 6 0 R >> >>\nendobj\n",
        b"6 0 obj\n<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>\nendobj\n",
    ]

    # Record the byte offset of every object; once the loop finishes,
    # ``position`` is the offset at which the xref table itself starts.
    offsets = []
    position = len(header)
    for obj in objects:
        offsets.append(position)
        position += len(obj)
    startxref = position

    entry_count = len(objects) + 1  # +1 for the mandatory free entry 0
    xref_parts = [f"xref\n0 {entry_count}\n".encode(), b"0000000000 65535 f \n"]
    # Each in-use entry is exactly 20 bytes: 10-digit offset, generation, 'n'.
    xref_parts.extend(f"{offset:010d} 00000 n \n".encode() for offset in offsets)
    xref = b"".join(xref_parts)

    trailer = (
        f"trailer\n<< /Size {entry_count} /Root 1 0 R >>\n"
        f"startxref\n{startxref}\n%%EOF\n"
    ).encode()

    Path(path).write_bytes(header + b"".join(objects) + xref + trailer)
| 632 | + |
| 633 | + |
def test_extract_pdf_requires_pypdf(tmp_path):
    """extract_pdf surfaces an ImportError with install guidance when pypdf cannot be imported."""
    import builtins

    original_import = builtins.__import__

    def import_without_pypdf(name, *args, **kwargs):
        # Delegate every import except pypdf, which is made to look uninstalled.
        if name != "pypdf":
            return original_import(name, *args, **kwargs)
        raise ImportError("No module named 'pypdf'")

    # The file only needs to exist; the import failure is what is under test.
    stub_pdf = tmp_path / "test.pdf"
    stub_pdf.write_bytes(b"%PDF-1.4 minimal\n")

    with patch.object(builtins, "__import__", side_effect=import_without_pypdf), \
            pytest.raises(ImportError) as exc_info:
        list(extract_pdf(stub_pdf))

    message = str(exc_info.value)
    assert "pypdf" in message
    assert "pip install talkpipe[pypdf]" in message
| 652 | + |
| 653 | + |
def test_extract_pdf_file_not_found():
    """A path that does not exist makes extract_pdf raise FileNotFoundError."""
    pytest.importorskip("pypdf")

    missing_path = "/nonexistent/path.pdf"
    expectation = pytest.raises(FileNotFoundError, match="Path does not exist")
    with expectation:
        # extract_pdf is consumed with list() so the error is triggered here.
        list(extract_pdf(missing_path))
| 659 | + |
| 660 | + |
def test_extract_pdf_with_pypdf(tmp_path):
    """extract_pdf yields one ExtractionResult for a one-page PDF (skipped without pypdf)."""
    pytest.importorskip("pypdf")

    target = tmp_path / "test.pdf"
    _create_pdf_with_text(target, "Hello PDF")

    extracted = list(extract_pdf(target))
    assert len(extracted) == 1

    only = extracted[0]
    assert isinstance(only, ExtractionResult)
    assert "test.pdf" in only.source
    # The id is expected to mirror the source for PDF results.
    assert only.id == only.source
    assert only.title == "test.pdf"
    assert "Hello PDF" in only.content
| 675 | + |
| 676 | + |
def test_readpdf_segment(tmp_path):
    """The readpdf segment turns a PDF path into one ExtractionResult (skipped without pypdf)."""
    pytest.importorskip("pypdf")

    target = tmp_path / "segment_test.pdf"
    _create_pdf_with_text(target, "Segment test content")

    # The segment is instantiated first, then applied to an iterable of paths.
    segment = readpdf()
    extracted = list(segment([str(target)]))

    assert len(extracted) == 1
    only = extracted[0]
    assert isinstance(only, ExtractionResult)
    assert "segment_test.pdf" in only.source
    assert "Segment test content" in only.content
| 689 | + |
| 690 | + |
def test_pdf_in_default_registry(tmp_path):
    """The default registry lists "pdf" among its registered extensions."""
    extensions = get_default_registry().registered_extensions
    assert "pdf" in extensions
| 695 | + |
| 696 | + |
def test_ReadFile_with_pdf(tmp_path):
    """ReadFile produces one ExtractionResult for a PDF path (skipped without pypdf)."""
    pytest.importorskip("pypdf")

    target = tmp_path / "readfile_test.pdf"
    _create_pdf_with_text(target, "ReadFile PDF content")

    reader = ReadFile()
    extracted = list(reader([str(target)]))

    assert len(extracted) == 1
    only = extracted[0]
    assert isinstance(only, ExtractionResult)
    assert "readfile_test.pdf" in only.source
    assert "ReadFile PDF content" in only.content
| 710 | + |
| 711 | + |
603 | 712 | def test_jsonl_in_default_registry(tmp_path): |
604 | 713 | """Test that JSONL extractor is registered in default registry.""" |
605 | 714 | registry = get_default_registry() |
|
0 commit comments