Skip to content

Commit 761f7f6

Browse files
authored
fix: optimize PDF parsing by implementing concurrent processing with … (#177)
* fix: optimize PDF parsing by implementing concurrent processing with ThreadPoolExecutor
* Refactor to async processing for file extraction: refactor the file processing to use asyncio for improved performance and concurrency.
1 parent 8113840 commit 761f7f6

File tree

1 file changed

+28
-23
lines changed

1 file changed

+28
-23
lines changed

runtime/ops/formatter/mineru_formatter/process.py

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
Description: MinerU PDF文本抽取
66
Create: 2025/10/29 17:24
77
"""
8+
import asyncio
89
import os
910
import shutil
1011
import time
1112
from typing import Dict, Any
1213

13-
from datamate.common.utils.rest_client import http_request
1414
from datamate.core.base_op import Mapper
1515
from loguru import logger
16-
from mineru.cli.common import do_parse, read_fn
16+
from mineru.cli.common import aio_do_parse, read_fn
1717
from mineru.cli.fast_api import get_infer_result
1818
from pypdf import PdfReader
1919

@@ -30,33 +30,38 @@ def __init__(self, *args, **kwargs):
3030
def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
3131
start = time.time()
3232
filename = sample[self.filename_key]
33-
filename_without_ext = os.path.splitext(filename)[0]
3433
if not filename.lower().endswith((".png", ".jpeg", ".jpg", ".webp", ".gif", ".pdf")):
3534
return sample
3635
try:
37-
filepath = sample[self.filepath_key]
38-
parse_dir = os.path.join(self.output_dir, filename_without_ext, "vlm")
39-
pdf_bytes = read_fn(filepath)
40-
total_page = len(PdfReader(filepath).pages)
41-
content = ""
42-
for page in range(0, total_page, 10):
43-
do_parse(
44-
output_dir=self.output_dir,
45-
pdf_file_names=[filename_without_ext],
46-
pdf_bytes_list=[pdf_bytes],
47-
p_lang_list=["ch"],
48-
backend=self.backend,
49-
server_url=self.server_url,
50-
start_page_id=page,
51-
end_page_id=min(page + 9, total_page - 1),
52-
)
53-
if os.path.exists(parse_dir):
54-
content += get_infer_result(".md", filename_without_ext, parse_dir)
55-
shutil.rmtree(parse_dir)
56-
sample[self.text_key] = content
36+
sample[self.text_key] = asyncio.run(self.async_process_file(sample))
5737
logger.info(
5838
f"fileName: {filename}, method: MineruFormatter costs {(time.time() - start):6f} s")
5939
except Exception as e:
6040
logger.exception(f"fileName: {filename}, method: MineruFormatter causes error: {e}")
6141
raise
6242
return sample
43+
44+
async def async_process_file(self, sample):
    """Extract Markdown text from one sample's file with MinerU, ten pages per request.

    The file named by ``sample[self.filepath_key]`` is loaded with ``read_fn``
    and parsed in consecutive 10-page batches through ``aio_do_parse``. After
    each batch the Markdown written under ``<output_dir>/<name>/vlm`` is read
    back, appended to the result, and the directory is deleted so the next
    batch starts from a clean location.

    Args:
        sample: Mapping that provides ``self.filename_key`` (file name, used
            to derive the output name) and ``self.filepath_key`` (path of the
            file to parse).

    Returns:
        The concatenated Markdown of all batches, in page order. An empty
        string when the document has no pages or no batch produced output.

    Raises:
        Whatever ``read_fn``, ``PdfReader`` or ``aio_do_parse`` raise; errors
        are not caught here so that ``execute`` can log and re-raise them.
    """
    import io  # function-scope import keeps this method self-contained

    filename = sample[self.filename_key]
    filename_without_ext = os.path.splitext(filename)[0]
    filepath = sample[self.filepath_key]
    parse_dir = os.path.join(self.output_dir, filename_without_ext, "vlm")
    pdf_bytes = read_fn(filepath)

    # Count pages from the bytes that are actually handed to MinerU rather
    # than from the file on disk: execute() also admits image files, and
    # PdfReader cannot open a raw image.
    # NOTE(review): relies on read_fn returning PDF bytes for image inputs
    # (MinerU's CLI helper converts images to a PDF) -- confirm.
    total_page = len(PdfReader(io.BytesIO(pdf_bytes)).pages)

    pages_per_batch = 10
    chunks = []
    # Batches are awaited one after another on purpose: every batch writes to
    # the same parse_dir, so running them concurrently would mix their output.
    for page in range(0, total_page, pages_per_batch):
        logger.info(f"fileName: {filename}, total_page: {total_page}, page: {page}.")
        await aio_do_parse(
            output_dir=self.output_dir,
            pdf_file_names=[filename_without_ext],
            pdf_bytes_list=[pdf_bytes],
            p_lang_list=["ch"],
            backend=self.backend,
            server_url=self.server_url,
            start_page_id=page,
            # end_page_id is inclusive, clamped to the last page index.
            end_page_id=min(page + pages_per_batch - 1, total_page - 1),
        )
        if not os.path.exists(parse_dir):
            continue
        try:
            batch_text = get_infer_result(".md", filename_without_ext, parse_dir)
        finally:
            # Always clear the batch output, even if reading it failed, so a
            # later batch or run never picks up stale results.
            shutil.rmtree(parse_dir)
        # NOTE(review): get_infer_result appears to return None when the
        # Markdown file is missing; skip instead of failing on str + None.
        if batch_text:
            chunks.append(batch_text)
    return "".join(chunks)

0 commit comments

Comments
 (0)