Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions haystack/components/retrievers/auto_merging_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,61 @@ def _try_merge_level(docs_to_merge: list[Document], docs_to_return: list[Documen
return _try_merge_level(merged_docs, docs_to_return)

return {"documents": _try_merge_level(documents, [])}

@component.output_types(documents=list[Document])
async def run_async(self, documents: list[Document]):
    """
    Asynchronously run the AutoMergingRetriever.

    Documents are grouped by their parent. A group is replaced by its parent when the fraction of the
    parent's children present in the group is greater than the threshold. The procedure is repeated on
    the newly obtained parents, one hierarchy level at a time, until a pass produces no merge.

    :param documents: List of leaf documents that were matched by a retriever
    :returns:
        Dictionary whose "documents" key holds the resulting documents (could be a mix of different
        hierarchy levels)
    :raises ValueError:
        If a parent id does not match exactly one document in the document store, or the matched parent
        has no "__children_ids" metadata.
    """

    AutoMergingRetriever._check_valid_documents(documents)

    async def _fetch_parent(parent_id: str) -> Document:
        # Look the parent up by id; anything other than exactly one hit is an error.
        matches = await self.document_store.filter_documents_async(
            {"field": "id", "operator": "==", "value": parent_id}
        )
        if len(matches) != 1:
            raise ValueError(f"Expected 1 parent document with id {parent_id}, found {len(matches)}")

        parent = matches[0]
        if not parent.meta.get("__children_ids"):
            raise ValueError(f"Parent document with id {parent_id} does not have any children.")

        return parent

    current_level = documents  # documents still eligible for merging
    kept: list[Document] = []  # documents that will be returned as they are

    while True:
        # Bucket the current level by parent id; parentless documents cannot be merged any further.
        children_by_parent: dict[str, list[Document]] = defaultdict(list)
        for doc in current_level:
            parent_id = doc.meta.get("__parent_id")
            if parent_id:
                children_by_parent[parent_id].append(doc)
            else:
                kept.append(doc)

        promoted: list[Document] = []
        for parent_id, children in children_by_parent.items():
            parent = await _fetch_parent(parent_id)

            # Share of the parent's children that are present in this group.
            coverage = len(children) / len(parent.meta["__children_ids"])
            if coverage > self.threshold:
                promoted.append(parent)  # the parent stands in for its children
            else:
                kept.extend(children)  # not enough coverage: keep the children themselves

        # A pass without any merge means the hierarchy cannot be climbed further.
        if not promoted:
            return {"documents": kept}

        # Otherwise try to merge the freshly promoted parents one level up.
        current_level = promoted
14 changes: 14 additions & 0 deletions haystack/components/retrievers/filter_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,17 @@ def run(self, filters: Optional[dict[str, Any]] = None):
A list of retrieved documents.
"""
return {"documents": self.document_store.filter_documents(filters=filters or self.filters)}

@component.output_types(documents=list[Document])
async def run_async(self, filters: Optional[dict[str, Any]] = None):
    """
    Asynchronously run the FilterRetriever on the given input data.

    :param filters:
        A dictionary with filters to narrow down the search space.
        When omitted (or empty), the filters stored on the FilterRetriever are used instead.
    :returns:
        A dictionary whose "documents" key holds the list of retrieved documents.
    """
    # Filters passed at call time win; otherwise fall back to the ones stored on the component.
    effective_filters = filters or self.filters
    return {"documents": await self.document_store.filter_documents_async(filters=effective_filters)}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
enhancements:
- |
Add `run_async` methods to the `FilterRetriever` and `AutoMergingRetriever` components to support asynchronous execution.
218 changes: 218 additions & 0 deletions test/components/retrievers/test_auto_merging_retriever_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import pytest

from haystack import Document, Pipeline
from haystack.components.preprocessors import HierarchicalDocumentSplitter
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.retrievers.auto_merging_retriever import AutoMergingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore


class TestAutoMergingRetrieverAsync:
    """Tests for `AutoMergingRetriever.run_async`: input validation, parent lookup errors and merging."""

    SUNRISE_TEXT = "The sun rose early in the morning. It cast a warm glow over the trees. Birds began to sing."
    MONARCH_TEXT = "The monarch of the wild blue yonder rises from the eastern side of the horizon."

    @staticmethod
    def _split(text, block_sizes):
        """Split `text` by word with the given block sizes and return every document the splitter emits."""
        splitter = HierarchicalDocumentSplitter(block_sizes=block_sizes, split_overlap=0, split_by="word")
        return splitter.run([Document(content=text)])["documents"]

    @staticmethod
    def _store_of(docs, keep):
        """Return a fresh in-memory store holding only the documents for which `keep(doc)` is truthy."""
        store = InMemoryDocumentStore()
        for doc in docs:
            if keep(doc):
                store.write_documents([doc])
        return store

    @staticmethod
    def _leaves(docs):
        """Return the documents that have no children."""
        return [doc for doc in docs if not doc.meta["__children_ids"]]

    @staticmethod
    async def _assert_rejected(docs, message):
        """Check that running the retriever on `docs` raises a ValueError matching `message`."""
        retriever = AutoMergingRetriever(InMemoryDocumentStore())
        with pytest.raises(ValueError, match=message):
            await retriever.run_async(documents=docs)

    @pytest.mark.asyncio
    async def test_run_missing_parent_id(self):
        await self._assert_rejected(
            [Document(content="test", meta={"__level": 1, "__block_size": 10})],
            "The matched leaf documents do not have the required meta field '__parent_id'",
        )

    @pytest.mark.asyncio
    async def test_run_missing_level(self):
        await self._assert_rejected(
            [Document(content="test", meta={"__parent_id": "parent1", "__block_size": 10})],
            "The matched leaf documents do not have the required meta field '__level'",
        )

    @pytest.mark.asyncio
    async def test_run_missing_block_size(self):
        await self._assert_rejected(
            [Document(content="test", meta={"__parent_id": "parent1", "__level": 1})],
            "The matched leaf documents do not have the required meta field '__block_size'",
        )

    @pytest.mark.asyncio
    async def test_run_mixed_valid_and_invalid_documents(self):
        # One well-formed document next to one lacking "__parent_id": the whole batch is rejected.
        await self._assert_rejected(
            [
                Document(content="valid", meta={"__parent_id": "parent1", "__level": 1, "__block_size": 10}),
                Document(content="invalid", meta={"__level": 1, "__block_size": 10}),
            ],
            "The matched leaf documents do not have the required meta field '__parent_id'",
        )

    @pytest.mark.asyncio
    async def test_run_parent_not_found(self):
        retriever = AutoMergingRetriever(InMemoryDocumentStore(), threshold=0.5)

        # The leaf points at a parent id that was never written to the store.
        orphan = Document(
            content="test", meta={"__parent_id": "non_existent_parent", "__level": 1, "__block_size": 10}
        )

        with pytest.raises(ValueError, match="Expected 1 parent document with id non_existent_parent, found 0"):
            await retriever.run_async([orphan])

    @pytest.mark.asyncio
    async def test_run_parent_without_children_metadata(self):
        """A parent document that exists but lacks the __children_ids metadata field must be rejected."""
        store = InMemoryDocumentStore()

        # The stored parent carries the other metadata fields but no "__children_ids".
        store.write_documents(
            [Document(content="parent content", id="parent1", meta={"__level": 1, "__block_size": 10})]
        )
        retriever = AutoMergingRetriever(store, threshold=0.5)

        # The leaf references that parent by id.
        leaf = Document(content="leaf content", meta={"__parent_id": "parent1", "__level": 2, "__block_size": 5})

        with pytest.raises(ValueError, match="Parent document with id parent1 does not have any children"):
            await retriever.run_async([leaf])

    @pytest.mark.asyncio
    async def test_run_empty_documents(self):
        retriever = AutoMergingRetriever(InMemoryDocumentStore())
        outcome = await retriever.run_async([])
        assert outcome == {"documents": []}

    @pytest.mark.asyncio
    async def test_run_return_parent_document(self):
        all_docs = self._split(self.SUNRISE_TEXT, {10, 3})

        # Only non-leaf documents go into the store the retriever looks parents up in.
        store = self._store_of(all_docs, lambda doc: doc.meta["__children_ids"])
        retriever = AutoMergingRetriever(store, threshold=0.5)

        # Two leaves of the same parent are passed in. The parent has 3 children, so the score is
        # 2/3 (about 0.67), which is above the 0.5 threshold: the parent is returned in their place.
        outcome = await retriever.run_async(self._leaves(all_docs)[4:6])
        returned = outcome["documents"]

        assert len(returned) == 1
        assert returned[0].content == "warm glow over the trees. Birds began to sing."
        assert len(returned[0].meta["__children_ids"]) == 3

    @pytest.mark.asyncio
    async def test_run_return_leafs_document(self):
        all_docs = self._split(self.MONARCH_TEXT, {10, 3})
        store = self._store_of(all_docs, lambda doc: doc.meta["__level"] == 1)
        retriever = AutoMergingRetriever(store, threshold=0.6)

        # A single leaf is passed in and is expected to come back unmerged.
        outcome = await retriever.run_async([self._leaves(all_docs)[4]])
        returned = outcome["documents"]

        assert len(returned) == 1
        assert returned[0].content == "eastern side of "
        assert returned[0].meta["__parent_id"] == all_docs[2].id

    @pytest.mark.asyncio
    async def test_run_return_leafs_document_different_parents(self):
        all_docs = self._split(self.MONARCH_TEXT, {10, 3})
        store = self._store_of(all_docs, lambda doc: doc.meta["__level"] == 1)
        retriever = AutoMergingRetriever(store, threshold=0.6)

        leaves = self._leaves(all_docs)
        outcome = await retriever.run_async([leaves[4], leaves[3]])
        returned = outcome["documents"]

        # Both leaves come back, and they belong to different parents.
        assert len(returned) == 2
        assert returned[0].meta["__parent_id"] != returned[1].meta["__parent_id"]

    @pytest.mark.asyncio
    async def test_run_go_up_hierarchy_multiple_levels(self):
        """
        Test if the retriever can go up the hierarchy multiple levels to find the parent document.

        Four leaf documents stand in for the matches of some initial query. They are merged upwards
        again and again until the threshold is no longer met, so a single higher-level document
        is returned.
        """
        all_docs = self._split(self.SUNRISE_TEXT, {6, 4, 2, 1})

        # Only non-leaf documents go into the store the retriever looks parents up in.
        store = self._store_of(all_docs, lambda doc: doc.meta["__children_ids"])
        retriever = AutoMergingRetriever(store, threshold=0.4)

        # The four leaf documents assumed to have matched the initial query.
        wanted = {"The ", "sun ", "rose ", "early "}
        matched_leaves = [doc for doc in all_docs if doc.content in wanted]

        outcome = await retriever.run_async(matched_leaves)

        assert len(outcome["documents"]) == 1
        assert outcome["documents"][0].content == "The sun rose early in the "

    @pytest.mark.asyncio
    async def test_run_go_up_hierarchy_multiple_levels_hit_root_document(self):
        """
        Test case where merging climbs the hierarchy all the way, so the root document is returned.

        The root is the only document in the hierarchy which has no parent.
        """
        all_docs = self._split(self.SUNRISE_TEXT, {6, 4})

        # Only non-leaf documents go into the store the retriever looks parents up in.
        store = self._store_of(all_docs, lambda doc: doc.meta["__children_ids"])
        # The threshold is deliberately low so that every level merges, up to the root.
        retriever = AutoMergingRetriever(store, threshold=0.1)

        # The four leaf documents assumed to have matched the initial query.
        wanted = {"The sun rose early ", "in the ", "morning. It cast a ", "over the trees. Birds "}
        matched_leaves = [doc for doc in all_docs if doc.content in wanted]

        outcome = await retriever.run_async(matched_leaves)

        assert len(outcome["documents"]) == 1
        assert outcome["documents"][0].meta["__level"] == 0  # hit root document
96 changes: 96 additions & 0 deletions test/components/retrievers/test_filter_retriever_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from typing import Any

import pytest

from haystack import AsyncPipeline, DeserializationError, Pipeline
from haystack.components.retrievers.filter_retriever import FilterRetriever
from haystack.dataclasses import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.testing.factory import document_store_class


@pytest.fixture()
def sample_docs():
    """Provide English and German sample documents under "en_docs", "de_docs" and "all_docs"."""
    english = [
        Document(content=text, meta={"lang": "en"})
        for text in (
            "Javascript is a popular programming language",
            "Python is a popular programming language",
            "A chromosome is a package of DNA ",
        )
    ]
    german = [
        Document(content=text, meta={"lang": "de"})
        for text in (
            "python ist eine beliebte Programmiersprache",
            "javascript ist eine beliebte Programmiersprache",
        )
    ]
    # "all_docs" lists the English documents first, then the German ones.
    return {"en_docs": english, "de_docs": german, "all_docs": english + german}


@pytest.fixture()
def sample_document_store(sample_docs):
    """Provide an in-memory document store holding every document from `sample_docs["all_docs"]`."""
    store = InMemoryDocumentStore()
    store.write_documents(sample_docs["all_docs"])
    return store


class TestFilterRetrieverAsync:
    """Tests for `FilterRetriever.run_async`, called directly and through an `AsyncPipeline`."""

    @classmethod
    def _documents_equal(cls, docs1: list[Document], docs2: list[Document]) -> bool:
        """
        Tell whether two lists hold equal documents, ignoring order.

        Sorted copies are compared, so neither input list is reordered by the check.

        :param docs1: First list of documents.
        :param docs2: Second list of documents.
        :returns: True if the lists are equal once both are ordered by document id.
        """
        # Order doesn't matter; compare copies sorted by id instead of sorting the inputs in place.
        return sorted(docs1, key=lambda doc: doc.id) == sorted(docs2, key=lambda doc: doc.id)

    @pytest.mark.asyncio
    async def test_retriever_init_filter(self, sample_document_store, sample_docs):
        # Filters given at construction time are used when none are passed at run time.
        retriever = FilterRetriever(sample_document_store, filters={"field": "lang", "operator": "==", "value": "en"})
        result = await retriever.run_async()

        assert "documents" in result
        assert len(result["documents"]) == 3
        assert TestFilterRetrieverAsync._documents_equal(result["documents"], sample_docs["en_docs"])

    @pytest.mark.asyncio
    async def test_retriever_runtime_filter(self, sample_document_store, sample_docs):
        # No construction-time filters: the run-time ones are applied.
        retriever = FilterRetriever(sample_document_store)
        result = await retriever.run_async(filters={"field": "lang", "operator": "==", "value": "en"})

        assert "documents" in result
        assert len(result["documents"]) == 3
        assert TestFilterRetrieverAsync._documents_equal(result["documents"], sample_docs["en_docs"])

    @pytest.mark.asyncio
    async def test_retriever_init_filter_run_filter_override(self, sample_document_store, sample_docs):
        # Run-time filters take precedence over the construction-time ones.
        retriever = FilterRetriever(sample_document_store, filters={"field": "lang", "operator": "==", "value": "en"})
        result = await retriever.run_async(filters={"field": "lang", "operator": "==", "value": "de"})

        assert "documents" in result
        assert len(result["documents"]) == 2
        assert TestFilterRetrieverAsync._documents_equal(result["documents"], sample_docs["de_docs"])

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_run_with_pipeline(self, sample_document_store, sample_docs):
        retriever = FilterRetriever(sample_document_store, filters={"field": "lang", "operator": "==", "value": "de"})

        pipeline = AsyncPipeline()
        pipeline.add_component("retriever", retriever)

        # First run: no run-time input, so the construction-time filters apply.
        result: dict[str, Any] = await pipeline.run_async(data={"retriever": {}})

        assert result
        assert "retriever" in result
        results_docs = result["retriever"]["documents"]
        assert results_docs
        assert TestFilterRetrieverAsync._documents_equal(results_docs, sample_docs["de_docs"])

        # Second run: filters supplied through the pipeline input replace the construction-time ones.
        result = await pipeline.run_async(
            data={"retriever": {"filters": {"field": "lang", "operator": "==", "value": "en"}}}
        )

        assert result
        assert "retriever" in result
        results_docs = result["retriever"]["documents"]
        assert results_docs
        assert TestFilterRetrieverAsync._documents_equal(results_docs, sample_docs["en_docs"])
Loading