Skip to content

Commit dac3e65

Browse files
added first attempt at threaded pdf parsing
Signed-off-by: Peter Staar <taa@zurich.ibm.com>
1 parent 0ed688a commit dac3e65

File tree

9 files changed

+1378
-31
lines changed

9 files changed

+1378
-31
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,45 @@ for page_no, pred_page in pdf_doc.iterate_pages():
8686
img.show()
8787
```
8888

89+
### Parallel parsing (multi-threaded)
90+
91+
Parse pages from one or more PDFs in parallel using a thread pool with backpressure:
92+
93+
```python
94+
from docling_parse.pdf_parser import (
95+
DoclingThreadedPdfParser,
96+
ThreadedPdfParserConfig,
97+
)
98+
from docling_parse.pdf_parsers import DecodePageConfig # type: ignore[import]
99+
100+
parser_config = ThreadedPdfParserConfig(
101+
loglevel="fatal",
102+
threads=4, # worker threads
103+
max_concurrent_results=32 # cap buffered results to limit memory
104+
)
105+
decode_config = DecodePageConfig()
106+
107+
parser = DoclingThreadedPdfParser(
108+
parser_config=parser_config,
109+
decode_config=decode_config,
110+
)
111+
112+
# load one or more documents
113+
for source in ["doc_a.pdf", "doc_b.pdf"]:
114+
parser.load(source)
115+
116+
# consume decoded pages as they become available
117+
while parser.has_tasks():
118+
task = parser.get_task()
119+
120+
if task.success:
121+
page_decoder, timings = task.get()
122+
print(f"{task.doc_key} p{task.page_number}: "
123+
f"{len(list(page_decoder.get_word_cells()))} words")
124+
else:
125+
print(f"error on {task.doc_key} p{task.page_number}: {task.error()}")
126+
```
127+
89128
Use the CLI
90129

91130
```sh

app/pybind_parse.cpp

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <pybind/utils/pybind11_json.h>
99

1010
#include <pybind/docling_parser.h>
11+
#include <pybind/docling_threaded_parser.h>
1112

1213
// Include parse headers for typed bindings
1314
#include <parse.h>
@@ -533,4 +534,135 @@ PYBIND11_MODULE(pdf_parsers, m) {
533534
534535
Returns:
535536
PdfPageDecoder: A typed page decoder object.)");
537+
538+
// ============= Threaded PDF Parser =============
539+
540+
// PageDecodeResult - result of a threaded page decode task
541+
pybind11::class_<docling::page_decode_result>(m, "PageDecodeResult",
542+
R"(
543+
Result of a threaded page decoding task.
544+
545+
Attributes:
546+
doc_key (str): The document key this page belongs to.
547+
page_number (int): The page number (0-indexed).
548+
success (bool): Whether the decoding succeeded.
549+
)")
550+
.def_readonly("doc_key", &docling::page_decode_result::doc_key)
551+
.def_readonly("page_number", &docling::page_decode_result::page_number)
552+
.def_readonly("success", &docling::page_decode_result::success)
553+
.def("get", [](docling::page_decode_result& self)
554+
-> std::pair<std::shared_ptr<pdflib::pdf_decoder<pdflib::PAGE>>,
555+
std::unordered_map<std::string, double>> {
556+
if(!self.success)
557+
{
558+
throw std::runtime_error("Cannot get result from failed task: " + self.error_message);
559+
}
560+
auto timings_map = self.page_decoder->get_timings().to_sum_map();
561+
return std::make_pair(self.page_decoder, timings_map);
562+
},
563+
R"(
564+
Get the page decoder and timing information.
565+
566+
Returns:
567+
Tuple[PdfPageDecoder, Dict[str, float]]: The page decoder and timing data.
568+
569+
Raises:
570+
RuntimeError: If the task was not successful.)")
571+
.def("error", [](docling::page_decode_result& self) -> std::string {
572+
return self.error_message;
573+
},
574+
R"(
575+
Get the error message if the task failed.
576+
577+
Returns:
578+
str: The error message.)");
579+
580+
// threaded_pdf_parser - parallel PDF parser with bounded result queue
581+
pybind11::class_<docling::docling_threaded_parser>(m, "threaded_pdf_parser",
582+
R"(
583+
Threaded PDF parser that processes pages in parallel.
584+
585+
Loads multiple documents and decodes their pages using a thread pool.
586+
Results are available via a bounded queue to control memory usage.
587+
)")
588+
.def(pybind11::init<const std::string&, int, int, pdflib::decode_page_config>(),
589+
pybind11::arg("loglevel") = "fatal",
590+
pybind11::arg("num_threads") = 4,
591+
pybind11::arg("max_concurrent_results") = 32,
592+
pybind11::arg("config") = pdflib::decode_page_config(),
593+
R"(
594+
Construct a threaded PDF parser.
595+
596+
Parameters:
597+
loglevel (str): Logging level ('fatal', 'error', 'warning', 'info').
598+
num_threads (int): Number of worker threads.
599+
max_concurrent_results (int): Maximum results buffered before workers pause.
600+
config (DecodePageConfig): Configuration for page decoding.)")
601+
602+
.def("load_document",
603+
[](docling::docling_threaded_parser& self,
604+
const std::string& key,
605+
const std::string& filename,
606+
std::optional<std::string>& password) -> bool {
607+
return self.load_document(key, filename, password);
608+
},
609+
pybind11::arg("key"),
610+
pybind11::arg("filename"),
611+
pybind11::arg("password") = pybind11::none(),
612+
R"(
613+
Load a document by key and filename.
614+
615+
Parameters:
616+
key (str): The unique key to identify the document.
617+
filename (str): The path to the document file to load.
618+
password (str, optional): Optional password for password-protected files.
619+
620+
Returns:
621+
bool: True if the document was successfully loaded.)")
622+
623+
.def("load_document_from_bytesio",
624+
[](docling::docling_threaded_parser& self,
625+
const std::string& key,
626+
pybind11::object bytes_io,
627+
std::optional<std::string>& password) -> bool {
628+
return self.load_document_from_bytesio(key, bytes_io, password);
629+
},
630+
pybind11::arg("key"),
631+
pybind11::arg("bytes_io"),
632+
pybind11::arg("password") = pybind11::none(),
633+
R"(
634+
Load a document from a BytesIO-like object.
635+
636+
Parameters:
637+
key (str): The unique key to identify the document.
638+
bytes_io (Any): A BytesIO-like object containing the document data.
639+
password (str, optional): Optional password for password-protected files.
640+
641+
Returns:
642+
bool: True if the document was successfully loaded.)")
643+
644+
.def("has_tasks",
645+
[](docling::docling_threaded_parser& self) -> bool {
646+
return self.has_tasks();
647+
},
648+
R"(
649+
Check if there are remaining tasks to consume.
650+
651+
On first call, builds the task queue from all loaded documents and starts worker threads.
652+
653+
Returns:
654+
bool: True if there are remaining results to consume.)")
655+
656+
.def("get_task",
657+
[](docling::docling_threaded_parser& self) -> docling::page_decode_result {
658+
pybind11::gil_scoped_release release;
659+
return self.get_task();
660+
},
661+
R"(
662+
Get the next completed page decode result.
663+
664+
Blocks until a result is available. Releases the GIL while waiting.
665+
666+
Returns:
667+
PageDecodeResult: The result of a page decoding task.)");
536668
}

docling_parse/pdf_parser.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
from pydantic import BaseModel, ConfigDict
3030

3131
from docling_parse.pdf_parsers import DecodePageConfig # type: ignore[import]
32+
from docling_parse.pdf_parsers import PageDecodeResult # type: ignore[import]
3233
from docling_parse.pdf_parsers import pdf_parser # type: ignore[import]
34+
from docling_parse.pdf_parsers import threaded_pdf_parser # type: ignore[import]
3335
from docling_parse.pdf_parsers import ( # type: ignore[import]
3436
TIMING_KEY_CREATE_LINE_CELLS,
3537
TIMING_KEY_CREATE_WORD_CELLS,
@@ -854,3 +856,123 @@ def _load_document_from_bytesio(self, key: str, data: BytesIO) -> bool:
854856
bool: True if the document was successfully loaded, False otherwise.)")
855857
"""
856858
return self.parser.load_document_from_bytesio(key=key, bytes_io=data)
859+
860+
861+
class ThreadedPdfParserConfig(BaseModel):
862+
"""Configuration for the threaded PDF parser.
863+
864+
Attributes:
865+
loglevel: Logging level ('fatal', 'error', 'warning', 'info').
866+
threads: Number of worker threads for parallel page decoding.
867+
max_concurrent_results: Maximum results buffered before workers pause.
868+
"""
869+
870+
model_config = ConfigDict(arbitrary_types_allowed=True)
871+
872+
loglevel: str = "fatal"
873+
threads: int = 4
874+
max_concurrent_results: int = 32
875+
876+
877+
class DoclingThreadedPdfParser:
878+
"""Threaded PDF parser that decodes pages from multiple documents in parallel.
879+
880+
Usage::
881+
882+
parser_config = ThreadedPdfParserConfig(loglevel="fatal", threads=4, max_concurrent_results=32)
883+
decode_config = DecodePageConfig()
884+
885+
parser = DoclingThreadedPdfParser(parser_config=parser_config, decode_config=decode_config)
886+
887+
for source in sources:
888+
parser.load(source)
889+
890+
while parser.has_tasks():
891+
task = parser.get_task()
892+
893+
if task.success:
894+
page_decoder, timings = task.get()
895+
else:
896+
error_msg = task.error()
897+
"""
898+
899+
def __init__(
900+
self,
901+
parser_config: Optional[ThreadedPdfParserConfig] = None,
902+
decode_config: Optional[DecodePageConfig] = None,
903+
):
904+
if parser_config is None:
905+
parser_config = ThreadedPdfParserConfig()
906+
if decode_config is None:
907+
decode_config = DecodePageConfig()
908+
909+
self._parser = threaded_pdf_parser(
910+
loglevel=parser_config.loglevel,
911+
num_threads=parser_config.threads,
912+
max_concurrent_results=parser_config.max_concurrent_results,
913+
config=decode_config,
914+
)
915+
916+
def load(
917+
self,
918+
path_or_stream: Union[str, Path, BytesIO],
919+
password: Optional[str] = None,
920+
) -> str:
921+
"""Load a document for parallel processing.
922+
923+
Parameters:
924+
path_or_stream: File path or BytesIO object.
925+
password: Optional password for protected files.
926+
927+
Returns:
928+
str: The document key.
929+
"""
930+
if isinstance(path_or_stream, str):
931+
path_or_stream = Path(path_or_stream)
932+
933+
if isinstance(path_or_stream, Path):
934+
key = f"key={str(path_or_stream)}"
935+
success = self._parser.load_document(
936+
key=key, filename=str(path_or_stream).encode("utf8"), password=password
937+
)
938+
elif isinstance(path_or_stream, BytesIO):
939+
hasher = hashlib.sha256(usedforsecurity=False)
940+
while chunk := path_or_stream.read(8192):
941+
hasher.update(chunk)
942+
path_or_stream.seek(0)
943+
hash_val = hasher.hexdigest()
944+
945+
key = f"key={hash_val}"
946+
success = self._parser.load_document_from_bytesio(
947+
key=key, bytes_io=path_or_stream, password=password
948+
)
949+
else:
950+
raise TypeError(
951+
f"Expected str, Path, or BytesIO, got {type(path_or_stream)}"
952+
)
953+
954+
if not success:
955+
raise RuntimeError(f"Failed to load document with key {key}")
956+
957+
return key
958+
959+
def has_tasks(self) -> bool:
960+
"""Check if there are remaining tasks to consume.
961+
962+
On first call, builds the task queue and starts worker threads.
963+
964+
Returns:
965+
bool: True if there are remaining results to consume.
966+
"""
967+
return self._parser.has_tasks()
968+
969+
def get_task(self) -> "PageDecodeResult":
970+
"""Get the next completed page decode result.
971+
972+
Blocks until a result is available.
973+
974+
Returns:
975+
PageDecodeResult: The result with doc_key, page_number, success flag.
976+
Use task.get() to get (PdfPageDecoder, timings) or task.error() for error message.
977+
"""
978+
return self._parser.get_task()

0 commit comments

Comments
 (0)