Skip to content

Commit e371b6f

Browse files
Create a custom directory loader for RAGAS (#1122)
### Custom Document Loader: RAGASLoader

#### Overview
The `RAGASLoader` is a custom document loader class that extends the `UnstructuredBaseLoader` from the LangChain library. This loader is designed to handle various file types and load them in different modes.

#### Key Features
- **Flexible File Handling** - Can process a single file, a list of files, or a directory.
- **Support for Multiple Loading Modes**
  - **"elements"**: Each element of a file is returned as a separate `Document`. This provides the most granular level of document segmentation.
  - **"paged"**: Elements from the same page are combined into a single `Document`. This mode preserves the page-level structure of the original document.
  - **"single"**: All elements from a particular file are combined into a single `Document`.
- **Loading Capabilities**
  - **Synchronous Loading** - `lazy_load()`: For synchronous loading.
  - **Asynchronous Loading** - `lazy_aload()`: For asynchronous loading.
- **Automatic Encoding Detection** - Handles encoding issues seamlessly.
- **Metadata Extraction** - Extracts metadata for loaded documents, including the source file path and raw content.
- **Error Handling and Logging** - Manages various potential issues during loading with robust error handling and logging mechanisms.
- **Support for Multiple File Types** - Supports various file types, including `xml`, `md`, `txt`, `html`, `ppt`, `ppx`, and `pdf`.
- **Utilization of the unstructured Library** - Leverages the unstructured library for parsing various document types.
1 parent 95d8318 commit e371b6f

File tree

4 files changed

+348
-0
lines changed

4 files changed

+348
-0
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from ragas_experimental.testset.loaders.ragas_loader import RAGASLoader
2+
3+
__all__ = ["RAGASLoader"]
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
from typing import Iterator, List, Dict, Any, Union, AsyncIterator
2+
from pathlib import Path
3+
import os
4+
import subprocess
5+
import asyncio
6+
import aiofiles
7+
import logging
8+
from langchain_core.documents import Document
9+
from langchain_community.document_loaders.unstructured import UnstructuredBaseLoader
10+
from langchain_community.document_loaders.helpers import detect_file_encodings
11+
12+
# Setup logging
13+
logger = logging.getLogger(__name__)
14+
15+
class RAGASLoader(UnstructuredBaseLoader):
    """Document loader for files and directories, backed by ``unstructured``.

    Accepts a single path, a list of paths, or directories (walked
    recursively) and yields LangChain ``Document`` objects in one of three
    modes inherited from ``UnstructuredBaseLoader``:

    - ``"single"``: all elements of a file combined into one Document,
    - ``"elements"``: one Document per parsed element,
    - ``"paged"``: elements grouped into one Document per page.

    PDFs found while walking a directory are first converted to markdown
    with the external ``marker_single`` CLI tool.
    """

    # Extensions (without the dot) picked up when walking a directory.
    # NOTE(review): 'ppx' looks like a typo for 'pptx' — confirm with author.
    SUPPORTED_EXTENSIONS = ("xml", "md", "txt", "html", "ppt", "ppx", "pdf")

    def __init__(
        self,
        file_path: Union[str, Path, List[Union[str, Path]]],
        mode: str = "single",
        autodetect_encoding: bool = False,
        **unstructured_kwargs: Any,
    ):
        """Initialize the loader.

        Args:
            file_path: A path, or a list of paths, to files or directories.
            mode: ``"single"``, ``"elements"`` or ``"paged"``.
            autodetect_encoding: Retry reads with detected encodings when a
                file is not valid UTF-8.
            **unstructured_kwargs: Forwarded to
                ``unstructured.partition.auto.partition``.
        """
        super().__init__(mode=mode, **unstructured_kwargs)
        self.autodetect_encoding = autodetect_encoding
        # Normalize to a list of Path objects so downstream code only has to
        # deal with one shape.
        paths = file_path if isinstance(file_path, list) else [file_path]
        self.file_paths = [Path(p) for p in paths]

    def _get_elements(self, file_path: Path) -> List:
        """Partition *file_path* into unstructured elements.

        Returns an empty list (after logging) on any parsing failure so a
        single bad file does not abort a directory load.
        """
        # Imported lazily: `unstructured` is heavy and only needed here.
        from unstructured.partition.auto import partition
        try:
            return partition(filename=str(file_path), **self.unstructured_kwargs)
        except Exception as e:
            logger.error("Error in _get_elements for file %s: %s", file_path, e)
            return []

    def _get_metadata(self, file_path: Path) -> Dict:
        """Build Document metadata: source path plus the raw file content."""
        try:
            return {
                "source": str(file_path),
                "raw_content": self._read_file(file_path),
            }
        except Exception as e:
            logger.error("Error in _get_metadata for file %s: %s", file_path, e)
            return {"source": str(file_path), "raw_content": ""}

    async def _aget_metadata(self, file_path: Path) -> Dict:
        """Async variant of :meth:`_get_metadata`."""
        try:
            return {
                "source": str(file_path),
                "raw_content": await self._aread_file(file_path),
            }
        except Exception as e:
            logger.error("Error in _aget_metadata for file %s: %s", file_path, e)
            return {"source": str(file_path), "raw_content": ""}

    def _read_file(self, file_path: Path) -> str:
        """Read *file_path* as text, retrying with detected encodings when
        ``autodetect_encoding`` is enabled.

        Raises:
            RuntimeError: when the file cannot be decoded or read.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError as e:
            if self.autodetect_encoding:
                detected_encodings = detect_file_encodings(file_path)
                for encoding in detected_encodings:
                    try:
                        with file_path.open(encoding=encoding.encoding) as f:
                            return f.read()
                    except UnicodeDecodeError:
                        continue
                logger.error("Failed to decode %s with detected encodings.", file_path)
                raise RuntimeError(f"Failed to decode {file_path} with detected encodings.")
            else:
                logger.error("Error loading %s due to encoding issue.", file_path, exc_info=e)
                raise RuntimeError(f"Error loading {file_path} due to encoding issue.") from e
        except Exception as e:
            logger.error("Error loading %s due to an unexpected error: %s", file_path, e)
            raise RuntimeError(f"Error loading {file_path} due to an unexpected error: {e}") from e

    async def _aread_file(self, file_path: Path) -> str:
        """Async variant of :meth:`_read_file` (uses ``aiofiles``)."""
        try:
            async with aiofiles.open(file_path, mode='r', encoding='utf-8') as f:
                return await f.read()
        except UnicodeDecodeError as e:
            if self.autodetect_encoding:
                # detect_file_encodings is blocking; keep it off the event loop.
                detected_encodings = await asyncio.to_thread(detect_file_encodings, file_path)
                for encoding in detected_encodings:
                    try:
                        async with aiofiles.open(file_path, mode='r', encoding=encoding.encoding) as f:
                            return await f.read()
                    except UnicodeDecodeError:
                        continue
                logger.error("Failed to decode %s with detected encodings.", file_path)
                raise RuntimeError(f"Failed to decode {file_path} with detected encodings.")
            else:
                logger.error("Error loading %s due to encoding issue.", file_path, exc_info=e)
                raise RuntimeError(f"Error loading {file_path} due to encoding issue.") from e
        except Exception as e:
            logger.error("Error loading %s due to an unexpected error: %s", file_path, e)
            raise RuntimeError(f"Error loading {file_path} due to an unexpected error: {e}") from e

    def _convert_pdf_to_markdown(self, file_path: Path) -> Union[Path, None]:
        """Convert a PDF to markdown via the external ``marker_single`` CLI.

        Previously this logic was duplicated verbatim in ``_load_directory``
        and ``_aload_directory``.  Returns the expected output ``.md`` path
        on success, or ``None`` (after logging) when the tool fails or is
        not installed.
        """
        output_dir = Path("experimental_notebook/markdown")
        output_file = output_dir / file_path.stem / file_path.name.replace('.pdf', '.md')
        command = [
            "marker_single",
            str(file_path),
            str(output_dir),
            "--batch_multiplier", "2",
            "--max_pages", "10",
            "--langs", "English",
        ]
        try:
            subprocess.run(command, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            logger.error("An error occurred while processing %s:", file_path)
            logger.error(e.stderr)
            return None
        except FileNotFoundError:
            logger.error("The 'marker_single' command was not found. Make sure it's installed and in your PATH.")
            return None
        logger.info("Processed %s to %s", file_path, output_file)
        return output_file

    def _iter_directory_files(self, directory: Path) -> Iterator[Path]:
        """Walk *directory* and yield the file paths that should be loaded.

        PDFs are converted to markdown first; files whose conversion fails
        are skipped.  Matching is on the dotted suffix, so a file merely
        *named* e.g. "report-pdf" is no longer picked up (the previous
        ``endswith('pdf')`` check had that false positive).
        """
        suffixes = tuple(f".{ext}" for ext in self.SUPPORTED_EXTENSIONS)
        for root, _, files in os.walk(directory):
            for name in files:
                if not name.endswith(suffixes):
                    continue
                file_path = Path(root) / name
                if file_path.suffix == ".pdf":
                    converted = self._convert_pdf_to_markdown(file_path)
                    if converted is None:
                        continue
                    file_path = converted
                if file_path.is_file():
                    yield file_path

    def _load_directory(self, directory: Path) -> Iterator[Document]:
        """Synchronously load every supported file found under *directory*."""
        for file_path in self._iter_directory_files(directory):
            yield from self._load_file(file_path)

    async def _aload_directory(self, directory: Path) -> AsyncIterator[Document]:
        """Asynchronously load every supported file found under *directory*.

        NOTE(review): directory walking and PDF conversion still run
        synchronously, as in the original implementation; only file reads
        and partitioning are offloaded.
        """
        for file_path in self._iter_directory_files(directory):
            async for document in self._aload_file(file_path):
                yield document

    def lazy_load(self) -> Iterator[Document]:
        """Lazily yield Documents from all configured paths.

        Raises:
            RuntimeError: wrapping any error raised while loading.
        """
        try:
            for file_path in self.file_paths:
                if file_path.is_dir():
                    yield from self._load_directory(file_path)
                elif file_path.is_file():
                    yield from self._load_file(file_path)
                else:
                    logger.error("The path %s does not exist or is neither a directory nor a file.", file_path)
                    raise ValueError(f"The path {file_path} does not exist or is neither a directory nor a file.")
        except Exception as e:
            logger.error("Error loading file or directory: %s", e)
            # `from e` preserves the original traceback for debugging.
            raise RuntimeError(f"Error loading file or directory: {e}") from e

    async def lazy_aload(self) -> AsyncIterator[Document]:
        """Async counterpart of :meth:`lazy_load`."""
        try:
            for file_path in self.file_paths:
                if file_path.is_dir():
                    async for document in self._aload_directory(file_path):
                        yield document
                elif file_path.is_file():
                    async for document in self._aload_file(file_path):
                        yield document
                else:
                    logger.error("The path %s does not exist or is neither a directory nor a file.", file_path)
                    raise ValueError(f"The path {file_path} does not exist or is neither a directory nor a file.")
        except Exception as e:
            logger.error("Error loading file or directory: %s", e)
            raise RuntimeError(f"Error loading file or directory: {e}") from e

    def _documents_from_elements(self, elements: List, base_metadata: Dict) -> Iterator[Document]:
        """Turn parsed elements into Documents according to ``self.mode``.

        Shared by the sync and async loaders (previously duplicated).
        *base_metadata* is the per-file metadata, computed once per file —
        the old sync path re-read the whole raw file for every element.

        Raises:
            ValueError: when ``self.mode`` is not a supported mode.
        """
        if self.mode == "elements":
            for element in elements:
                metadata = base_metadata.copy()
                if hasattr(element, "metadata"):
                    metadata.update(element.metadata.to_dict())
                if hasattr(element, "category"):
                    metadata["category"] = element.category
                yield Document(page_content=str(element), metadata=metadata)
        elif self.mode == "paged":
            text_dict: Dict[int, str] = {}
            meta_dict: Dict[int, Dict] = {}
            for element in elements:
                metadata = base_metadata.copy()
                if hasattr(element, "metadata"):
                    metadata.update(element.metadata.to_dict())
                page_number = metadata.get("page_number", 1)
                if page_number not in text_dict:
                    # First element seen for this page.
                    text_dict[page_number] = str(element) + "\n\n"
                    meta_dict[page_number] = metadata
                else:
                    # Append text and merge metadata for subsequent elements.
                    text_dict[page_number] += str(element) + "\n\n"
                    meta_dict[page_number].update(metadata)
            for page_number, text in text_dict.items():
                yield Document(page_content=text, metadata=meta_dict[page_number])
        elif self.mode == "single":
            text = "\n\n".join(str(el) for el in elements)
            yield Document(page_content=text, metadata=base_metadata)
        else:
            logger.error("Mode %s not supported.", self.mode)
            raise ValueError(f"mode of {self.mode} not supported.")

    def _load_file(self, file_path: Path) -> Iterator[Document]:
        """Load a single file synchronously."""
        elements = self._get_elements(file_path)
        self._post_process_elements(elements)
        base_metadata = self._get_metadata(file_path)
        yield from self._documents_from_elements(elements, base_metadata)

    async def _aload_file(self, file_path: Path) -> AsyncIterator[Document]:
        """Load a single file asynchronously (partitioning runs in a thread)."""
        elements = await asyncio.to_thread(self._get_elements, file_path)
        self._post_process_elements(elements)
        base_metadata = await self._aget_metadata(file_path)
        for document in self._documents_from_elements(elements, base_metadata):
            yield document
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import logging
2+
import traceback
3+
import asyncio
4+
from ragas_loader import RAGASLoader
5+
6+
# Setup logging
7+
logger = logging.getLogger(__name__)
8+
9+
async def main():
    """Smoke-test RAGASLoader's async API: load ./path and log each document."""
    loader = RAGASLoader(file_path="./path", mode="single", autodetect_encoding=False)
    try:
        docs = [doc async for doc in loader.lazy_aload()]
        logger.info("Number of documents loaded: %d", len(docs))
        for i, doc in enumerate(docs):
            file_name = doc.metadata["source"].split("/")[-1]

            logger.info("\n%s", '=' * 80)
            logger.info("File %d: %s", i, file_name)
            logger.info("%s", '=' * 80)

            logger.info("\nParsed Content:")
            logger.info("%s", '-' * 40)
            logger.info("%s", doc.page_content)

            logger.info("\nRaw Content (first 1000 characters):")
            logger.info("%s", '-' * 40)
            # Fixed: slice was [:5000], contradicting the message above and
            # the sync test script, which both say/show 1000 characters.
            logger.info("%s", doc.metadata["raw_content"][:1000])

            logger.info("\n%s\n", '=' * 80)
    except Exception as e:
        logger.error("Error in main execution: %s", e)
        logger.error("%s", traceback.format_exc())
33+
34+
# Test Async Loading
if __name__ == "__main__":
    # Configure logging, otherwise the logger.info output above is silently
    # dropped (root logger defaults to WARNING with no handler configured).
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import logging
2+
import traceback
3+
from ragas_loader import RAGASLoader
4+
5+
# Setup logging
6+
logger = logging.getLogger(__name__)
7+
8+
def main():
    """Smoke-test RAGASLoader's sync API: load ./path and log each document."""
    heavy_rule = '=' * 80
    light_rule = '-' * 40
    loader = RAGASLoader(file_path="./path", mode="single", autodetect_encoding=False)
    try:
        docs = loader.load()
        logger.info("Number of documents loaded: %d", len(docs))
        for index, document in enumerate(docs):
            # Log just the basename of the source path as the file name.
            source_name = document.metadata["source"].split("/")[-1]

            logger.info("\n%s", heavy_rule)
            logger.info("File %d: %s", index, source_name)
            logger.info("%s", heavy_rule)

            logger.info("\nParsed Content (first 1000 characters):")
            logger.info("%s", light_rule)
            logger.info("%s", document.page_content[:1000])

            logger.info("\nRaw Content (first 1000 characters):")
            logger.info("%s", light_rule)
            logger.info("%s", document.metadata["raw_content"][:1000])

            logger.info("\n%s\n", heavy_rule)
    except Exception as error:
        logger.error("Error in main execution: %s", error)
        logger.error("%s", traceback.format_exc())
32+
33+
if __name__ == "__main__":
    # Configure logging, otherwise the logger.info output above is silently
    # dropped (root logger defaults to WARNING with no handler configured).
    logging.basicConfig(level=logging.INFO)
    main()

0 commit comments

Comments
 (0)