Skip to content

Commit 268d027

Browse files
dolfim-ibmcau-git
andauthored
feat: Use threading in the standard pipeline and move old behavior to legacy (#2452)
* rename standard to legacy Signed-off-by: Michele Dolfi <[email protected]> * remove old standard pipeline Signed-off-by: Michele Dolfi <[email protected]> * move threaded to standard Signed-off-by: Michele Dolfi <[email protected]> * add backwards compatible threaded pipeline Signed-off-by: Michele Dolfi <[email protected]> * Updates for threaded pipeline to lower memory requirements Signed-off-by: Christoph Auer <[email protected]> * updating deps seem to remove the corrupted double-linked list error Signed-off-by: Michele Dolfi <[email protected]> * update pinning Signed-off-by: Michele Dolfi <[email protected]> * use main lock Signed-off-by: Michele Dolfi <[email protected]> * add more threadsafe blocks Signed-off-by: Michele Dolfi <[email protected]> * rename batch_timeout_seconds Signed-off-by: Michele Dolfi <[email protected]> --------- Signed-off-by: Michele Dolfi <[email protected]> Signed-off-by: Christoph Auer <[email protected]> Co-authored-by: Christoph Auer <[email protected]>
1 parent 01577e9 commit 268d027

File tree

7 files changed

+851
-752
lines changed

7 files changed

+851
-752
lines changed

docling/datamodel/pipeline_options.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -361,23 +361,26 @@ class PdfPipelineOptions(PaginatedPipelineOptions):
361361

362362
generate_parsed_pages: bool = False
363363

364-
365-
class ProcessingPipeline(str, Enum):
366-
STANDARD = "standard"
367-
VLM = "vlm"
368-
ASR = "asr"
369-
370-
371-
class ThreadedPdfPipelineOptions(PdfPipelineOptions):
372-
"""Pipeline options for the threaded PDF pipeline with batching and backpressure control"""
364+
### Arguments for threaded PDF pipeline with batching and backpressure control
373365

374366
# Batch sizes for different stages
375367
ocr_batch_size: int = 4
376368
layout_batch_size: int = 4
377369
table_batch_size: int = 4
378370

379371
# Timing control
380-
batch_timeout_seconds: float = 2.0
372+
batch_polling_interval_seconds: float = 0.5
381373

382374
# Backpressure and queue control
383375
queue_max_size: int = 100
376+
377+
378+
class ProcessingPipeline(str, Enum):
379+
LEGACY = "legacy"
380+
STANDARD = "standard"
381+
VLM = "vlm"
382+
ASR = "asr"
383+
384+
385+
class ThreadedPdfPipelineOptions(PdfPipelineOptions):
386+
"""Pipeline options for the threaded PDF pipeline with batching and backpressure control"""

docling/models/layout_model.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ def __call__(
167167
valid_pages.append(page)
168168
valid_page_images.append(page_image)
169169

170+
print(f"{len(pages)=}, {pages[0].page_no}-{pages[-1].page_no}")
171+
print(f"{len(valid_pages)=}")
172+
print(f"{len(valid_page_images)=}")
173+
170174
# Process all valid pages with batch prediction
171175
batch_predictions = []
172176
if valid_page_images:
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
import logging
2+
import warnings
3+
from pathlib import Path
4+
from typing import Optional, cast
5+
6+
import numpy as np
7+
from docling_core.types.doc import DocItem, ImageRef, PictureItem, TableItem
8+
9+
from docling.backend.abstract_backend import AbstractDocumentBackend
10+
from docling.backend.pdf_backend import PdfDocumentBackend
11+
from docling.datamodel.base_models import AssembledUnit, Page
12+
from docling.datamodel.document import ConversionResult
13+
from docling.datamodel.layout_model_specs import LayoutModelConfig
14+
from docling.datamodel.pipeline_options import PdfPipelineOptions
15+
from docling.datamodel.settings import settings
16+
from docling.models.base_ocr_model import BaseOcrModel
17+
from docling.models.code_formula_model import CodeFormulaModel, CodeFormulaModelOptions
18+
from docling.models.factories import get_ocr_factory
19+
from docling.models.layout_model import LayoutModel
20+
from docling.models.page_assemble_model import PageAssembleModel, PageAssembleOptions
21+
from docling.models.page_preprocessing_model import (
22+
PagePreprocessingModel,
23+
PagePreprocessingOptions,
24+
)
25+
from docling.models.readingorder_model import ReadingOrderModel, ReadingOrderOptions
26+
from docling.models.table_structure_model import TableStructureModel
27+
from docling.pipeline.base_pipeline import PaginatedPipeline
28+
from docling.utils.model_downloader import download_models
29+
from docling.utils.profiling import ProfilingScope, TimeRecorder
30+
31+
_log = logging.getLogger(__name__)
32+
33+
34+
class LegacyStandardPdfPipeline(PaginatedPipeline):
35+
def __init__(self, pipeline_options: PdfPipelineOptions):
36+
super().__init__(pipeline_options)
37+
self.pipeline_options: PdfPipelineOptions
38+
39+
with warnings.catch_warnings(): # deprecated generate_table_images
40+
warnings.filterwarnings("ignore", category=DeprecationWarning)
41+
self.keep_images = (
42+
self.pipeline_options.generate_page_images
43+
or self.pipeline_options.generate_picture_images
44+
or self.pipeline_options.generate_table_images
45+
)
46+
47+
self.reading_order_model = ReadingOrderModel(options=ReadingOrderOptions())
48+
49+
ocr_model = self.get_ocr_model(artifacts_path=self.artifacts_path)
50+
51+
self.build_pipe = [
52+
# Pre-processing
53+
PagePreprocessingModel(
54+
options=PagePreprocessingOptions(
55+
images_scale=pipeline_options.images_scale,
56+
)
57+
),
58+
# OCR
59+
ocr_model,
60+
# Layout model
61+
LayoutModel(
62+
artifacts_path=self.artifacts_path,
63+
accelerator_options=pipeline_options.accelerator_options,
64+
options=pipeline_options.layout_options,
65+
),
66+
# Table structure model
67+
TableStructureModel(
68+
enabled=pipeline_options.do_table_structure,
69+
artifacts_path=self.artifacts_path,
70+
options=pipeline_options.table_structure_options,
71+
accelerator_options=pipeline_options.accelerator_options,
72+
),
73+
# Page assemble
74+
PageAssembleModel(options=PageAssembleOptions()),
75+
]
76+
77+
self.enrichment_pipe = [
78+
# Code Formula Enrichment Model
79+
CodeFormulaModel(
80+
enabled=pipeline_options.do_code_enrichment
81+
or pipeline_options.do_formula_enrichment,
82+
artifacts_path=self.artifacts_path,
83+
options=CodeFormulaModelOptions(
84+
do_code_enrichment=pipeline_options.do_code_enrichment,
85+
do_formula_enrichment=pipeline_options.do_formula_enrichment,
86+
),
87+
accelerator_options=pipeline_options.accelerator_options,
88+
),
89+
*self.enrichment_pipe,
90+
]
91+
92+
if (
93+
self.pipeline_options.do_formula_enrichment
94+
or self.pipeline_options.do_code_enrichment
95+
or self.pipeline_options.do_picture_classification
96+
or self.pipeline_options.do_picture_description
97+
):
98+
self.keep_backend = True
99+
100+
@staticmethod
101+
def download_models_hf(
102+
local_dir: Optional[Path] = None, force: bool = False
103+
) -> Path:
104+
warnings.warn(
105+
"The usage of LegacyStandardPdfPipeline.download_models_hf() is deprecated "
106+
"use instead the utility `docling-tools models download`, or "
107+
"the upstream method docling.utils.models_downloader.download_all()",
108+
DeprecationWarning,
109+
stacklevel=3,
110+
)
111+
112+
output_dir = download_models(output_dir=local_dir, force=force, progress=False)
113+
return output_dir
114+
115+
def get_ocr_model(self, artifacts_path: Optional[Path] = None) -> BaseOcrModel:
116+
factory = get_ocr_factory(
117+
allow_external_plugins=self.pipeline_options.allow_external_plugins
118+
)
119+
return factory.create_instance(
120+
options=self.pipeline_options.ocr_options,
121+
enabled=self.pipeline_options.do_ocr,
122+
artifacts_path=artifacts_path,
123+
accelerator_options=self.pipeline_options.accelerator_options,
124+
)
125+
126+
def initialize_page(self, conv_res: ConversionResult, page: Page) -> Page:
127+
with TimeRecorder(conv_res, "page_init"):
128+
page._backend = conv_res.input._backend.load_page(page.page_no) # type: ignore
129+
if page._backend is not None and page._backend.is_valid():
130+
page.size = page._backend.get_size()
131+
132+
return page
133+
134+
def _assemble_document(self, conv_res: ConversionResult) -> ConversionResult:
135+
all_elements = []
136+
all_headers = []
137+
all_body = []
138+
139+
with TimeRecorder(conv_res, "doc_assemble", scope=ProfilingScope.DOCUMENT):
140+
for p in conv_res.pages:
141+
if p.assembled is not None:
142+
for el in p.assembled.body:
143+
all_body.append(el)
144+
for el in p.assembled.headers:
145+
all_headers.append(el)
146+
for el in p.assembled.elements:
147+
all_elements.append(el)
148+
149+
conv_res.assembled = AssembledUnit(
150+
elements=all_elements, headers=all_headers, body=all_body
151+
)
152+
153+
conv_res.document = self.reading_order_model(conv_res)
154+
155+
# Generate page images in the output
156+
if self.pipeline_options.generate_page_images:
157+
for page in conv_res.pages:
158+
assert page.image is not None
159+
page_no = page.page_no + 1
160+
conv_res.document.pages[page_no].image = ImageRef.from_pil(
161+
page.image, dpi=int(72 * self.pipeline_options.images_scale)
162+
)
163+
164+
# Generate images of the requested element types
165+
with warnings.catch_warnings(): # deprecated generate_table_images
166+
warnings.filterwarnings("ignore", category=DeprecationWarning)
167+
if (
168+
self.pipeline_options.generate_picture_images
169+
or self.pipeline_options.generate_table_images
170+
):
171+
scale = self.pipeline_options.images_scale
172+
for element, _level in conv_res.document.iterate_items():
173+
if not isinstance(element, DocItem) or len(element.prov) == 0:
174+
continue
175+
if (
176+
isinstance(element, PictureItem)
177+
and self.pipeline_options.generate_picture_images
178+
) or (
179+
isinstance(element, TableItem)
180+
and self.pipeline_options.generate_table_images
181+
):
182+
page_ix = element.prov[0].page_no - 1
183+
page = next(
184+
(p for p in conv_res.pages if p.page_no == page_ix),
185+
cast("Page", None),
186+
)
187+
assert page is not None
188+
assert page.size is not None
189+
assert page.image is not None
190+
191+
crop_bbox = (
192+
element.prov[0]
193+
.bbox.scaled(scale=scale)
194+
.to_top_left_origin(
195+
page_height=page.size.height * scale
196+
)
197+
)
198+
199+
cropped_im = page.image.crop(crop_bbox.as_tuple())
200+
element.image = ImageRef.from_pil(
201+
cropped_im, dpi=int(72 * scale)
202+
)
203+
204+
# Aggregate confidence values for document:
205+
if len(conv_res.pages) > 0:
206+
with warnings.catch_warnings():
207+
warnings.filterwarnings(
208+
"ignore",
209+
category=RuntimeWarning,
210+
message="Mean of empty slice|All-NaN slice encountered",
211+
)
212+
conv_res.confidence.layout_score = float(
213+
np.nanmean(
214+
[c.layout_score for c in conv_res.confidence.pages.values()]
215+
)
216+
)
217+
conv_res.confidence.parse_score = float(
218+
np.nanquantile(
219+
[c.parse_score for c in conv_res.confidence.pages.values()],
220+
q=0.1, # parse score should relate to worst 10% of pages.
221+
)
222+
)
223+
conv_res.confidence.table_score = float(
224+
np.nanmean(
225+
[c.table_score for c in conv_res.confidence.pages.values()]
226+
)
227+
)
228+
conv_res.confidence.ocr_score = float(
229+
np.nanmean(
230+
[c.ocr_score for c in conv_res.confidence.pages.values()]
231+
)
232+
)
233+
234+
return conv_res
235+
236+
@classmethod
237+
def get_default_options(cls) -> PdfPipelineOptions:
238+
return PdfPipelineOptions()
239+
240+
@classmethod
241+
def is_backend_supported(cls, backend: AbstractDocumentBackend):
242+
return isinstance(backend, PdfDocumentBackend)

0 commit comments

Comments
 (0)