Commit aed772a

cau-git and Ubuntu authored
feat: Threaded PDF pipeline (#1951)
* Initial async pdf pipeline
* UpstreamAwareQueue
* Refactoring into async pipeline primitives and graph
* Cleanups and safety improvements
* Better threaded PDF pipeline
* Pin docling-ibm-models
* Remove unused args
* Add test
* Revise pipeline
* Unload doc backend
* Revert "Unload doc backend" (reverts commit 01066f0)
* Remove redundant method
* Update threaded test
* Stop accumulating docs in test run
* Fix: don't starve on docs with > max_queue_size pages
* Fix: don't starve on docs with > max_queue_size pages
* DCO Remediation Commit: Signed-off-by added for commits fa71cde and d66da87
* Fix: python3.9 compat
* Option to enable threadpool with doc_batch_concurrency setting
* Clean up unused code
* Fix settings defaults expectations
* Use released docling-ibm-models
* Remove ignores for typing/linting

Signed-off-by: Christoph Auer <[email protected]>
Signed-off-by: Ubuntu <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
1 parent aec29a7 commit aed772a

File tree: 9 files changed, +917 −100 lines

docling/datamodel/pipeline_options.py

Lines changed: 15 additions & 0 deletions
```diff
@@ -332,3 +332,18 @@ class ProcessingPipeline(str, Enum):
     STANDARD = "standard"
     VLM = "vlm"
     ASR = "asr"
+
+
+class ThreadedPdfPipelineOptions(PdfPipelineOptions):
+    """Pipeline options for the threaded PDF pipeline with batching and backpressure control"""
+
+    # Batch sizes for different stages
+    ocr_batch_size: int = 4
+    layout_batch_size: int = 4
+    table_batch_size: int = 4
+
+    # Timing control
+    batch_timeout_seconds: float = 2.0
+
+    # Backpressure and queue control
+    queue_max_size: int = 100
```
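For illustration, a minimal sketch of how these options might be wired up, assuming the threaded pipeline class added elsewhere in this PR is importable as `ThreadedStandardPdfPipeline` and using the existing `PdfFormatOption` plumbing (the input file name is hypothetical):

```python
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline

# Batch sizes trade per-page latency for model throughput; queue_max_size
# bounds how many pages may pile up between stages (backpressure).
opts = ThreadedPdfPipelineOptions(
    layout_batch_size=8,
    table_batch_size=4,
    batch_timeout_seconds=1.0,  # flush a partially filled batch after this long
    queue_max_size=100,
)

converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_cls=ThreadedStandardPdfPipeline,
            pipeline_options=opts,
        )
    }
)
result = converter.convert("example.pdf")  # hypothetical input file
```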

docling/datamodel/settings.py

Lines changed: 7 additions & 12 deletions
```diff
@@ -26,18 +26,13 @@ class DocumentLimits(BaseModel):
 
 
 class BatchConcurrencySettings(BaseModel):
-    doc_batch_size: int = 2
-    doc_batch_concurrency: int = 2
-    page_batch_size: int = 4
-    page_batch_concurrency: int = 2
-    elements_batch_size: int = 16
-
-    # doc_batch_size: int = 1
-    # doc_batch_concurrency: int = 1
-    # page_batch_size: int = 1
-    # page_batch_concurrency: int = 1
-
-    # model_concurrency: int = 2
+    doc_batch_size: int = 1  # Number of documents processed in one batch. Should be >= doc_batch_concurrency
+    doc_batch_concurrency: int = 1  # Number of parallel threads processing documents. Warning: Experimental! No benefit expected without free-threaded python.
+    page_batch_size: int = 4  # Number of pages processed in one batch.
+    page_batch_concurrency: int = 1  # Currently unused.
+    elements_batch_size: int = (
+        16  # Number of elements processed in one batch, in enrichment models.
+    )
 
     # To force models into single core: export OMP_NUM_THREADS=1
```
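A short sketch of opting in to the experimental thread pool at runtime; per the guard added in document_converter.py below, both values must be greater than 1:

```python
from docling.datamodel.settings import settings

# Both must be > 1, otherwise _convert falls back to the sequential path.
# Experimental: little benefit expected without free-threaded Python.
settings.perf.doc_batch_size = 4
settings.perf.doc_batch_concurrency = 2
```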

docling/document_converter.py

Lines changed: 27 additions & 17 deletions
```diff
@@ -4,6 +4,7 @@
 import threading
 import time
 from collections.abc import Iterable, Iterator
+from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Type, Union
@@ -284,24 +285,33 @@ def _convert(
             settings.perf.doc_batch_size,  # pass format_options
         ):
             _log.info("Going to convert document batch...")
+            process_func = partial(
+                self._process_document, raises_on_error=raises_on_error
+            )
 
-            # parallel processing only within input_batch
-            # with ThreadPoolExecutor(
-            #     max_workers=settings.perf.doc_batch_concurrency
-            # ) as pool:
-            #     yield from pool.map(self.process_document, input_batch)
-            # Note: PDF backends are not thread-safe, thread pool usage was disabled.
-
-            for item in map(
-                partial(self._process_document, raises_on_error=raises_on_error),
-                input_batch,
+            if (
+                settings.perf.doc_batch_concurrency > 1
+                and settings.perf.doc_batch_size > 1
             ):
-                elapsed = time.monotonic() - start_time
-                start_time = time.monotonic()
-                _log.info(
-                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
-                )
-                yield item
+                with ThreadPoolExecutor(
+                    max_workers=settings.perf.doc_batch_concurrency
+                ) as pool:
+                    for item in pool.map(
+                        process_func,
+                        input_batch,
+                    ):
+                        yield item
+            else:
+                for item in map(
+                    process_func,
+                    input_batch,
+                ):
+                    elapsed = time.monotonic() - start_time
+                    start_time = time.monotonic()
+                    _log.info(
+                        f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
+                    )
+                    yield item
 
     def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
         """Retrieve or initialize a pipeline, reusing instances based on class and options."""
@@ -330,7 +340,7 @@ def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
                     f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}"
                 )
 
-                return self.initialized_pipelines[cache_key]
+            return self.initialized_pipelines[cache_key]
 
     def _process_document(
         self, in_doc: InputDocument, raises_on_error: bool
```
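Since `ThreadPoolExecutor.map` yields results in input order, callers observe the same document ordering on both paths; only the per-document timing log is limited to the sequential branch. A minimal usage sketch (file names are hypothetical):

```python
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter

settings.perf.doc_batch_size = 4
settings.perf.doc_batch_concurrency = 4  # enables the ThreadPoolExecutor path

converter = DocumentConverter()
for res in converter.convert_all(["report_a.pdf", "report_b.pdf"]):
    print(res.input.file.name, res.status)
```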

docling/models/layout_model.py

Lines changed: 84 additions & 66 deletions
```diff
@@ -3,7 +3,7 @@
 import warnings
 from collections.abc import Iterable
 from pathlib import Path
-from typing import Optional
+from typing import List, Optional, Union
 
 import numpy as np
 from docling_core.types.doc import DocItemLabel
@@ -148,72 +148,90 @@ def draw_clusters_and_cells_side_by_side(
     def __call__(
         self, conv_res: ConversionResult, page_batch: Iterable[Page]
     ) -> Iterable[Page]:
-        for page in page_batch:
+        # Convert to list to allow multiple iterations
+        pages = list(page_batch)
+
+        # Separate valid and invalid pages
+        valid_pages = []
+        valid_page_images: List[Union[Image.Image, np.ndarray]] = []
+
+        for page in pages:
             assert page._backend is not None
             if not page._backend.is_valid():
-                yield page
-            else:
-                with TimeRecorder(conv_res, "layout"):
-                    assert page.size is not None
-                    page_image = page.get_image(scale=1.0)
-                    assert page_image is not None
-
-                    clusters = []
-                    for ix, pred_item in enumerate(
-                        self.layout_predictor.predict(page_image)
-                    ):
-                        label = DocItemLabel(
-                            pred_item["label"]
-                            .lower()
-                            .replace(" ", "_")
-                            .replace("-", "_")
-                        )  # Temporary, until docling-ibm-model uses docling-core types
-                        cluster = Cluster(
-                            id=ix,
-                            label=label,
-                            confidence=pred_item["confidence"],
-                            bbox=BoundingBox.model_validate(pred_item),
-                            cells=[],
-                        )
-                        clusters.append(cluster)
-
-                    if settings.debug.visualize_raw_layout:
-                        self.draw_clusters_and_cells_side_by_side(
-                            conv_res, page, clusters, mode_prefix="raw"
-                        )
-
-                    # Apply postprocessing
-
-                    processed_clusters, processed_cells = LayoutPostprocessor(
-                        page, clusters, self.options
-                    ).postprocess()
-                    # Note: LayoutPostprocessor updates page.cells and page.parsed_page internally
-
-                    with warnings.catch_warnings():
-                        warnings.filterwarnings(
-                            "ignore",
-                            "Mean of empty slice|invalid value encountered in scalar divide",
-                            RuntimeWarning,
-                            "numpy",
-                        )
-
-                        conv_res.confidence.pages[page.page_no].layout_score = float(
-                            np.mean([c.confidence for c in processed_clusters])
-                        )
-
-                        conv_res.confidence.pages[page.page_no].ocr_score = float(
-                            np.mean(
-                                [c.confidence for c in processed_cells if c.from_ocr]
-                            )
-                        )
-
-                    page.predictions.layout = LayoutPrediction(
-                        clusters=processed_clusters
-                    )
-
-                    if settings.debug.visualize_layout:
-                        self.draw_clusters_and_cells_side_by_side(
-                            conv_res, page, processed_clusters, mode_prefix="postprocessed"
-                        )
+                continue
 
+            assert page.size is not None
+            page_image = page.get_image(scale=1.0)
+            assert page_image is not None
+
+            valid_pages.append(page)
+            valid_page_images.append(page_image)
+
+        # Process all valid pages with batch prediction
+        batch_predictions = []
+        if valid_page_images:
+            with TimeRecorder(conv_res, "layout"):
+                batch_predictions = self.layout_predictor.predict_batch(  # type: ignore[attr-defined]
+                    valid_page_images
+                )
+
+        # Process each page with its predictions
+        valid_page_idx = 0
+        for page in pages:
+            assert page._backend is not None
+            if not page._backend.is_valid():
                 yield page
+                continue
+
+            page_predictions = batch_predictions[valid_page_idx]
+            valid_page_idx += 1
+
+            clusters = []
+            for ix, pred_item in enumerate(page_predictions):
+                label = DocItemLabel(
+                    pred_item["label"].lower().replace(" ", "_").replace("-", "_")
+                )  # Temporary, until docling-ibm-model uses docling-core types
+                cluster = Cluster(
+                    id=ix,
+                    label=label,
+                    confidence=pred_item["confidence"],
+                    bbox=BoundingBox.model_validate(pred_item),
+                    cells=[],
+                )
+                clusters.append(cluster)
+
+            if settings.debug.visualize_raw_layout:
+                self.draw_clusters_and_cells_side_by_side(
+                    conv_res, page, clusters, mode_prefix="raw"
+                )
+
+            # Apply postprocessing
+            processed_clusters, processed_cells = LayoutPostprocessor(
+                page, clusters, self.options
+            ).postprocess()
+            # Note: LayoutPostprocessor updates page.cells and page.parsed_page internally
+
+            with warnings.catch_warnings():
+                warnings.filterwarnings(
+                    "ignore",
+                    "Mean of empty slice|invalid value encountered in scalar divide",
+                    RuntimeWarning,
+                    "numpy",
+                )
+
+                conv_res.confidence.pages[page.page_no].layout_score = float(
+                    np.mean([c.confidence for c in processed_clusters])
+                )
+
+                conv_res.confidence.pages[page.page_no].ocr_score = float(
+                    np.mean([c.confidence for c in processed_cells if c.from_ocr])
+                )
+
+            page.predictions.layout = LayoutPrediction(clusters=processed_clusters)
+
+            if settings.debug.visualize_layout:
+                self.draw_clusters_and_cells_side_by_side(
+                    conv_res, page, processed_clusters, mode_prefix="postprocessed"
+                )
+
+            yield page
```
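The refactor above follows a split/batch/re-merge pattern: invalid pages pass through untouched, valid pages go through a single `predict_batch` call, and predictions are re-aligned to the original page order with a running index. A standalone sketch of the idiom (names are illustrative, not docling API):

```python
from typing import Callable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")
P = TypeVar("P")

def batch_and_merge(
    items: List[T],
    is_valid: Callable[[T], bool],
    predict_batch: Callable[[List[T]], List[P]],
) -> Iterator[Tuple[T, Optional[P]]]:
    # One batched model call over the valid items only.
    valid = [it for it in items if is_valid(it)]
    preds = predict_batch(valid) if valid else []

    # Re-merge in original order; invalid items pass through with no result.
    idx = 0
    for it in items:
        if not is_valid(it):
            yield it, None
            continue
        yield it, preds[idx]
        idx += 1
```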
