diff --git a/app/pybind_parse.cpp b/app/pybind_parse.cpp index ffcba63e..031ddc1d 100644 --- a/app/pybind_parse.cpp +++ b/app/pybind_parse.cpp @@ -252,6 +252,7 @@ PYBIND11_MODULE(pdf_parsers, m) { }, pybind11::arg("key"), pybind11::arg("filename"), + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Load a document by key and filename. @@ -268,6 +269,7 @@ PYBIND11_MODULE(pdf_parsers, m) { }, pybind11::arg("key"), pybind11::arg("bytes_io"), + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Load a document by key from a BytesIO-like object. @@ -310,6 +312,7 @@ PYBIND11_MODULE(pdf_parsers, m) { return self.get_annotations(key); }, pybind11::arg("key"), + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Retrieve annotations for the document identified by its unique key and return them as JSON. @@ -324,6 +327,7 @@ PYBIND11_MODULE(pdf_parsers, m) { return self.get_table_of_contents(key); }, pybind11::arg("key"), + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Retrieve the table of contents for the document identified by its unique key and return it as JSON. @@ -338,6 +342,7 @@ PYBIND11_MODULE(pdf_parsers, m) { return self.get_meta_xml(key); }, pybind11::arg("key"), + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Retrieve the meta data in string or None. @@ -357,6 +362,7 @@ PYBIND11_MODULE(pdf_parsers, m) { pybind11::arg("key"), pybind11::arg("page_boundary") = "crop_box", // media_box pybind11::arg("do_sanitization") = true, // media_box + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Parse the PDF document identified by its unique key and return a JSON representation. @@ -380,6 +386,7 @@ PYBIND11_MODULE(pdf_parsers, m) { pybind11::arg("page"), pybind11::arg("page_boundary") = "crop_box", // media_box pybind11::arg("do_sanitization") = true, // media_box + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Parse a specific page of the PDF document identified by its unique key and return a JSON representation. @@ -417,6 +424,7 @@ PYBIND11_MODULE(pdf_parsers, m) { pybind11::arg("enforce_same_font")=true, pybind11::arg("space_width_factor_for_merge")=1.5, pybind11::arg("space_width_factor_for_merge_with_space")=0.33, + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Sanitize table cells with specified parameters and return the processed JSON. @@ -457,6 +465,7 @@ Sanitize table cells with specified parameters and return the processed JSON. pybind11::arg("enforce_same_font")=true, pybind11::arg("space_width_factor_for_merge")=1.5, pybind11::arg("space_width_factor_for_merge_with_space")=0.33, + pybind11::call_guard<pybind11::gil_scoped_release>(), R"( Sanitize table cells in a given bounding box with specified parameters and return the processed JSON.
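Note (not part of the patch): the call guards added above release the GIL while the C++ binding runs, which is what lets the asyncio.to_thread calls introduced later in this diff actually overlap. A minimal sketch of that effect using the low-level pdf_parser_v2 API exercised in processing_dir.py; the key string "doc-key" is arbitrary, and, as the tests at the end of this diff show, this parallel pattern is exactly the one that currently exposes thread-safety issues in the C++ backend:

    import asyncio

    from docling_parse import pdf_parser_v2  # type: ignore[attr-defined]


    async def parse_pages_concurrently(path: str) -> None:
        parser = pdf_parser_v2("fatal")
        parser.load_document("doc-key", path)
        num_pages = parser.number_of_pages("doc-key")

        # Each call runs in a worker thread; with the GIL released inside the
        # binding, the per-page parses can genuinely overlap.
        await asyncio.gather(
            *[
                asyncio.to_thread(parser.parse_pdf_from_key_on_page, "doc-key", page)
                for page in range(num_pages)
            ]
        )

        parser.unload_document("doc-key")


    asyncio.run(parse_pages_concurrently("tests/data/cases/2206.01062.pdf"))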
diff --git a/docling_parse/pdf_parser.py b/docling_parse/pdf_parser.py index 2f119194..dac1a524 100644 --- a/docling_parse/pdf_parser.py +++ b/docling_parse/pdf_parser.py @@ -1,9 +1,10 @@ """Parser for PDF files""" +import asyncio import hashlib from io import BytesIO from pathlib import Path -from typing import Dict, Iterator, List, Optional, Tuple, Union +from typing import AsyncIterator, Dict, Iterator, List, Optional, Tuple, Union from docling_core.types.doc.base import BoundingBox, CoordOrigin from docling_core.types.doc.page import ( @@ -34,6 +35,12 @@ def iterate_pages( for page_no in range(self.number_of_pages()): yield page_no + 1, self.get_page(page_no + 1) + async def iterate_pages_async( + self, + ) -> AsyncIterator[Tuple[int, SegmentedPdfPage]]: + for page_no in range(self.number_of_pages()): + yield page_no + 1, await self.get_page_async(page_no + 1) + def __init__( self, parser: "pdf_parser_v2", @@ -149,6 +156,36 @@ def get_page( return SegmentedPdfPage() + async def get_page_async( + self, page_no: int, create_words: bool = True, create_textlines: bool = True + ) -> SegmentedPdfPage: + if page_no in self._pages.keys(): + return self._pages[page_no] + else: + if 1 <= page_no <= self.number_of_pages(): + doc_dict = await asyncio.to_thread( + self._parser.parse_pdf_from_key_on_page, + key=self._key, + page=page_no - 1, + page_boundary=self._boundary_type.value, # Convert enum to string + do_sanitization=False, + ) + + for pi, page in enumerate( + doc_dict["pages"] + ): # only one page is expected + + self._pages[page_no] = self._to_segmented_page( + page=page["original"], + create_words=create_words, + create_textlines=create_textlines, + ) # put on cache + return self._pages[page_no] + + raise ValueError( + f"incorrect page_no: {page_no} for key={self._key} (min:1, max:{self.number_of_pages()})" + ) + def load_all_pages(self, create_words: bool = True, create_lines: bool = True): doc_dict = self._parser.parse_pdf_from_key( key=self._key, page_boundary=self._boundary_type, do_sanitization=False @@ -163,6 +200,24 @@ def load_all_pages(self, create_words: bool = True, create_lines: bool = True): create_textlines=create_lines, ) # put on cache + async def load_all_pages_async( + self, create_words: bool = True, create_lines: bool = True + ): + doc_dict = await asyncio.to_thread( + self._parser.parse_pdf_from_key, + key=self._key, + page_boundary=self._boundary_type.value, # Convert enum to string + do_sanitization=False, + ) + + for pi, page in enumerate(doc_dict["pages"]): + # will need to be changed once we remove the original/sanitized from C++ + self._pages[pi + 1] = self._to_segmented_page( + page["original"], + create_words=create_words, + create_textlines=create_lines, + ) # put on cache + def _to_page_geometry(self, dimension: dict) -> PdfPageGeometry: boundary_type: PdfPageBoundaryType = PdfPageBoundaryType( @@ -515,6 +570,46 @@ def load( else: raise RuntimeError(f"Failed to load document with key {key}") + async def load_async( + self, + path_or_stream: Union[str, Path, BytesIO], + lazy: bool = True, + boundary_type: PdfPageBoundaryType = PdfPageBoundaryType.CROP_BOX, + ) -> PdfDocument: + + if isinstance(path_or_stream, str): + path_or_stream = Path(path_or_stream) + + if isinstance(path_or_stream, Path): + key = f"key={str(path_or_stream)}" # use filepath as internal handle + success = await asyncio.to_thread( + self._load_document, key=key, filename=str(path_or_stream) + ) + + elif isinstance(path_or_stream, BytesIO): + hasher = hashlib.sha256(usedforsecurity=False) 
+ + while chunk := path_or_stream.read(8192): + hasher.update(chunk) + path_or_stream.seek(0) + hash = hasher.hexdigest() + + key = f"key={hash}" # use sha256 hash as internal handle + success = await asyncio.to_thread( + self._load_document_from_bytesio, key=key, data=path_or_stream + ) + + if success: + result_doc = PdfDocument( + parser=self.parser, key=key, boundary_type=boundary_type + ) + if not lazy: # eagerly parse the pages at init time if desired + await result_doc.load_all_pages_async() + + return result_doc + else: + raise RuntimeError(f"Failed to load document with key {key}") + def _load_document(self, key: str, filename: str) -> bool: """Load a document by key and filename. diff --git a/docling_parse/processing_dir.py b/docling_parse/processing_dir.py index e6037985..d14d55ff 100644 --- a/docling_parse/processing_dir.py +++ b/docling_parse/processing_dir.py @@ -1,15 +1,20 @@ import argparse +import asyncio import glob import hashlib import logging import os +import time from dataclasses import dataclass from pathlib import Path from queue import Queue +from docling_core.types.doc.page import PdfPageBoundaryType from tabulate import tabulate -from docling_parse import pdf_parser_v2 # type: ignore[attr-defined] +from docling_parse.pdf_parser import DoclingPdfParser, PdfDocument + +# from docling_parse import pdf_parser_v2 # type: ignore[attr-defined] # Configure logging logging.basicConfig( @@ -69,6 +74,14 @@ def fetch_files_from_disk(directory, recursive, task_queue): """Recursively fetch files from disk and add them to the queue.""" logging.info(f"Fetching file keys from disk: {directory}") + if os.path.exists(directory) and os.path.isfile(directory): + # Create a FileTask object + hash_object = hashlib.sha256(directory.encode(), usedforsecurity=False) + file_hash = hash_object.hexdigest() + + task = FileTask(folder_name=directory, file_name=directory, file_hash=file_hash) + task_queue.put(task) + for filename in sorted(glob.glob(os.path.join(directory, "*.pdf"))): file_name = str(Path(filename).resolve()) @@ -84,8 +97,11 @@ def fetch_files_from_disk(directory, recursive, task_queue): logging.info("Done with queue") -def process_files_from_queue(file_queue: Queue, page_level: bool, loglevel: str): - """Process files from the queue.""" +async def async_process_files_from_queue( + file_queue: Queue, page_level: bool, loglevel: str, sync: bool +): + + parser = DoclingPdfParser(loglevel=loglevel) overview = [] @@ -95,74 +111,107 @@ def process_files_from_queue(file_queue: Queue, page_level: bool, loglevel: str) if task is None: # End of queue signal break - logging.info( - f"Queue-size [{file_queue.qsize()}], Processing task: {task.file_name}" - ) + # logging.info( + print(f"Queue-size [{file_queue.qsize()}], Processing task (sync: {sync}): {task.file_name}") try: - parser = pdf_parser_v2(loglevel) - - parser.load_document(task.file_hash, str(task.file_name)) - - num_pages = parser.number_of_pages(task.file_hash) - logging.info(f" => #-pages of {task.file_name}: {num_pages}") - - overview.append([str(task.file_name), num_pages, -1, True]) - - if page_level: - # Parse page by page to minimize memory footprint - for page in range(0, num_pages): - fname = f"{task.file_name}-page-{page:03}.json" - - try: - json_doc = parser.parse_pdf_from_key_on_page( - task.file_hash, page - ) + start_time = time.time() + + # Load document asynchronously + pdf_doc: PdfDocument = await parser.load_async( + path_or_stream=task.file_name, + lazy=True, + boundary_type=PdfPageBoundaryType.CROP_BOX, + ) +
assert pdf_doc is not None + + num_pages = pdf_doc.number_of_pages() + + if sync: + # Load pages sequentially using async + pages = [] + for page_no in range(1, pdf_doc.number_of_pages() + 1): + page = await pdf_doc.get_page_async( + page_no=page_no, create_words=True, create_textlines=True + ) + pages.append(page) - """ - with open(os.path.join(directory, fname), "w") as fw: - fw.write(json.dumps(json_doc, indent=2)) - """ - - overview.append([fname, num_pages, page, True]) - except Exception as exc: - overview.append([fname, num_pages, page, False]) - logging.error( - f"problem with parsing {task.file_name} on page {page}: {exc}" - ) else: - - parser.parse_pdf_from_key(task.file_hash) + # Load all pages in parallel using asyncio.gather + page_numbers = list(range(1, pdf_doc.number_of_pages() + 1)) + + # Create tasks for parallel page loading + page_tasks = [ + pdf_doc.get_page_async( + page_no=page_no, create_words=True, create_textlines=True + ) + for page_no in page_numbers + ] + print(f"number of tasks: {len(page_tasks)}") + + # Execute all page loading tasks in parallel + # pages = await asyncio.gather(*page_tasks) + # pages = await asyncio.gather(page_tasks[0:4]) + + STEP = 1 + for i in range(0, len(page_tasks), STEP): + print(i) + sublist = page_tasks[i : i + STEP] + pages = await asyncio.gather(*sublist) """ - # with open(os.path.join(task.folder_name, f"{task.file_name}.json"), "w") as fw: - with open(f"{task.file_name}.json", "w") as fw: - fw.write(json.dumps(json_doc, indent=2)) + for page_task in page_tasks: + print(page_task) + await page_task """ - overview.append([str(task.file_name), num_pages, -1, True]) + end_time = time.time() - # Unload the (QPDF) document and buffers - parser.unload_document(task.file_hash) + elapsed_time = end_time - start_time + print(f"Elapsed time on tasks: {elapsed_time:.2f} seconds") + + overview.append( + [os.path.basename(str(task.file_name)), num_pages, elapsed_time, True] + ) except Exception as exc: logging.error(exc) - overview.append([str(task.file_name), -1, -1, False]) + overview.append([os.path.basename(str(task.file_name)), -1, -1, False]) return overview +def process_files_from_queue( + file_queue: Queue, page_level: bool, loglevel: str, sync: bool +): + return asyncio.run( + async_process_files_from_queue(file_queue, page_level, loglevel, sync=sync) + ) + + def main(): directory, recursive, loglevel, page_level_parsing = parse_arguments() + """ task_queue = Queue() - fetch_files_from_disk(directory, recursive, task_queue) - overview = process_files_from_queue(task_queue, page_level_parsing, loglevel) + overview_sync = process_files_from_queue(task_queue, page_level_parsing, loglevel, sync=True) + print(tabulate(overview_sync, headers=["filename", "#-pages", "total-time", "success"])) + """ - print(tabulate(overview, headers=["filename", "success", "page-number", "#-pages"])) + task_queue = Queue() + fetch_files_from_disk(directory, recursive, task_queue) + + overview_async = process_files_from_queue( + task_queue, page_level_parsing, loglevel, sync=False + ) + print( + tabulate( + overview_async, headers=["filename", "#-pages", "total-time", "success"] + ) + ) logging.info("All files processed successfully.") diff --git a/pyproject.toml b/pyproject.toml index 42af92e1..b1b3c2be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,8 @@ dev = [ "tqdm>=4.67.0,<5.0.0", "boto>=2.49.0,<3.0.0", "boto3>=1.35.67,<2.0.0", - "autoflake>=2.3.1,<3.0.0" + "autoflake>=2.3.1,<3.0.0", + "pytest-asyncio>=0.23.8", ] [tool.uv] package = true 
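Note (not part of the patch): a minimal sketch of how the new high-level async surface added to pdf_parser.py is meant to be driven; all names match the additions above, and the gather-based variant is the one the tests below expect to fail until the C++ backend is made thread-safe:

    import asyncio

    from docling_core.types.doc.page import PdfPageBoundaryType

    from docling_parse.pdf_parser import DoclingPdfParser, PdfDocument


    async def main(path: str) -> None:
        parser = DoclingPdfParser(loglevel="fatal")
        pdf_doc: PdfDocument = await parser.load_async(
            path_or_stream=path, lazy=True, boundary_type=PdfPageBoundaryType.CROP_BOX
        )

        # Sequential: pages are parsed one at a time, off the event loop.
        async for page_no, page in pdf_doc.iterate_pages_async():
            print(page_no, len(page.char_cells))

        # Parallel: schedule every page at once via asyncio.gather.
        await asyncio.gather(
            *[
                pdf_doc.get_page_async(page_no=p)
                for p in range(1, pdf_doc.number_of_pages() + 1)
            ]
        )


    asyncio.run(main("tests/data/cases/2206.01062.pdf"))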
diff --git a/src/pybind/docling_parser_v2.h b/src/pybind/docling_parser_v2.h index dbb6d155..1a11a749 100644 --- a/src/pybind/docling_parser_v2.h +++ b/src/pybind/docling_parser_v2.h @@ -351,7 +351,7 @@ namespace docling std::string page_boundary, bool do_sanitization) { - LOG_S(INFO) << __FUNCTION__; + LOG_S(INFO) << __FUNCTION__ << " with key: " << key << ", page: " << page; auto itr = key2doc.find(key); if(itr==key2doc.end()) diff --git a/src/v1/proj_folders/pdf_library/qpdf/parser/font.h b/src/v1/proj_folders/pdf_library/qpdf/parser/font.h index fb52e2ef..1c23b597 100755 --- a/src/v1/proj_folders/pdf_library/qpdf/parser/font.h +++ b/src/v1/proj_folders/pdf_library/qpdf/parser/font.h @@ -913,7 +913,8 @@ namespace pdf_lib //std::cout << name << "\n"; - if(bbox.isInitialized()) + //if(bbox.isInitialized()) + if(bbox) // using the new explicit bool() operator { if(not fm.ascent) { diff --git a/src/v2.h b/src/v2.h index 8b21c0f9..010b18ae 100644 --- a/src/v2.h +++ b/src/v2.h @@ -11,6 +11,8 @@ #include #include #include +#include +#include #ifdef _WIN32 #include // to define _SH_DENYNO for loguru diff --git a/src/v2/pdf_decoders/document.h b/src/v2/pdf_decoders/document.h index d78570d4..2e21ae4e 100644 --- a/src/v2/pdf_decoders/document.h +++ b/src/v2/pdf_decoders/document.h @@ -4,7 +4,8 @@ #define PDF_DOCUMENT_DECODER_H #include -//#include +#include +#include namespace pdflib { @@ -36,12 +37,16 @@ namespace pdflib private: + void initialise_individual_pages(); + void update_qpdf_logger(); void update_timings(std::map& timings_, bool set_timer); - + private: + std::mutex mtx; + std::string filename; std::string buffer; // keep a local copy, in order to not let it expire @@ -51,7 +56,9 @@ namespace pdflib QPDFObjectHandle qpdf_root; QPDFObjectHandle qpdf_pages; - + + std::map individual_pages; + int number_of_pages; //nlohmann::json json_toc; // table-of-contents @@ -69,12 +76,15 @@ namespace pdflib // have compatibulity between QPDF v10 and v11 qpdf_root(), qpdf_pages(), + individual_pages(), number_of_pages(-1), json_annots(nlohmann::json::value_t::null), json_document(nlohmann::json::value_t::null) { + LOG_S(INFO) << "pdf_decoder constructor"; + update_qpdf_logger(); } @@ -94,6 +104,8 @@ namespace pdflib json_annots(nlohmann::json::value_t::null), json_document(nlohmann::json::value_t::null) { + LOG_S(INFO) << "pdf_decoder constructor"; + update_qpdf_logger(); } @@ -121,7 +133,7 @@ namespace pdflib nlohmann::json pdf_decoder::get() { - LOG_S(INFO) << "get() [in pdf_decoder]"; + LOG_S(INFO) << "get() in pdf_decoder"; { json_document["annotations"] = json_annots; @@ -148,6 +160,8 @@ namespace pdflib try { + std::lock_guard lock(mtx); + qpdf_document.processFile(filename.c_str()); LOG_S(INFO) << "filename: " << filename << " processed by qpdf!"; @@ -159,6 +173,8 @@ namespace pdflib number_of_pages = qpdf_pages.getKey("/Count").getIntValue(); LOG_S(INFO) << "#-pages: " << number_of_pages; + initialise_individual_pages(); + nlohmann::json& info = json_document["info"]; { info["filename"] = filename; @@ -199,6 +215,8 @@ namespace pdflib number_of_pages = qpdf_pages.getKey("/Count").getIntValue(); LOG_S(INFO) << "#-pages: " << number_of_pages; + initialise_individual_pages(); + nlohmann::json& info = json_document["info"]; { info["filename"] = filename; @@ -215,6 +233,22 @@ namespace pdflib return true; } + + // got inspiration from https://github.com/qpdf/qpdf/blob/main/examples/pdf-split-pages.cc + void pdf_decoder::initialise_individual_pages() + { + LOG_S(INFO) << "initialise individual
pages"; + std::vector pages = QPDFPageDocumentHelper(qpdf_document).getAllPages(); + + for(int page_no=0; page_no page: " << page_no << " is initialised!"; + } + } void pdf_decoder::decode_document(std::string page_boundary, bool do_sanitization) @@ -248,7 +282,7 @@ namespace pdflib timings[__FUNCTION__] = timer.get_time(); } - + void pdf_decoder::decode_document(std::vector& page_numbers, std::string page_boundary, bool do_sanitization) @@ -259,31 +293,51 @@ namespace pdflib // make sure that we only return the page from the page-numbers nlohmann::json& json_pages = json_document["pages"]; json_pages = nlohmann::json::array({}); - - std::vector pages = qpdf_document.getAllPages(); - + for(int l=0; l pages = qpdf_document.getAllPages(); + //std::vector pages = {}; + //{ + //std::lock_guard lock(mtx); + //pages = qpdf_document.getAllPages(); + //} + bool set_timer=true; // make sure we override all timings for this page-set - for(auto page_number:page_numbers) + + for(int l=0; l page_decoder(pages.at(page_number)); + std::vector page_handles = individual_pages.at(page_number).getAllPages(); + assert(page_handles.size()==1); + + pdf_decoder page_decoder(page_handles.at(0)); auto timings_ = page_decoder.decode_page(page_boundary, do_sanitization); update_timings(timings_, set_timer); set_timer=false; - - json_pages.push_back(page_decoder.get()); - std::stringstream ss; - ss << "decoding page " << page_number; - - timings[ss.str()] = page_timer.get_time(); + { + std::lock_guard lock(mtx); + + std::stringstream ss; + ss << "decoding page: " << page_number; + + LOG_S(INFO) << ss.str(); + + json_pages[l] = page_decoder.get(); + timings[ss.str()] = page_timer.get_time(); + } } else { diff --git a/src/v2/pdf_resources/page_font/base_font.h b/src/v2/pdf_resources/page_font/base_font.h index 2d594d4e..5b5e617b 100644 --- a/src/v2/pdf_resources/page_font/base_font.h +++ b/src/v2/pdf_resources/page_font/base_font.h @@ -35,12 +35,12 @@ namespace pdflib std::array get_font_bbox(); - //private: - void initialise(); private: + std::mutex mtx; + std::string filename; font_glyphs& glyphs; @@ -220,13 +220,15 @@ namespace pdflib void base_font::initialise() { + std::lock_guard lock(mtx); + if(initialised) { return; } initialised = true; - LOG_S(WARNING) << "initialising base-font: " << filename; + LOG_S(WARNING) << "initialising base-font by reading file: " << filename; std::ifstream file(filename.c_str()); diff --git a/src/v2/pdf_resources/page_font/base_fonts.h b/src/v2/pdf_resources/page_font/base_fonts.h index 6395947a..7b9e82db 100644 --- a/src/v2/pdf_resources/page_font/base_fonts.h +++ b/src/v2/pdf_resources/page_font/base_fonts.h @@ -172,11 +172,9 @@ namespace pdflib LOG_S(INFO) << "standard-fonts: " << standard.size(); for(auto path:standard) { - LOG_S(INFO) << "\t file-name: " << path; - std::string fontname = read_fontname(path); - //LOG_S(INFO) << "\t font-name: " << fontname; - + LOG_S(INFO) << "\t registering font " << fontname << "at " << path; + base_font bf(path, glyphs); name_to_basefont.emplace(std::pair(fontname, bf)); @@ -193,7 +191,7 @@ namespace pdflib if(name_to_basefont.count(fontname)==0) { - LOG_S(INFO) << "\t reading font " << fontname << "at " << path; + LOG_S(INFO) << "\t registering font " << fontname << "at " << path; base_font bf(path, glyphs); name_to_basefont.emplace(std::pair(fontname, bf)); @@ -211,11 +209,10 @@ namespace pdflib for(auto path:tex) { std::string fontname = read_fontname(path); - //LOG_S(INFO) << "\t font-name: " << fontname; 
if(name_to_basefont.count(fontname)==0) { - LOG_S(INFO) << "\t reading font " << fontname << "at " << path; + LOG_S(INFO) << "\t registering font " << fontname << "at " << path; base_font bf(path, glyphs); name_to_basefont.emplace(std::pair(fontname, bf)); @@ -231,6 +228,8 @@ namespace pdflib std::string base_fonts::read_fontname(std::string filename) { + //LOG_S(INFO) << "\t reading font with filename " << filename; + std::string fontname = "unknown"; std::ifstream file(filename.c_str()); diff --git a/src/v2/pdf_resources/page_font/encoding.h b/src/v2/pdf_resources/page_font/encoding.h index 9db66894..6193e535 100644 --- a/src/v2/pdf_resources/page_font/encoding.h +++ b/src/v2/pdf_resources/page_font/encoding.h @@ -26,6 +26,8 @@ namespace pdflib private: + std::mutex mtx; + font_encoding_name name; std::map numb_to_name; @@ -48,6 +50,8 @@ namespace pdflib std::string file_name, glyphs_type& glyphs) { + std::lock_guard lock(mtx); + LOG_S(WARNING) << __FUNCTION__ << ": " << file_name; name = name_; diff --git a/src/v2/pdf_resources/page_font/encodings.h b/src/v2/pdf_resources/page_font/encodings.h index 661f5ce5..50dfe7ed 100644 --- a/src/v2/pdf_resources/page_font/encodings.h +++ b/src/v2/pdf_resources/page_font/encodings.h @@ -56,6 +56,8 @@ namespace pdflib for(auto item:items) { + LOG_S(INFO) << "reading the font-encodings " << item.first << " at " << item.second; + font_encoding& encoding = name_to_encoding[item.first]; encoding.initialise(item.first, dirname+"/"+item.second, glyphs); } diff --git a/src/v2/pdf_resources/page_font/font_cid.h b/src/v2/pdf_resources/page_font/font_cid.h index 4f37f5fb..41522329 100644 --- a/src/v2/pdf_resources/page_font/font_cid.h +++ b/src/v2/pdf_resources/page_font/font_cid.h @@ -38,6 +38,8 @@ namespace pdflib private: + std::mutex mtx; + std::map cmap2cid; std::map cid2utf8; @@ -272,6 +274,8 @@ namespace pdflib std::string cid2code, std::vector columns) { + std::lock_guard lock(mtx); + LOG_S(INFO) << __FUNCTION__; { @@ -308,6 +312,8 @@ namespace pdflib void font_cid::decode_widths(std::map& numb_to_widths) { + std::lock_guard lock(mtx); + LOG_S(INFO) << __FUNCTION__; std::map numb_to_widths_ = numb_to_widths; diff --git a/src/v2/pdf_resources/page_font/font_cids.h b/src/v2/pdf_resources/page_font/font_cids.h index 822ccc25..461485eb 100644 --- a/src/v2/pdf_resources/page_font/font_cids.h +++ b/src/v2/pdf_resources/page_font/font_cids.h @@ -122,7 +122,7 @@ namespace pdflib for(auto file:files) { std::string key = "/"+file; - LOG_S(INFO) << __FUNCTION__ << "\t" << file; + //LOG_S(INFO) << __FUNCTION__ << "\t" << file; cmap_2_columns[key] = (itr->second); cmap_2_columns[file] = (itr->second); @@ -132,6 +132,8 @@ namespace pdflib cmap_2_filename[key] = cdir+"/"+file; cmap_2_filename[file] = cdir+"/"+file; + + LOG_S(INFO) << "registering the font-cid " << key << " at " << file; } } @@ -153,6 +155,8 @@ namespace pdflib std::vector columns = cmap_2_columns[cmap_name]; std::string cid2code = cmap_2_cid2code[cmap_name]; std::string filename = cmap_2_filename[cmap_name]; + + LOG_S(INFO) << "reading the font-cid " << cmap_name << " at " << filename; font_cid& cid = cids[cmap_name]; cid.decode_cmap_resource(filename, cid2code, columns); diff --git a/src/v2/pdf_resources/page_font/glyphs.h b/src/v2/pdf_resources/page_font/glyphs.h index 33c81fb3..f17dd6c8 100644 --- a/src/v2/pdf_resources/page_font/glyphs.h +++ b/src/v2/pdf_resources/page_font/glyphs.h @@ -39,6 +39,8 @@ namespace pdflib private: + // std::mutex mtx; + bool initialized; std::set unknown_glyphs; @@ 
-96,16 +98,20 @@ namespace pdflib std::string font_glyphs::operator[](std::string key) { if(name_to_utf8.count(key)==1) - return name_to_utf8[key]; - + { + return name_to_utf8[key]; + } + LOG_S(ERROR) << "could not find a glyph with name=" << key; unknown_glyphs.insert(key); - return "glyph["+key+"]"; + return "GLYPH<"+key+">"; } void font_glyphs::initialise(std::string dirname) { + // std::lock_guard lock(mtx); + if(initialized) { LOG_S(WARNING) << "skipping font_glyphs::initialise, already initialized ..."; diff --git a/tests/data/cases/2206.01062.pdf b/tests/data/cases/2206.01062.pdf new file mode 100644 index 00000000..3d499d64 Binary files /dev/null and b/tests/data/cases/2206.01062.pdf differ diff --git a/tests/test_parse.py b/tests/test_parse.py index 84e17ae5..4a3e9686 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -1,4 +1,16 @@ #!/usr/bin/env python +""" +Test suite for PDF parsing functionality. + +This module includes tests for both synchronous and asynchronous PDF parsing. +The async tests serve two purposes: +1. Verify that the async interface works correctly for sequential operations +2. Demonstrate thread-safety issues in the C-backend when parallel page loading is attempted + +The parallel loading test is expected to fail due to lack of thread synchronization +in the underlying C++ implementation, which is the intended behavior to expose this issue. +""" +import asyncio import glob import os import re @@ -351,3 +363,186 @@ def test_serialize_and_reload(): reloaded_pages: Dict[int, SegmentedPdfPage] = page_adapter.validate_json(json_pages) assert reloaded_pages == pdf_doc._pages + + +async def test_async_parallel_page_loading(): + """Test async interface with parallel page loading to trigger C-backend thread parallelism. + + NOTE: This test is expected to crash due to thread-safety issues in the C-backend + when multiple pages are loaded in parallel. The goal is to expose this problem + and demonstrate the need for proper thread synchronization in the C++ implementation. + """ + filename = "tests/data/cases/2206.01062.pdf" + print(f"testing on {filename}") + + import time + + start_time = time.time() + + parser = DoclingPdfParser(loglevel="fatal") + + # Load document asynchronously + pdf_doc: PdfDocument = await parser.load_async( + path_or_stream=filename, lazy=True, boundary_type=PdfPageBoundaryType.CROP_BOX + ) + + assert pdf_doc is not None + assert pdf_doc.number_of_pages() == 9 + + print(f"Document loaded successfully with {pdf_doc.number_of_pages()} pages") + print( + "Attempting parallel page loading (expected to trigger thread-safety issues)..." + ) + + # Load all pages in parallel using asyncio.gather + page_numbers = list(range(1, pdf_doc.number_of_pages() + 1)) + + # Create tasks for parallel page loading + page_tasks = [ + pdf_doc.get_page_async( + page_no=page_no, create_words=True, create_textlines=True + ) + for page_no in page_numbers + ] + + print(f"Created {len(page_tasks)} parallel tasks for pages {page_numbers}") + print( + "Executing parallel page loading (this may crash due to C-backend thread-safety issues)..." 
+ ) + + try: + start_time_tasks = time.time() + + # Execute all page loading tasks in parallel + pages = await asyncio.gather(*page_tasks) + + # If we reach here, the parallel loading succeeded (unexpected) + print( + "WARNING: Parallel loading succeeded - this may indicate thread-safety has been fixed" + ) + + end_time = time.time() + + elapsed_time = end_time - start_time_tasks + print(f"Elapsed time on tasks: {elapsed_time:.2f} seconds") + + elapsed_time = end_time - start_time + print(f"Elapsed total time: {elapsed_time:.2f} seconds") + + # Verify all pages were loaded correctly + assert len(pages) == 9 # MAX_TASKS + + for i, page in enumerate(pages): + assert isinstance(page, SegmentedPdfPage) + assert len(page.char_cells) > 0 # Should have some text content + + # Verify the page was cached in the document + cached_page = pdf_doc._pages[i + 1] + assert cached_page == page + + print("All pages loaded and verified successfully") + + # Test async iteration as well + async_pages = [] + async for page_no, page in pdf_doc.iterate_pages_async(): + async_pages.append((page_no, page)) + + assert len(async_pages) == 9 + + # Verify async iteration returns the same pages + for i, (page_no, page) in enumerate(async_pages): + assert page_no == i + 1 + assert page == pages[i] + + print("Async iteration test completed successfully") + + except Exception as e: + print(f"Parallel loading failed as expected: {type(e).__name__}: {e}") + print("This failure indicates thread-safety issues in the C-backend") + print( + "The C++ implementation needs proper synchronization for concurrent page parsing" + ) + # Re-raise to make the test fail + raise + + +def test_async_parallel_page_loading_sync_wrapper(): + """Synchronous wrapper for the async test to integrate with pytest. + + This test is expected to fail due to thread-safety issues in the C-backend + when multiple pages are parsed concurrently. The failure demonstrates the + need for proper thread synchronization in the underlying C++ implementation. 
+ """ + try: + asyncio.run(test_async_parallel_page_loading()) + except Exception as e: + print(f"\nTest failed as expected: {type(e).__name__}: {e}") + print("This confirms thread-safety issues in the C-backend") + # Re-raise to make the test fail + raise + + +async def test_async_sequential_page_loading(): + """Test async interface with sequential page loading to verify async functionality works correctly.""" + filename = "tests/data/cases/2206.01062.pdf" + print(f"testing on {filename}") + + import time + + start_time = time.time() + + parser = DoclingPdfParser(loglevel="fatal") + + # Load document asynchronously + pdf_doc: PdfDocument = await parser.load_async( + path_or_stream=filename, lazy=True, boundary_type=PdfPageBoundaryType.CROP_BOX + ) + + assert pdf_doc is not None + assert pdf_doc.number_of_pages() == 9 + + start_time_tasks = time.time() + + # Load pages sequentially using async + pages = [] + for page_no in range(1, pdf_doc.number_of_pages() + 1): + page = await pdf_doc.get_page_async( + page_no=page_no, create_words=True, create_textlines=True + ) + pages.append(page) + + end_time = time.time() + + elapsed_time = end_time - start_time_tasks + print(f"Elapsed time on tasks: {elapsed_time:.2f} seconds") + + elapsed_time = end_time - start_time + print(f"Elapsed total time: {elapsed_time:.2f} seconds") + + # Verify all pages were loaded correctly + assert len(pages) == 9 + + for i, page in enumerate(pages): + assert isinstance(page, SegmentedPdfPage) + assert len(page.char_cells) > 0 # Should have some text content + + # Verify the page was cached in the document + cached_page = pdf_doc._pages[i + 1] + assert cached_page == page + + # Test async iteration + async_pages = [] + async for page_no, page in pdf_doc.iterate_pages_async(): + async_pages.append((page_no, page)) + + assert len(async_pages) == 9 + + # Verify async iteration returns the same pages + for i, (page_no, page) in enumerate(async_pages): + assert page_no == i + 1 + assert page == pages[i] + + +def test_async_sequential_page_loading_sync_wrapper(): + """Synchronous wrapper for the sequential async test.""" + asyncio.run(test_async_sequential_page_loading()) diff --git a/uv.lock b/uv.lock index f5d05b87..f644b6c0 100644 --- a/uv.lock +++ b/uv.lock @@ -515,7 +515,7 @@ wheels = [ [[package]] name = "docling-parse" -version = "4.0.5" +version = "4.1.0" source = { editable = "." 
} dependencies = [ { name = "docling-core" }, @@ -543,6 +543,7 @@ dev = [ { name = "mypy" }, { name = "pre-commit" }, { name = "pytest" }, + { name = "pytest-asyncio" }, { name = "python-semantic-release" }, { name = "tqdm" }, ] @@ -574,6 +575,7 @@ dev = [ { name = "mypy", specifier = ">=1.13.0,<2.0.0" }, { name = "pre-commit", specifier = ">=3.7.1,<4.0.0" }, { name = "pytest", specifier = ">=7.4.2,<8.0.0" }, + { name = "pytest-asyncio", specifier = ">=0.23.8" }, { name = "python-semantic-release", specifier = ">=7.32.2,<8.0.0" }, { name = "tqdm", specifier = ">=4.67.0,<5.0.0" }, ] @@ -601,7 +603,7 @@ name = "exceptiongroup" version = "1.3.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", marker = "python_full_version < '3.13'" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/0b/9f/a65090624ecf468cdca03533906e7c69ed7588582240cfe7cc9e770b50eb/exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88", size = 29749, upload-time = "2025-05-10T17:42:51.123Z" } wheels = [ @@ -1688,6 +1690,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/51/ff/f6e8b8f39e08547faece4bd80f89d5a8de68a38b2d179cc1c4490ffa3286/pytest-7.4.4-py3-none-any.whl", hash = "sha256:b090cdf5ed60bf4c45261be03239c2c1c22df034fbffe691abe93cd80cea01d8", size = 325287, upload-time = "2023-12-31T12:00:13.963Z" }, ] +[[package]] +name = "pytest-asyncio" +version = "0.23.8" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/de/b4/0b378b7bf26a8ae161c3890c0b48a91a04106c5713ce81b4b080ea2f4f18/pytest_asyncio-0.23.8.tar.gz", hash = "sha256:759b10b33a6dc61cce40a8bd5205e302978bbbcc00e279a8b61d9a6a3c82e4d3", size = 46920, upload-time = "2024-07-17T17:39:34.617Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/82/62e2d63639ecb0fbe8a7ee59ef0bc69a4669ec50f6d3459f74ad4e4189a2/pytest_asyncio-0.23.8-py3-none-any.whl", hash = "sha256:50265d892689a5faefb84df80819d1ecef566eb3549cf915dfb33569359d1ce2", size = 17663, upload-time = "2024-07-17T17:39:32.478Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0"
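Note (an assumption about intended usage, not something this patch does): pytest-asyncio is added as a dev dependency, but the async tests are still driven through synchronous asyncio.run() wrappers. With the plugin in place, the coroutines in tests/test_parse.py could instead be collected directly, for example:

    import pytest


    @pytest.mark.asyncio
    async def test_async_sequential_page_loading_direct():
        # Reuses the coroutine already defined in tests/test_parse.py; the
        # synchronous wrapper and its asyncio.run() call then become redundant.
        await test_async_sequential_page_loading()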