Skip to content

Commit b77f2ba

Browse files
authored
feat: Add async run to DocumentWriter (#8962)
* add async run to DocumentWriter * reno
1 parent bb0e36f commit b77f2ba

File tree

3 files changed

+79
-1
lines changed

3 files changed

+79
-1
lines changed

haystack/components/writers/document_writer.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ class DocumentWriter:
2525
Document(content="Python is a popular programming language"),
2626
]
2727
doc_store = InMemoryDocumentStore()
28-
doc_store.write_documents(docs)
28+
writer = DocumentWriter(document_store=doc_store)
29+
writer.run(docs)
2930
```
3031
"""
3132

@@ -100,3 +101,34 @@ def run(self, documents: List[Document], policy: Optional[DuplicatePolicy] = Non
100101

101102
documents_written = self.document_store.write_documents(documents=documents, policy=policy)
102103
return {"documents_written": documents_written}
104+
105+
@component.output_types(documents_written=int)
106+
async def run_async(self, documents: List[Document], policy: Optional[DuplicatePolicy] = None):
107+
"""
108+
Asynchronously run the DocumentWriter on the given input data.
109+
110+
This is the asynchronous version of the `run` method. It has the same parameters and return values
111+
but can be used with `await` in async code.
112+
113+
:param documents:
114+
A list of documents to write to the document store.
115+
:param policy:
116+
The policy to use when encountering duplicate documents.
117+
:returns:
118+
Number of documents written to the document store.
119+
120+
:raises ValueError:
121+
If the specified document store is not found.
122+
:raises TypeError:
123+
If the specified document store does not implement `write_documents_async`.
124+
"""
125+
if policy is None:
126+
policy = self.policy
127+
128+
if not hasattr(self.document_store, "write_documents_async"):
129+
raise TypeError(f"Document store {type(self.document_store).__name__} does not provide async support.")
130+
131+
documents_written = await self.document_store.write_documents_async( # type: ignore
132+
documents=documents, policy=policy
133+
)
134+
return {"documents_written": documents_written}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
features:
3+
- |
4+
Add `run_async` method to `DocumentWriter`. This method supports the same parameters as
5+
the `run` method and relies on the DocumentStore to implement `write_documents_async`.
6+
It returns a coroutine that can be awaited.

test/components/writers/test_document_writer.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,43 @@ def test_run_skip_policy(self):
9292

9393
result = writer.run(documents=documents)
9494
assert result["documents_written"] == 0
95+
96+
@pytest.mark.asyncio
97+
async def test_run_async_invalid_docstore(self):
98+
document_store = document_store_class("MockedDocumentStore")
99+
100+
writer = DocumentWriter(document_store)
101+
documents = [
102+
Document(content="This is the text of a document."),
103+
Document(content="This is the text of another document."),
104+
]
105+
106+
with pytest.raises(TypeError, match="does not provide async support"):
107+
await writer.run_async(documents=documents)
108+
109+
@pytest.mark.asyncio
110+
async def test_run_async(self):
111+
document_store = InMemoryDocumentStore()
112+
writer = DocumentWriter(document_store)
113+
documents = [
114+
Document(content="This is the text of a document."),
115+
Document(content="This is the text of another document."),
116+
]
117+
118+
result = await writer.run_async(documents=documents)
119+
assert result["documents_written"] == 2
120+
121+
@pytest.mark.asyncio
122+
async def test_run_async_skip_policy(self):
123+
document_store = InMemoryDocumentStore()
124+
writer = DocumentWriter(document_store, policy=DuplicatePolicy.SKIP)
125+
documents = [
126+
Document(content="This is the text of a document."),
127+
Document(content="This is the text of another document."),
128+
]
129+
130+
result = await writer.run_async(documents=documents)
131+
assert result["documents_written"] == 2
132+
133+
result = await writer.run_async(documents=documents)
134+
assert result["documents_written"] == 0

0 commit comments

Comments
 (0)