Skip to content

Commit bad2937

Browse files
OscarPindaroAmnah199davidsbatista
authored
feat: add run async filter and automerging retriever (#9897)
* added run_async to filter retriever. implementation identical to run * created a test for the FilterRetriever, with the same logic as the original sync test * AutoMergingRetriever now has a run_async method identical to run, but with an async call on the document_store * added async test, which contains async versions of the original run tests. discovered incorrect async await call in my previous commit * modified documentation to specify that the method is asynchronous * added patch file * fixing typing error for filter_documents_async * updating release notes * fixing unit test --------- Co-authored-by: Amna Mubashar <[email protected]> Co-authored-by: David S. Batista <[email protected]>
1 parent dfd678e commit bad2937

File tree

6 files changed

+393
-1
lines changed

6 files changed

+393
-1
lines changed

haystack/components/retrievers/auto_merging_retriever.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,62 @@ def _try_merge_level(docs_to_merge: list[Document], docs_to_return: list[Documen
167167
return _try_merge_level(merged_docs, docs_to_return)
168168

169169
return {"documents": _try_merge_level(documents, [])}
170+
171+
@component.output_types(documents=list[Document])
async def run_async(self, documents: list[Document]):
    """
    Asynchronously run the AutoMergingRetriever.

    Documents are grouped by their `__parent_id`. A group is replaced by its parent document when the share of
    the parent's children present in the group exceeds `self.threshold`; otherwise the children are kept as they
    are. The parents produced by one pass are processed the same way in the next pass, until a pass produces no
    merge.

    :param documents: List of leaf documents that were matched by a retriever
    :returns:
        A dictionary with the key `documents` holding a list of documents
        (could be a mix of different hierarchy levels)
    :raises ValueError:
        If the input is rejected by `_check_valid_documents`, if looking up a parent id in the document store
        does not yield exactly one document, or if the parent found has no `__children_ids`.
    """
    AutoMergingRetriever._check_valid_documents(documents)

    async def _fetch_parent(parent_id: str) -> Document:
        """Load the single parent document with the given id from the document store."""
        id_filter = {"field": "id", "operator": "==", "value": parent_id}
        # 'ignore' since filter_documents_async is not defined in the Protocol but exists in the implementations
        matches = await self.document_store.filter_documents_async(id_filter)  # type: ignore[attr-defined]
        if len(matches) != 1:
            raise ValueError(f"Expected 1 parent document with id {parent_id}, found {len(matches)}")

        parent = matches[0]
        if not parent.meta.get("__children_ids"):
            raise ValueError(f"Parent document with id {parent_id} does not have any children.")
        return parent

    current_level: list[Document] = documents
    kept: list[Document] = []  # documents that will be returned without (further) merging

    while True:
        # Bucket the documents of this level by parent; documents without a parent cannot be merged
        children_by_parent: dict[str, list[Document]] = defaultdict(list)
        for doc in current_level:
            parent_id = doc.meta.get("__parent_id")
            if parent_id:
                children_by_parent[parent_id].append(doc)
            else:
                kept.append(doc)

        promoted: list[Document] = []
        for parent_id, children in children_by_parent.items():
            parent = await _fetch_parent(parent_id)

            # Share of the parent's children that are present at this level
            coverage = len(children) / len(parent.meta["__children_ids"])
            if coverage > self.threshold:
                promoted.append(parent)  # the parent stands in for its children
            else:
                kept.extend(children)  # not enough coverage, keep the children separate

        # A pass without merges means the hierarchy cannot be climbed any further
        if not promoted:
            return {"documents": kept}

        # Otherwise try to merge the freshly promoted parents one level up
        current_level = promoted

haystack/components/retrievers/filter_retriever.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,18 @@ def run(self, filters: Optional[dict[str, Any]] = None):
9292
A list of retrieved documents.
9393
"""
9494
return {"documents": self.document_store.filter_documents(filters=filters or self.filters)}
95+
96+
@component.output_types(documents=list[Document])
async def run_async(self, filters: Optional[dict[str, Any]] = None):
    """
    Asynchronously run the FilterRetriever on the given input data.

    :param filters:
        A dictionary with filters to narrow down the search space.
        If not specified, the FilterRetriever uses the values provided at initialization.
    :returns:
        A dictionary with the key `documents` holding the list of retrieved documents.
    """
    # A falsy `filters` (None or an empty dict) falls back to the filters stored on the instance
    active_filters = filters or self.filters
    # 'ignore' since filter_documents_async is not defined in the Protocol but exists in the implementations
    return {"documents": await self.document_store.filter_documents_async(filters=active_filters)}  # type: ignore[attr-defined]
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
enhancements:
2+
- |
3+
The `FilterRetriever` and `AutoMergingRetriever` components now support asynchronous execution.

test/components/generators/chat/test_openai_responses.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,8 @@ def warm_up(self):
780780
component.warm_up()
781781
assert len(warm_up_calls) == call_count
782782

783-
def test_run(self, openai_mock_responses):
783+
def test_run(self, openai_mock_responses, monkeypatch):
784+
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
784785
chat_messages = [ChatMessage.from_user("What's the capital of France")]
785786
component = OpenAIResponsesChatGenerator(
786787
model="gpt-4", generation_kwargs={"include": ["message.output_text.logprobs"]}
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
import pytest
6+
7+
from haystack import Document, Pipeline
8+
from haystack.components.preprocessors import HierarchicalDocumentSplitter
9+
from haystack.components.retrievers import InMemoryBM25Retriever
10+
from haystack.components.retrievers.auto_merging_retriever import AutoMergingRetriever
11+
from haystack.document_stores.in_memory import InMemoryDocumentStore
12+
13+
14+
class TestAutoMergingRetrieverAsync:
    """Tests for `AutoMergingRetriever.run_async`, using an `InMemoryDocumentStore` as the document store."""

    @pytest.mark.asyncio
    async def test_run_missing_parent_id(self):
        """A document without `__parent_id` makes `run_async` raise a ValueError."""
        leaf_docs = [Document(content="test", meta={"__level": 1, "__block_size": 10})]
        retriever = AutoMergingRetriever(InMemoryDocumentStore())
        expected_error = "The matched leaf documents do not have the required meta field '__parent_id'"
        with pytest.raises(ValueError, match=expected_error):
            await retriever.run_async(documents=leaf_docs)

    @pytest.mark.asyncio
    async def test_run_missing_level(self):
        """A document without `__level` makes `run_async` raise a ValueError."""
        leaf_docs = [Document(content="test", meta={"__parent_id": "parent1", "__block_size": 10})]
        retriever = AutoMergingRetriever(InMemoryDocumentStore())
        expected_error = "The matched leaf documents do not have the required meta field '__level'"
        with pytest.raises(ValueError, match=expected_error):
            await retriever.run_async(documents=leaf_docs)

    @pytest.mark.asyncio
    async def test_run_missing_block_size(self):
        """A document without `__block_size` makes `run_async` raise a ValueError."""
        leaf_docs = [Document(content="test", meta={"__parent_id": "parent1", "__level": 1})]
        retriever = AutoMergingRetriever(InMemoryDocumentStore())
        expected_error = "The matched leaf documents do not have the required meta field '__block_size'"
        with pytest.raises(ValueError, match=expected_error):
            await retriever.run_async(documents=leaf_docs)

    @pytest.mark.asyncio
    async def test_run_mixed_valid_and_invalid_documents(self):
        """One invalid document among valid ones is enough for `run_async` to raise a ValueError."""
        valid_doc = Document(content="valid", meta={"__parent_id": "parent1", "__level": 1, "__block_size": 10})
        invalid_doc = Document(content="invalid", meta={"__level": 1, "__block_size": 10})
        retriever = AutoMergingRetriever(InMemoryDocumentStore())
        expected_error = "The matched leaf documents do not have the required meta field '__parent_id'"
        with pytest.raises(ValueError, match=expected_error):
            await retriever.run_async(documents=[valid_doc, invalid_doc])

    @pytest.mark.asyncio
    async def test_run_parent_not_found(self):
        """A leaf whose parent id is not in the document store makes `run_async` raise a ValueError."""
        retriever = AutoMergingRetriever(InMemoryDocumentStore(), threshold=0.5)

        # the leaf points to a parent that was never written to the store
        orphan_meta = {"__parent_id": "non_existent_parent", "__level": 1, "__block_size": 10}
        orphan_leaf = Document(content="test", meta=orphan_meta)

        with pytest.raises(ValueError, match="Expected 1 parent document with id non_existent_parent, found 0"):
            await retriever.run_async([orphan_leaf])

    @pytest.mark.asyncio
    async def test_run_parent_without_children_metadata(self):
        """Test case where a parent document exists but doesn't have the __children_ids metadata field"""
        # the stored parent carries the other meta fields, but no __children_ids
        childless_parent = Document(content="parent content", id="parent1", meta={"__level": 1, "__block_size": 10})
        parent_store = InMemoryDocumentStore()
        parent_store.write_documents([childless_parent])

        retriever = AutoMergingRetriever(parent_store, threshold=0.5)

        # a leaf document that points to this parent
        leaf_meta = {"__parent_id": "parent1", "__level": 2, "__block_size": 5}
        leaf = Document(content="leaf content", meta=leaf_meta)

        with pytest.raises(ValueError, match="Parent document with id parent1 does not have any children"):
            await retriever.run_async([leaf])

    @pytest.mark.asyncio
    async def test_run_empty_documents(self):
        """An empty input list yields an empty `documents` list."""
        retriever = AutoMergingRetriever(InMemoryDocumentStore())
        assert await retriever.run_async([]) == {"documents": []}

    @pytest.mark.asyncio
    async def test_run_return_parent_document(self):
        """Two of a parent's three children are retrieved, so the parent document is returned instead."""
        text = "The sun rose early in the morning. It cast a warm glow over the trees. Birds began to sing."

        splitter = HierarchicalDocumentSplitter(block_sizes={10, 3}, split_overlap=0, split_by="word")
        hierarchy = splitter.run([Document(content=text)])["documents"]

        # only non-leaf documents go into the store the retriever looks parents up in
        parent_store = InMemoryDocumentStore()
        parent_store.write_documents([doc for doc in hierarchy if doc.meta["__children_ids"]])
        retriever = AutoMergingRetriever(parent_store, threshold=0.5)

        # 2 leaf docs of the same parent are passed in; that parent has 3 children, so the
        # score is 2/3 (about 0.67), which exceeds threshold=0.5 and the parent is returned
        leaves = [doc for doc in hierarchy if not doc.meta["__children_ids"]]
        result = await retriever.run_async(leaves[4:6])

        returned = result["documents"]
        assert len(returned) == 1
        assert returned[0].content == "warm glow over the trees. Birds began to sing."
        assert len(returned[0].meta["__children_ids"]) == 3

    @pytest.mark.asyncio
    async def test_run_return_leafs_document(self):
        """A single retrieved leaf is returned as is when threshold=0.6."""
        source = Document(content="The monarch of the wild blue yonder rises from the eastern side of the horizon.")
        splitter = HierarchicalDocumentSplitter(block_sizes={10, 3}, split_overlap=0, split_by="word")
        hierarchy = splitter.run([source])["documents"]

        # only the level-1 documents are available as parents
        parent_store = InMemoryDocumentStore()
        parent_store.write_documents([doc for doc in hierarchy if doc.meta["__level"] == 1])

        leaves = [doc for doc in hierarchy if not doc.meta["__children_ids"]]
        retriever = AutoMergingRetriever(parent_store, threshold=0.6)
        result = await retriever.run_async([leaves[4]])

        returned = result["documents"]
        assert len(returned) == 1
        assert returned[0].content == "eastern side of "
        assert returned[0].meta["__parent_id"] == hierarchy[2].id

    @pytest.mark.asyncio
    async def test_run_return_leafs_document_different_parents(self):
        """Two retrieved leaves with different parents are both returned unmerged when threshold=0.6."""
        source = Document(content="The monarch of the wild blue yonder rises from the eastern side of the horizon.")
        splitter = HierarchicalDocumentSplitter(block_sizes={10, 3}, split_overlap=0, split_by="word")
        hierarchy = splitter.run([source])["documents"]

        # only the level-1 documents are available as parents
        parent_store = InMemoryDocumentStore()
        parent_store.write_documents([doc for doc in hierarchy if doc.meta["__level"] == 1])

        leaves = [doc for doc in hierarchy if not doc.meta["__children_ids"]]
        retriever = AutoMergingRetriever(parent_store, threshold=0.6)
        result = await retriever.run_async([leaves[4], leaves[3]])

        returned = result["documents"]
        assert len(returned) == 2
        assert returned[0].meta["__parent_id"] != returned[1].meta["__parent_id"]

    @pytest.mark.asyncio
    async def test_run_go_up_hierarchy_multiple_levels(self):
        """
        Test if the retriever can go up the hierarchy multiple levels to find the parent document.

        Simulate a scenario where we have 4 leaf-documents that matched some initial query. The leaf-documents
        are continuously merged up the hierarchy until the threshold is no longer met.
        In this case it goes from the 4th level in the hierarchy up to the 1st level.
        """
        text = "The sun rose early in the morning. It cast a warm glow over the trees. Birds began to sing."

        splitter = HierarchicalDocumentSplitter(block_sizes={6, 4, 2, 1}, split_overlap=0, split_by="word")
        hierarchy = splitter.run([Document(content=text)])["documents"]

        # only non-leaf documents go into the store the retriever looks parents up in
        parent_store = InMemoryDocumentStore()
        parent_store.write_documents([doc for doc in hierarchy if doc.meta["__children_ids"]])
        retriever = AutoMergingRetriever(parent_store, threshold=0.4)

        # the 4 leaf-documents that are assumed to have matched some initial query
        matched_contents = {"The ", "sun ", "rose ", "early "}
        matched_leaves = [doc for doc in hierarchy if doc.content in matched_contents]

        result = await retriever.run_async(matched_leaves)

        returned = result["documents"]
        assert len(returned) == 1
        assert returned[0].content == "The sun rose early in the "

    @pytest.mark.asyncio
    async def test_run_go_up_hierarchy_multiple_levels_hit_root_document(self):
        """
        Test case where we go up hierarchy until the root document, so the root document is returned.

        It's the only document in the hierarchy which has no parent.
        """
        text = "The sun rose early in the morning. It cast a warm glow over the trees. Birds began to sing."

        splitter = HierarchicalDocumentSplitter(block_sizes={6, 4}, split_overlap=0, split_by="word")
        hierarchy = splitter.run([Document(content=text)])["documents"]

        # only non-leaf documents go into the store the retriever looks parents up in
        parent_store = InMemoryDocumentStore()
        parent_store.write_documents([doc for doc in hierarchy if doc.meta["__children_ids"]])
        retriever = AutoMergingRetriever(parent_store, threshold=0.1)  # set a low threshold to hit root document

        # the leaf-documents that are assumed to have matched some initial query
        matched_contents = {"The sun rose early ", "in the ", "morning. It cast a ", "over the trees. Birds "}
        matched_leaves = [doc for doc in hierarchy if doc.content in matched_contents]

        result = await retriever.run_async(matched_leaves)

        returned = result["documents"]
        assert len(returned) == 1
        assert returned[0].meta["__level"] == 0  # hit root document

0 commit comments

Comments
 (0)