Skip to content

Commit d96e7b2

Browse files
committed
feat(04-02): convert EmlLoader, DocxLoader, and DocLoader to non-blocking I/O
- EmlLoader: Wrap file reads, temp file ops, and cleanup with asyncio.to_thread - Created _read_file_bytes and _write_temp_file sync helpers - Wrap os.path.exists and os.unlink in finally block - Replace manual file write with await self.save_content() - DocxLoader: Wrap MarkItDown converter and zipfile extraction with asyncio.to_thread - await asyncio.to_thread for converter.convert and get_images_from_zip - Updated save_content call to use await - DocLoader: Wrap Spire.Doc operations in thread pool with proper cleanup - Created _convert_doc_to_docx sync helper - Wrap conversion and os.remove with asyncio.to_thread - Proper try/finally for temp file cleanup
1 parent 7820478 commit d96e7b2

File tree

3 files changed

+41
-23
lines changed

3 files changed

+41
-23
lines changed
Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import os
23
import tempfile
34

@@ -14,16 +15,22 @@ def __init__(self, **kwargs) -> None:
1415
super().__init__(**kwargs)
1516
self.MDLoader = DocxLoader(**kwargs)
1617

17-
async def aload_document(self, file_path, metadata, save_markdown=False):
18-
"""Here we convert the document to docx format, save it in local and then use the MarkItDownLoader
19-
to convert it to markdown."""
18+
def _convert_doc_to_docx(self, file_path):
19+
"""Convert .doc to .docx using Spire.Doc (blocking operations in thread pool)."""
2020
document = Document()
2121
document.LoadFromFile(str(file_path))
22-
# file_path = "converted/sample2.docx"
2322
with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as temp_file:
24-
file_path = temp_file.name
25-
document.SaveToFile(file_path, FileFormat.Docx2016)
26-
result_string = await self.MDLoader.aload_document(file_path, metadata, save_markdown)
27-
os.remove(file_path)
28-
document.Close()
23+
temp_path = temp_file.name
24+
document.SaveToFile(temp_path, FileFormat.Docx2016)
25+
return document, temp_path
26+
27+
async def aload_document(self, file_path, metadata, save_markdown=False):
28+
"""Here we convert the document to docx format, save it in local and then use the MarkItDownLoader
29+
to convert it to markdown."""
30+
document, temp_path = await asyncio.to_thread(self._convert_doc_to_docx, file_path)
31+
try:
32+
result_string = await self.MDLoader.aload_document(temp_path, metadata, save_markdown)
33+
finally:
34+
await asyncio.to_thread(os.remove, temp_path)
35+
document.Close()
2936
return result_string

openrag/components/indexer/loaders/docx.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import re
23
import zipfile
34
from io import BytesIO
@@ -28,11 +29,12 @@ def __init__(self, **kwargs):
2829
self.converter = MarkItDown()
2930

3031
async def aload_document(self, file_path, metadata, save_markdown=False):
31-
result = self.converter.convert(file_path).text_content
32+
convert_result = await asyncio.to_thread(self.converter.convert, file_path)
33+
result = convert_result.text_content
3234

3335
if self.image_captioning:
3436
# Handle embedded images (extracted from docx zip)
35-
images = self.get_images_from_zip(file_path)
37+
images = await asyncio.to_thread(self.get_images_from_zip, file_path)
3638
captions = await self.caption_images(images, desc="Captioning embedded images")
3739
for caption in captions:
3840
result = re.sub(
@@ -54,7 +56,7 @@ async def aload_document(self, file_path, metadata, save_markdown=False):
5456

5557
doc = Document(page_content=result, metadata=metadata)
5658
if save_markdown:
57-
self.save_content(result, str(file_path))
59+
await self.save_content(result, str(file_path))
5860
return doc
5961

6062
def get_images_from_zip(self, input_file):

openrag/components/indexer/loaders/eml_loader.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import datetime
23
import email
34
import email.errors
@@ -28,10 +29,20 @@ def __init__(self, **kwargs):
2829
# Get available loaders for processing attachments
2930
self.loader_classes = get_loader_classes(config=self.config)
3031

32+
def _read_file_bytes(self, file_path):
33+
"""Read file bytes (blocking operation for thread pool)."""
34+
with open(file_path, "rb") as fhdl:
35+
return fhdl.read()
36+
37+
def _write_temp_file(self, suffix, data):
38+
"""Write temp file (blocking operation for thread pool)."""
39+
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
40+
temp_file.write(data)
41+
return temp_file.name
42+
3143
async def aload_document(self, file_path, metadata: dict | None = None, save_markdown: bool = False):
3244
try:
33-
with open(file_path, "rb") as fhdl:
34-
raw_email = fhdl.read()
45+
raw_email = await asyncio.to_thread(self._read_file_bytes, file_path)
3546

3647
# Parse email using standard email library
3748
email_msg = email.message_from_bytes(raw_email)
@@ -131,9 +142,9 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
131142

132143
if loader_cls:
133144
# Save attachment to temporary file
134-
with tempfile.NamedTemporaryFile(suffix=file_ext, delete=False) as temp_file:
135-
temp_file.write(attachment["raw"])
136-
temp_file_path = temp_file.name
145+
temp_file_path = await asyncio.to_thread(
146+
self._write_temp_file, file_ext, attachment["raw"]
147+
)
137148

138149
try:
139150
# Use appropriate loader to process attachment
@@ -236,8 +247,8 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
236247
attachments_text += f"Text fallback failed: {str(text_e)[:100]}...\n"
237248
finally:
238249
# Clean up temporary file
239-
if os.path.exists(temp_file_path):
240-
os.unlink(temp_file_path)
250+
if await asyncio.to_thread(os.path.exists, temp_file_path):
251+
await asyncio.to_thread(os.unlink, temp_file_path)
241252

242253
# Special handling for images with captioning if no specific loader or captioning is enabled
243254
elif file_ext in [
@@ -321,10 +332,8 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
321332

322333
# Save content body to a file if requested
323334
if save_markdown:
324-
markdown_path = Path(file_path).with_suffix(".md")
325-
with open(markdown_path, "w", encoding="utf-8") as md_file:
326-
md_file.write(content_body)
327-
metadata["markdown_path"] = str(markdown_path)
335+
await self.save_content(content_body, str(file_path))
336+
metadata["markdown_path"] = str(Path(file_path).with_suffix(".md"))
328337
except OSError as e:
329338
# File I/O error reading email file
330339
raise ValueError(f"Cannot read email file: {e}")

0 commit comments

Comments
 (0)