Merged

Dev #4473

Changes from 2 commits
13 changes: 10 additions & 3 deletions docs/en/usage/cli_tools.md
@@ -77,7 +77,8 @@ Here are the environment variables and their descriptions:
- `MINERU_MODEL_SOURCE`:
* Used to specify model source
* supports `huggingface/modelscope/local`
* defaults to `huggingface`, can be switched to `modelscope` or local models through environment variables.
* Default is `huggingface`; you can switch via an environment variable to `modelscope` to use a domestic acceleration mirror, or switch to `local` to use a local model.


- `MINERU_TOOLS_CONFIG_JSON`:
* Used to specify configuration file path
@@ -101,8 +102,14 @@ Here are the environment variables and their descriptions:
* Default is `true`; can be set to `false` via an environment variable to disable table merging functionality.

- `MINERU_PDF_RENDER_TIMEOUT`:
* Used to set the timeout period (in seconds) for rendering PDF to images
* Default is `300` seconds, can be set to other values via environment variable to adjust the image rendering timeout.
* Used to set the timeout (in seconds) for rendering PDFs to images.
* Default is `300` seconds; you can set a different value via an environment variable to adjust the rendering timeout.
* Only effective on Linux and macOS systems.

- `MINERU_PDF_RENDER_THREADS`:
* Used to set the number of threads used when rendering PDFs to images.
* Default is `4`; you can set a different value via an environment variable to adjust the number of threads for image rendering.
* Only effective on Linux and macOS systems.
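A minimal sketch of configuring these variables in a shell session before running the CLI (the `mineru` invocation at the end is illustrative, not confirmed by this diff):

```shell
# Configure MinerU rendering via environment variables (Linux/macOS).
export MINERU_MODEL_SOURCE=modelscope    # use the ModelScope mirror
export MINERU_PDF_RENDER_TIMEOUT=600     # allow up to 600 s for rendering
export MINERU_PDF_RENDER_THREADS=8       # render with 8 workers
# mineru -p input.pdf -o output/         # hypothetical invocation
```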

- `MINERU_INTRA_OP_NUM_THREADS`:
* Used to set the intra_op thread count for ONNX models; affects the computation speed of individual operators
Expand Down
8 changes: 7 additions & 1 deletion docs/zh/usage/cli_tools.md
@@ -72,7 +72,7 @@ Some parameters of the MinerU command-line tool have equivalent environment-variable configurations:
- `MINERU_MODEL_SOURCE`:
* Used to specify the model source
* Supports `huggingface/modelscope/local`
* Default is `huggingface`; can be switched to `modelscope` via an environment variable, or to a local model
* Default is `huggingface`; can be switched via an environment variable to `modelscope` to use a domestic acceleration mirror, or to `local` to use a local model

- `MINERU_TOOLS_CONFIG_JSON`:
* Used to specify the configuration file path
@@ -98,6 +98,12 @@ Some parameters of the MinerU command-line tool have equivalent environment-variable configurations:
- `MINERU_PDF_RENDER_TIMEOUT`:
* Used to set the timeout (in seconds) for rendering PDFs to images
* Default is `300` seconds; can be set to other values via an environment variable to adjust the rendering timeout.
* Only effective on Linux and macOS systems.

- `MINERU_PDF_RENDER_THREADS`:
* Used to set the number of threads used when rendering PDFs to images
* Default is `4`; can be set to other values via an environment variable to adjust the number of rendering threads.
* Only effective on Linux and macOS systems.

- `MINERU_INTRA_OP_NUM_THREADS`:
* Used to set the intra_op thread count for ONNX models; affects the computation speed of individual operators
Expand Down
4 changes: 4 additions & 0 deletions mineru/utils/os_env_config.py
@@ -11,6 +11,10 @@ def get_load_images_timeout() -> int:
return get_value_from_string(env_value, 300)


def get_load_images_threads() -> int:
env_value = os.getenv('MINERU_PDF_RENDER_THREADS', None)
return get_value_from_string(env_value, 4)

def get_value_from_string(env_value: str, default_value: int) -> int:
if env_value is not None:
try:
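The tail of `get_value_from_string` is elided in this diff; under the assumption that it parses the string to `int` and falls back to the default on missing or malformed input, the pair of helpers behaves like this sketch:

```python
import os

def get_value_from_string(env_value, default_value: int) -> int:
    # Parse an env-var string to int, falling back to the default when the
    # variable is unset or malformed (mirrors the helper in os_env_config.py).
    if env_value is not None:
        try:
            return int(env_value)
        except (ValueError, TypeError):
            return default_value
    return default_value

def get_load_images_threads() -> int:
    # Worker count for PDF rendering, configurable via environment variable.
    env_value = os.getenv('MINERU_PDF_RENDER_THREADS', None)
    return get_value_from_string(env_value, 4)
```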
98 changes: 73 additions & 25 deletions mineru/utils/pdf_image_tools.py
@@ -1,5 +1,7 @@
# Copyright (c) Opendatalab. All rights reserved.
import os
import signal
import time
from io import BytesIO

import numpy as np
@@ -9,13 +11,13 @@

from mineru.data.data_reader_writer import FileBasedDataWriter
from mineru.utils.check_sys_env import is_windows_environment
from mineru.utils.os_env_config import get_load_images_timeout
from mineru.utils.os_env_config import get_load_images_timeout, get_load_images_threads
from mineru.utils.pdf_reader import image_to_b64str, image_to_bytes, page_to_image
from mineru.utils.enum_class import ImageType
from mineru.utils.hash_utils import str_sha256
from mineru.utils.pdf_page_id import get_end_page_id

from concurrent.futures import ProcessPoolExecutor, TimeoutError as FuturesTimeoutError
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED


def pdf_page_to_image(page: pdfium.PdfPage, dpi=200, image_type=ImageType.PIL) -> dict:
@@ -57,7 +59,7 @@ def load_images_from_pdf(
end_page_id=None,
image_type=ImageType.PIL,
timeout=None,
threads=4,
threads=None,
):
"""Converts PDF pages to images with timeout control and multi-process acceleration.

@@ -67,8 +69,8 @@
start_page_id (int, optional): Starting page index. Defaults to 0.
end_page_id (int | None, optional): Ending page index. Defaults to None.
image_type (ImageType, optional): Image type. Defaults to ImageType.PIL.
timeout (int | None, optional): Timeout in seconds. If None, read from the MINERU_PDF_LOAD_IMAGES_TIMEOUT environment variable; defaults to 300 seconds if unset.
threads (int): Number of processes. Defaults to 4.
timeout (int | None, optional): Timeout in seconds. If None, read from the MINERU_PDF_RENDER_TIMEOUT environment variable; defaults to 300 seconds if unset.
threads (int): Number of processes. If None, read from the MINERU_PDF_RENDER_THREADS environment variable; defaults to 4 if unset.

Raises:
TimeoutError: Raised when the conversion times out.
@@ -86,6 +88,9 @@
else:
if timeout is None:
timeout = get_load_images_timeout()
if threads is None:
threads = get_load_images_threads()

end_page_id = get_end_page_id(end_page_id, len(pdf_doc))

# Compute the total page count
@@ -108,11 +113,13 @@

page_ranges.append((range_start, range_end))

# logger.debug(f"PDF to images using {actual_threads} processes, page ranges: {page_ranges}")
logger.debug(f"PDF to images using {actual_threads} processes, page ranges: {page_ranges}")
Copilot AI (Feb 2, 2026):

The comment on line 116 was previously commented out (as indicated by the diff), but now it's active. Debugging log statements should generally be controlled through log levels rather than being commented/uncommented. Consider whether this debug log should remain active or if it was uncommented for debugging purposes and should be reverted.


with ProcessPoolExecutor(max_workers=actual_threads) as executor:
executor = ProcessPoolExecutor(max_workers=actual_threads)
try:
# Submit all tasks
futures = []
future_to_range = {}
for range_start, range_end in page_ranges:
future = executor.submit(
_load_images_from_pdf_worker,
@@ -122,27 +129,68 @@
range_end,
image_type,
)
futures.append((range_start, future))

try:
# Collect results and sort by page index
all_results = []
for range_start, future in futures:
images_list = future.result(timeout=timeout)
all_results.append((range_start, images_list))

# Sort by starting page index and merge results
all_results.sort(key=lambda x: x[0])
images_list = []
for _, imgs in all_results:
images_list.extend(imgs)

return images_list, pdf_doc
except FuturesTimeoutError:
futures.append(future)
future_to_range[future] = range_start

# Use wait() with a single global timeout
done, not_done = wait(futures, timeout=timeout, return_when=ALL_COMPLETED)

# Check for unfinished tasks (the timeout case)
if not_done:
# Timeout: forcibly terminate all child processes
_terminate_executor_processes(executor)
pdf_doc.close()
executor.shutdown(wait=False, cancel_futures=True)
raise TimeoutError(f"PDF to images conversion timeout after {timeout}s")

# All tasks finished; collect the results
all_results = []
for future in futures:
range_start = future_to_range[future]
# No timeout needed here, since the task has completed
images_list = future.result()
Comment on lines +149 to +150

Copilot AI (Feb 2, 2026):

When collecting results from completed futures at lines 147-151, exceptions raised by individual futures are not caught. If a future.result() call raises an exception (e.g., from a failed worker process), the exception will propagate and be caught by the generic except block at line 161, but pdf_doc will be closed and the specific error information from the failed worker will be re-raised. Consider explicitly handling exceptions from future.result() to provide better error context about which page range failed.

Suggested change:

# No timeout needed here, since the task has completed
images_list = future.result()

# No timeout needed here since the task has completed, but catch and
# annotate exceptions with the failing page range
try:
    images_list = future.result()
except Exception as e:
    # Give the caller clearer error context: which page range failed
    raise RuntimeError(
        f"Error processing PDF pages starting at {range_start}"
    ) from e

all_results.append((range_start, images_list))

# Sort by starting page index and merge results
all_results.sort(key=lambda x: x[0])
images_list = []
for _, imgs in all_results:
images_list.extend(imgs)

return images_list, pdf_doc

except Exception as e:
# On any exception, make sure child processes are cleaned up
_terminate_executor_processes(executor)
Comment on lines +141 to +163

Copilot AI (Feb 2, 2026):

The _terminate_executor_processes function is called both at line 141 (for timeout) and at line 163 (for any exception). If a timeout occurs (raising TimeoutError at line 143), the exception handler at line 161 will catch it and call _terminate_executor_processes again at line 163. This means processes could be terminated twice in the timeout scenario. While this shouldn't cause errors due to exception handling in the function, it's inefficient. Consider restructuring to avoid the duplicate call.

pdf_doc.close()
if isinstance(e, TimeoutError):
raise
Comment on lines +165 to +166

Copilot AI (Feb 2, 2026):

The exception handling logic has a subtle issue. At line 165-167, if the exception is a TimeoutError, it's re-raised, but this will only catch the TimeoutError raised at line 143. Any other exception will also be re-raised at line 167. The redundant check at line 165-166 doesn't add value since line 167 will re-raise all exceptions anyway. Consider simplifying by removing lines 165-166.

Suggested change (remove these lines):

if isinstance(e, TimeoutError):
    raise

raise
finally:
executor.shutdown(wait=False, cancel_futures=True)


def _terminate_executor_processes(executor):
"""Forcibly terminate all child processes of a ProcessPoolExecutor."""
if hasattr(executor, '_processes'):
for pid, process in executor._processes.items():
if process.is_alive():
try:
# Send SIGTERM first to allow a graceful exit
os.kill(pid, signal.SIGTERM)
except (ProcessLookupError, OSError):
pass

# Give child processes a moment to respond to SIGTERM
time.sleep(0.1)

# Send SIGKILL to forcibly terminate any processes still alive
for pid, process in executor._processes.items():
if process.is_alive():
try:
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, OSError):
pass
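The SIGTERM-then-SIGKILL escalation above can be exercised on its own. This sketch (POSIX-only, matching the Linux/macOS restriction in the docs; the function name is illustrative) applies the same two-step shutdown to a single pid:

```python
import os
import signal
import time

def terminate_gracefully(pid: int, grace: float = 0.1) -> None:
    # Ask politely first: SIGTERM lets the process run cleanup handlers.
    try:
        os.kill(pid, signal.SIGTERM)
    except (ProcessLookupError, OSError):
        return  # already gone
    # Give it a moment to respond, then force-kill whatever survived.
    time.sleep(grace)
    try:
        os.kill(pid, signal.SIGKILL)
    except (ProcessLookupError, OSError):
        pass
```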


def load_images_from_pdf_core(
pdf_bytes: bytes,
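The single-global-timeout pattern introduced in this PR — one `wait()` over all futures instead of per-future `result(timeout=...)` calls whose deadlines would otherwise stack — can be sketched in isolation (worker and task names here are illustrative):

```python
import time
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED

def _slow(n):
    # Stand-in for a per-range rendering worker.
    time.sleep(n)
    return n

def run_with_global_timeout(tasks, timeout):
    # wait() blocks once with a single deadline for the whole batch; if any
    # future is still pending when it expires, the batch is treated as failed.
    executor = ProcessPoolExecutor(max_workers=2)
    try:
        futures = [executor.submit(_slow, t) for t in tasks]
        done, not_done = wait(futures, timeout=timeout, return_when=ALL_COMPLETED)
        if not_done:
            raise TimeoutError(f"conversion timeout after {timeout}s")
        return [f.result() for f in futures]  # order follows submission order
    finally:
        executor.shutdown(wait=False, cancel_futures=True)
```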