Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,132 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None
msg = f"Failed to delete all documents from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

def delete_by_filter(self, filters: Dict[str, Any]) -> int:
"""
Deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:returns: The number of documents deleted.
"""
self._ensure_initialized()
assert self._client is not None

try:
normalized_filters = normalize_filters(filters)
body = {"query": {"bool": {"filter": normalized_filters}}}
result = self._client.delete_by_query(index=self._index, body=body)
deleted_count = result.get("deleted", 0)
logger.info(
"Deleted {n_docs} documents from index '{index}' using filters.",
n_docs=deleted_count,
index=self._index,
)
return deleted_count
except Exception as e:
msg = f"Failed to delete documents by filter from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

async def delete_by_filter_async(self, filters: Dict[str, Any]) -> int:
"""
Asynchronously deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:returns: The number of documents deleted.
"""
self._ensure_initialized()
assert self._async_client is not None

try:
normalized_filters = normalize_filters(filters)
body = {"query": {"bool": {"filter": normalized_filters}}}
result = await self._async_client.delete_by_query(index=self._index, body=body)
deleted_count = result.get("deleted", 0)
logger.info(
"Deleted {n_docs} documents from index '{index}' using filters.",
n_docs=deleted_count,
index=self._index,
)
return deleted_count
except Exception as e:
msg = f"Failed to delete documents by filter from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
"""
Updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update.
:returns: The number of documents updated.
"""
self._ensure_initialized()
assert self._client is not None

try:
normalized_filters = normalize_filters(filters)
# Build the update script to modify metadata fields
# Documents are stored with flattened metadata, so update fields directly in ctx._source
update_script_lines = []
for key in meta.keys():
update_script_lines.append(f"ctx._source.{key} = params.{key};")
update_script = " ".join(update_script_lines)

body = {
"query": {"bool": {"filter": normalized_filters}},
"script": {"source": update_script, "params": meta, "lang": "painless"},
}
result = self._client.update_by_query(index=self._index, body=body)
updated_count = result.get("updated", 0)
logger.info(
"Updated {n_docs} documents in index '{index}' using filters.",
n_docs=updated_count,
index=self._index,
)
return updated_count
except Exception as e:
msg = f"Failed to update documents by filter in OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
"""
Asynchronously updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update.
:returns: The number of documents updated.
"""
self._ensure_initialized()
assert self._async_client is not None

try:
normalized_filters = normalize_filters(filters)
# Build the update script to modify metadata fields
# Documents are stored with flattened metadata, so update fields directly in ctx._source
update_script_lines = []
for key in meta.keys():
update_script_lines.append(f"ctx._source.{key} = params.{key};")
update_script = " ".join(update_script_lines)

body = {
"query": {"bool": {"filter": normalized_filters}},
"script": {"source": update_script, "params": meta, "lang": "painless"},
}
result = await self._async_client.update_by_query(index=self._index, body=body)
updated_count = result.get("updated", 0)
logger.info(
"Updated {n_docs} documents in index '{index}' using filters.",
n_docs=updated_count,
index=self._index,
)
return updated_count
except Exception as e:
msg = f"Failed to update documents by filter in OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e

def _prepare_bm25_search_request(
self,
*,
Expand Down
54 changes: 54 additions & 0 deletions integrations/opensearch/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,3 +523,57 @@ def test_delete_all_documents_no_index_recreation(self, document_store: OpenSear
results = document_store.filter_documents()
assert len(results) == 1
assert results[0].content == "New document after delete all"

def test_delete_by_filter(self, document_store: OpenSearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Delete documents with category="A"
deleted_count = document_store.delete_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"}
)
time.sleep(2) # wait for deletion to be reflected
assert deleted_count == 2
assert document_store.count_documents() == 1

# Verify only category B remains
remaining_docs = document_store.filter_documents()
assert len(remaining_docs) == 1
assert remaining_docs[0].meta["category"] == "B"

def test_update_by_filter(self, document_store: OpenSearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Update status for category="A" documents
updated_count = document_store.update_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
)
time.sleep(2) # wait for update to be reflected
assert updated_count == 2

# Verify the updates
published_docs = document_store.filter_documents(
filters={"field": "meta.status", "operator": "==", "value": "published"}
)
assert len(published_docs) == 2
for doc in published_docs:
assert doc.meta["category"] == "A"
assert doc.meta["status"] == "published"

# Verify category B still has draft status
draft_docs = document_store.filter_documents(
filters={"field": "meta.status", "operator": "==", "value": "draft"}
)
assert len(draft_docs) == 1
assert draft_docs[0].meta["category"] == "B"
54 changes: 54 additions & 0 deletions integrations/opensearch/tests/test_document_store_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,57 @@ async def test_delete_all_documents_no_index_recreation(self, document_store: Op
results = await document_store.filter_documents_async()
assert len(results) == 1
assert results[0].content == "New document after delete all"

async def test_delete_by_filter_async(self, document_store: OpenSearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
]
await document_store.write_documents_async(docs)
assert await document_store.count_documents_async() == 3

# Delete documents with category="A"
deleted_count = await document_store.delete_by_filter_async(
filters={"field": "meta.category", "operator": "==", "value": "A"}
)
time.sleep(2) # wait for deletion to be reflected
assert deleted_count == 2
assert await document_store.count_documents_async() == 1

# Verify only category B remains
remaining_docs = await document_store.filter_documents_async()
assert len(remaining_docs) == 1
assert remaining_docs[0].meta["category"] == "B"

async def test_update_by_filter_async(self, document_store: OpenSearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
]
await document_store.write_documents_async(docs)
assert await document_store.count_documents_async() == 3

# Update status for category="A" documents
updated_count = await document_store.update_by_filter_async(
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
)
time.sleep(2) # wait for update to be reflected
assert updated_count == 2

# Verify the updates
published_docs = await document_store.filter_documents_async(
filters={"field": "meta.status", "operator": "==", "value": "published"}
)
assert len(published_docs) == 2
for doc in published_docs:
assert doc.meta["category"] == "A"
assert doc.meta["status"] == "published"

# Verify category B still has draft status
draft_docs = await document_store.filter_documents_async(
filters={"field": "meta.status", "operator": "==", "value": "draft"}
)
assert len(draft_docs) == 1
assert draft_docs[0].meta["category"] == "B"