Skip to content

Commit 214d639

Browse files
authored
feat(mongodb): add lazy loading support for MongoDB document loader (#334)
- Add lazy_load() method for memory-efficient document streaming
- Add alazy_load() async method for async document streaming
- Refactor document processing into _process_document() helper method
- Add comprehensive tests for lazy loading functionality
1 parent 5e56619 commit 214d639

File tree

2 files changed

+110
-22
lines changed

2 files changed

+110
-22
lines changed

libs/community/langchain_community/document_loaders/mongodb.py

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import logging
3-
from typing import Dict, List, Optional, Sequence
3+
from typing import AsyncIterator, Dict, Iterator, List, Optional, Sequence
44

55
from langchain_core.documents import Document
66

@@ -85,6 +85,47 @@ def load(self) -> List[Document]:
8585
"""
8686
return asyncio.run(self.aload())
8787

88+
def lazy_load(self) -> Iterator[Document]:
    """Lazily yield MongoDB documents from synchronous code.

    Drives ``alazy_load()`` one item at a time on a private event loop, so
    documents are produced as the caller consumes them.

    Attention:
        This method only works in a synchronous context. When an event loop
        is already running in the current thread, it raises ``RuntimeError``;
        use ``alazy_load()`` there instead.

    Yields:
        Document: A document from the MongoDB collection.

    Raises:
        RuntimeError: If called while an event loop is running in the
            current thread.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so we can drive our own.
        pass
    else:
        # Fail before creating the async generator so that no un-awaited
        # coroutine is left behind.
        raise RuntimeError(
            "lazy_load() cannot be called while an event loop is running; "
            "use alazy_load() instead."
        )

    async_generator = self.alazy_load()
    # A private loop, like the one asyncio.run() uses in load(). It is not
    # installed as the thread's current loop and is always closed below.
    event_loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                document = event_loop.run_until_complete(
                    async_generator.__anext__()
                )
            except StopAsyncIteration:
                break
            yield document
    finally:
        # Also reached when the consumer stops early (GeneratorExit), so the
        # async generator is finalized on the loop that was driving it.
        try:
            aclose = getattr(async_generator, "aclose", None)
            if aclose is not None:
                event_loop.run_until_complete(aclose())
            event_loop.run_until_complete(event_loop.shutdown_asyncgens())
        finally:
            event_loop.close()
117+
118+
async def alazy_load(self) -> AsyncIterator[Document]:
    """Asynchronously stream the collection as Document objects.

    Builds the projection, queries the collection with the loader's filter
    criteria, and converts each returned record with ``_process_document``
    as it arrives.

    Yields:
        Document: A document from the MongoDB collection.
    """
    fields_to_fetch = self._construct_projection()
    cursor = self.collection.find(self.filter_criteria, fields_to_fetch)

    async for raw_doc in cursor:
        yield self._process_document(raw_doc)
128+
88129
async def aload(self) -> List[Document]:
89130
"""Asynchronously loads data into Document objects."""
90131
result = []
@@ -93,26 +134,7 @@ async def aload(self) -> List[Document]:
93134
projection = self._construct_projection()
94135

95136
async for doc in self.collection.find(self.filter_criteria, projection):
96-
metadata = self._extract_fields(doc, self.metadata_names, default="")
97-
98-
# Optionally add database and collection names to metadata
99-
if self.include_db_collection_in_metadata:
100-
metadata.update(
101-
{
102-
"database": self.db_name,
103-
"collection": self.collection_name,
104-
}
105-
)
106-
107-
# Extract text content from filtered fields or use the entire document
108-
if self.field_names is not None:
109-
fields = self._extract_fields(doc, self.field_names, default="")
110-
texts = [str(value) for value in fields.values()]
111-
text = " ".join(texts)
112-
else:
113-
text = str(doc)
114-
115-
result.append(Document(page_content=text, metadata=metadata))
137+
result.append(self._process_document(doc))
116138

117139
if len(result) != total_docs:
118140
logger.warning(
@@ -122,6 +144,33 @@ async def aload(self) -> List[Document]:
122144

123145
return result
124146

147+
def _process_document(self, doc: Dict) -> Document:
    """Convert one raw MongoDB record into a Document.

    Args:
        doc: The MongoDB document dictionary to convert.

    Returns:
        A Document whose metadata comes from ``metadata_names`` (plus the
        database and collection names when enabled) and whose page content
        is the space-joined values of ``field_names``, or ``str(doc)`` when
        ``field_names`` is None.
    """
    # NOTE(review): assumes _extract_fields returns a plain dict - confirm.
    doc_metadata = self._extract_fields(doc, self.metadata_names, default="")

    # Optionally record where the document came from.
    if self.include_db_collection_in_metadata:
        doc_metadata.update(
            database=self.db_name,
            collection=self.collection_name,
        )

    # With no field selection, the whole record becomes the page content.
    if self.field_names is None:
        page_text = str(doc)
    else:
        selected = self._extract_fields(doc, self.field_names, default="")
        page_text = " ".join(str(value) for value in selected.values())

    return Document(page_content=page_text, metadata=doc_metadata)
173+
125174
def _construct_projection(self) -> Optional[Dict]:
126175
"""Constructs the projection dictionary for MongoDB query based
127176
on the specified field names and metadata names."""

libs/community/tests/unit_tests/document_loaders/test_mongodb.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, List
1+
from typing import AsyncIterator, Dict, List
22
from unittest.mock import AsyncMock, MagicMock, patch
33

44
import pytest
@@ -70,3 +70,42 @@ async def test_load_mocked_with_filters(expected_documents: List[Document]) -> N
7070
documents = await loader.aload()
7171

7272
assert documents == expected_documents
73+
74+
75+
@pytest.mark.requires("motor")
def test_lazy_load_mocked_with_filters(expected_documents: List[Document]) -> None:
    """Check that ``lazy_load`` synchronously drains ``alazy_load``.

    ``alazy_load`` is patched out, so this covers only the sync-to-async
    bridge in ``lazy_load``. The MongoDB query and the document conversion
    are not exercised, which is why no ``collection.find`` mock is set up.
    """
    filter_criteria = {"address.room": {"$eq": "2"}}
    field_names = ["address.building", "address.room"]
    metadata_names = ["_id"]
    include_db_collection_in_metadata = True

    async def mock_async_generator() -> AsyncIterator[Document]:
        for doc in expected_documents:
            yield doc

    with (
        patch("motor.motor_asyncio.AsyncIOMotorClient", return_value=MagicMock()),
        patch(
            "langchain_community.document_loaders.mongodb.MongodbLoader.alazy_load",
            return_value=mock_async_generator(),
        ),
    ):
        loader = MongodbLoader(
            "mongodb://localhost:27017",
            "test_db",
            "test_collection",
            filter_criteria=filter_criteria,
            field_names=field_names,
            metadata_names=metadata_names,
            include_db_collection_in_metadata=include_db_collection_in_metadata,
        )
        documents = list(loader.lazy_load())

    assert documents == expected_documents

0 commit comments

Comments
 (0)