Skip to content

Commit 9581fea

Browse files
feat: adding async version of InMemoryDocumentStore and associated retrievers (#8963)
* adding classes from experimental * adding release notes * adding tests * merging all into a single class * adding async retriever methods * Update haystack/document_stores/in_memory/document_store.py Co-authored-by: Stefano Fiorucci <[email protected]> * adding missed tests --------- Co-authored-by: Stefano Fiorucci <[email protected]>
1 parent 9da6696 commit 9581fea

File tree

5 files changed

+365
-0
lines changed

5 files changed

+365
-0
lines changed

haystack/components/retrievers/in_memory/bm25_retriever.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,43 @@ def run(
161161

162162
docs = self.document_store.bm25_retrieval(query=query, filters=filters, top_k=top_k, scale_score=scale_score)
163163
return {"documents": docs}
164+
165+
@component.output_types(documents=List[Document])
166+
async def run_async(
167+
self,
168+
query: str,
169+
filters: Optional[Dict[str, Any]] = None,
170+
top_k: Optional[int] = None,
171+
scale_score: Optional[bool] = None,
172+
):
173+
"""
174+
Run the InMemoryBM25Retriever on the given input data.
175+
176+
:param query:
177+
The query string for the Retriever.
178+
:param filters:
179+
A dictionary with filters to narrow down the search space when retrieving documents.
180+
:param top_k:
181+
The maximum number of documents to return.
182+
:param scale_score:
183+
When `True`, scales the score of retrieved documents to a range of 0 to 1, where 1 means extremely relevant.
184+
When `False`, uses raw similarity scores.
185+
:returns:
186+
The retrieved documents.
187+
188+
:raises ValueError:
189+
If the specified DocumentStore is not found or is not a InMemoryDocumentStore instance.
190+
"""
191+
if self.filter_policy == FilterPolicy.MERGE and filters:
192+
filters = {**(self.filters or {}), **filters}
193+
else:
194+
filters = filters or self.filters
195+
if top_k is None:
196+
top_k = self.top_k
197+
if scale_score is None:
198+
scale_score = self.scale_score
199+
200+
docs = await self.document_store.bm25_retrieval_async(
201+
query=query, filters=filters, top_k=top_k, scale_score=scale_score
202+
)
203+
return {"documents": docs}

haystack/components/retrievers/in_memory/embedding_retriever.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,3 +192,54 @@ def run( # pylint: disable=too-many-positional-arguments
192192
)
193193

194194
return {"documents": docs}
195+
196+
@component.output_types(documents=List[Document])
197+
async def run_async( # pylint: disable=too-many-positional-arguments
198+
self,
199+
query_embedding: List[float],
200+
filters: Optional[Dict[str, Any]] = None,
201+
top_k: Optional[int] = None,
202+
scale_score: Optional[bool] = None,
203+
return_embedding: Optional[bool] = None,
204+
):
205+
"""
206+
Run the InMemoryEmbeddingRetriever on the given input data.
207+
208+
:param query_embedding:
209+
Embedding of the query.
210+
:param filters:
211+
A dictionary with filters to narrow down the search space when retrieving documents.
212+
:param top_k:
213+
The maximum number of documents to return.
214+
:param scale_score:
215+
When `True`, scales the score of retrieved documents to a range of 0 to 1, where 1 means extremely relevant.
216+
When `False`, uses raw similarity scores.
217+
:param return_embedding:
218+
When `True`, returns the embedding of the retrieved documents.
219+
When `False`, returns just the documents, without their embeddings.
220+
:returns:
221+
The retrieved documents.
222+
223+
:raises ValueError:
224+
If the specified DocumentStore is not found or is not an InMemoryDocumentStore instance.
225+
"""
226+
if self.filter_policy == FilterPolicy.MERGE and filters:
227+
filters = {**(self.filters or {}), **filters}
228+
else:
229+
filters = filters or self.filters
230+
if top_k is None:
231+
top_k = self.top_k
232+
if scale_score is None:
233+
scale_score = self.scale_score
234+
if return_embedding is None:
235+
return_embedding = self.return_embedding
236+
237+
docs = await self.document_store.embedding_retrieval_async(
238+
query_embedding=query_embedding,
239+
filters=filters,
240+
top_k=top_k,
241+
scale_score=scale_score,
242+
return_embedding=return_embedding,
243+
)
244+
245+
return {"documents": docs}

haystack/document_stores/in_memory/document_store.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5+
import asyncio
56
import json
67
import math
78
import re
89
import uuid
910
from collections import Counter
11+
from concurrent.futures import ThreadPoolExecutor
1012
from dataclasses import dataclass
1113
from pathlib import Path
1214
from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple
@@ -65,6 +67,7 @@ def __init__( # pylint: disable=too-many-positional-arguments
6567
bm25_parameters: Optional[Dict] = None,
6668
embedding_similarity_function: Literal["dot_product", "cosine"] = "dot_product",
6769
index: Optional[str] = None,
70+
async_executor: Optional[ThreadPoolExecutor] = None,
6871
):
6972
"""
7073
Initializes the DocumentStore.
@@ -79,6 +82,9 @@ def __init__( # pylint: disable=too-many-positional-arguments
7982
about your embedding model.
8083
:param index: A specific index to store the documents. If not specified, a random UUID is used.
8184
Using the same index allows you to store documents across multiple InMemoryDocumentStore instances.
85+
:param async_executor:
86+
Optional ThreadPoolExecutor to use for async calls. If not provided, a single-threaded
87+
executor will be initialized and used.
8288
"""
8389
self.bm25_tokenization_regex = bm25_tokenization_regex
8490
self.tokenizer = re.compile(bm25_tokenization_regex).findall
@@ -105,6 +111,12 @@ def __init__( # pylint: disable=too-many-positional-arguments
105111
if self.index not in _FREQ_VOCAB_FOR_IDF_STORAGES:
106112
_FREQ_VOCAB_FOR_IDF_STORAGES[self.index] = Counter()
107113

114+
self.executor = (
115+
ThreadPoolExecutor(thread_name_prefix=f"async-inmemory-docstore-executor-{id(self)}", max_workers=1)
116+
if async_executor is None
117+
else async_executor
118+
)
119+
108120
@property
109121
def storage(self) -> Dict[str, Document]:
110122
"""
@@ -620,3 +632,91 @@ def _compute_query_embedding_similarity_scores(
620632
scores = [(score + 1) / 2 for score in scores]
621633

622634
return scores
635+
636+
async def count_documents_async(self) -> int:
637+
"""
638+
Returns the number of how many documents are present in the DocumentStore.
639+
"""
640+
return len(self.storage.keys())
641+
642+
async def filter_documents_async(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
643+
"""
644+
Returns the documents that match the filters provided.
645+
646+
For a detailed specification of the filters, refer to the DocumentStore.filter_documents() protocol
647+
documentation.
648+
649+
:param filters: The filters to apply to the document list.
650+
:returns: A list of Documents that match the given filters.
651+
"""
652+
return await asyncio.get_event_loop().run_in_executor(
653+
self.executor, lambda: self.filter_documents(filters=filters)
654+
)
655+
656+
async def write_documents_async(
657+
self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
658+
) -> int:
659+
"""
660+
Refer to the DocumentStore.write_documents() protocol documentation.
661+
662+
If `policy` is set to `DuplicatePolicy.NONE` defaults to `DuplicatePolicy.FAIL`.
663+
"""
664+
return await asyncio.get_event_loop().run_in_executor(
665+
self.executor, lambda: self.write_documents(documents=documents, policy=policy)
666+
)
667+
668+
async def delete_documents_async(self, document_ids: List[str]) -> None:
669+
"""
670+
Deletes all documents with matching document_ids from the DocumentStore.
671+
672+
:param document_ids: The object_ids to delete.
673+
"""
674+
await asyncio.get_event_loop().run_in_executor(
675+
self.executor, lambda: self.delete_documents(document_ids=document_ids)
676+
)
677+
678+
async def bm25_retrieval_async(
679+
self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: int = 10, scale_score: bool = False
680+
) -> List[Document]:
681+
"""
682+
Retrieves documents that are most relevant to the query using BM25 algorithm.
683+
684+
:param query: The query string.
685+
:param filters: A dictionary with filters to narrow down the search space.
686+
:param top_k: The number of top documents to retrieve. Default is 10.
687+
:param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
688+
:returns: A list of the top_k documents most relevant to the query.
689+
"""
690+
return await asyncio.get_event_loop().run_in_executor(
691+
self.executor,
692+
lambda: self.bm25_retrieval(query=query, filters=filters, top_k=top_k, scale_score=scale_score),
693+
)
694+
695+
async def embedding_retrieval_async( # pylint: disable=too-many-positional-arguments
696+
self,
697+
query_embedding: List[float],
698+
filters: Optional[Dict[str, Any]] = None,
699+
top_k: int = 10,
700+
scale_score: bool = False,
701+
return_embedding: bool = False,
702+
) -> List[Document]:
703+
"""
704+
Retrieves documents that are most similar to the query embedding using a vector similarity metric.
705+
706+
:param query_embedding: Embedding of the query.
707+
:param filters: A dictionary with filters to narrow down the search space.
708+
:param top_k: The number of top documents to retrieve. Default is 10.
709+
:param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
710+
:param return_embedding: Whether to return the embedding of the retrieved Documents. Default is False.
711+
:returns: A list of the top_k documents most relevant to the query.
712+
"""
713+
return await asyncio.get_event_loop().run_in_executor(
714+
self.executor,
715+
lambda: self.embedding_retrieval(
716+
query_embedding=query_embedding,
717+
filters=filters,
718+
top_k=top_k,
719+
scale_score=scale_score,
720+
return_embedding=return_embedding,
721+
),
722+
)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
The `InMemoryDocumentStore` and the associated `InMemoryBM25Retriever` and `InMemoryEmbeddingRetriever` retrievers now support async mode.

0 commit comments

Comments
 (0)