diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index c48864134..8d58ba6c8 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -608,6 +608,132 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None msg = f"Failed to delete all documents from OpenSearch: {e!s}" raise DocumentStoreError(msg) from e + def delete_by_filter(self, filters: Dict[str, Any]) -> int: + """ + Deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + self._ensure_initialized() + assert self._client is not None + + try: + normalized_filters = normalize_filters(filters) + body = {"query": {"bool": {"filter": normalized_filters}}} + result = self._client.delete_by_query(index=self._index, body=body) + deleted_count = result.get("deleted", 0) + logger.info( + "Deleted {n_docs} documents from index '{index}' using filters.", + n_docs=deleted_count, + index=self._index, + ) + return deleted_count + except Exception as e: + msg = f"Failed to delete documents by filter from OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + + async def delete_by_filter_async(self, filters: Dict[str, Any]) -> int: + """ + Asynchronously deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + self._ensure_initialized() + assert self._async_client is not None + + try: + normalized_filters = normalize_filters(filters) + body = {"query": {"bool": {"filter": normalized_filters}}} + result = await self._async_client.delete_by_query(index=self._index, body=body) + deleted_count = result.get("deleted", 0) + logger.info( + "Deleted {n_docs} documents from index '{index}' using filters.", + n_docs=deleted_count, + index=self._index, + ) + return deleted_count + except Exception as e: + msg = f"Failed to delete documents by filter from OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + + def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int: + """ + Updates the metadata of all documents that match the provided filters. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. + :returns: The number of documents updated. + """ + self._ensure_initialized() + assert self._client is not None + + try: + normalized_filters = normalize_filters(filters) + # Build the update script to modify metadata fields + # Documents are stored with flattened metadata, so update fields directly in ctx._source + update_script_lines = [] + for key in meta.keys(): + update_script_lines.append(f"ctx._source.{key} = params.{key};") + update_script = " ".join(update_script_lines) + + body = { + "query": {"bool": {"filter": normalized_filters}}, + "script": {"source": update_script, "params": meta, "lang": "painless"}, + } + result = self._client.update_by_query(index=self._index, body=body) + updated_count = result.get("updated", 0) + logger.info( + "Updated {n_docs} documents in index '{index}' using filters.", + n_docs=updated_count, + index=self._index, + ) + return updated_count + except Exception as e: + msg = f"Failed to update documents by filter in OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + + async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int: + """ + Asynchronously updates the metadata of all documents that match the provided filters. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. + :returns: The number of documents updated. + """ + self._ensure_initialized() + assert self._async_client is not None + + try: + normalized_filters = normalize_filters(filters) + # Build the update script to modify metadata fields + # Documents are stored with flattened metadata, so update fields directly in ctx._source + update_script_lines = [] + for key in meta.keys(): + update_script_lines.append(f"ctx._source.{key} = params.{key};") + update_script = " ".join(update_script_lines) + + body = { + "query": {"bool": {"filter": normalized_filters}}, + "script": {"source": update_script, "params": meta, "lang": "painless"}, + } + result = await self._async_client.update_by_query(index=self._index, body=body) + updated_count = result.get("updated", 0) + logger.info( + "Updated {n_docs} documents in index '{index}' using filters.", + n_docs=updated_count, + index=self._index, + ) + return updated_count + except Exception as e: + msg = f"Failed to update documents by filter in OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + def _prepare_bm25_search_request( self, *, diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index 72c587922..fdb8e3f41 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -523,3 +523,57 @@ def test_delete_all_documents_no_index_recreation(self, document_store: OpenSear results = document_store.filter_documents() assert len(results) == 1 assert results[0].content == "New document after delete all" + + def test_delete_by_filter(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Delete documents with category="A" + deleted_count = document_store.delete_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + time.sleep(2) # wait for deletion to be reflected + assert deleted_count == 2 + assert document_store.count_documents() == 1 + + # Verify only category B remains + remaining_docs = document_store.filter_documents() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + def test_update_by_filter(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update status for category="A" documents + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + time.sleep(2) # wait for update to be reflected + assert updated_count == 2 + + # Verify the updates + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + + # Verify category B still has draft status + draft_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "draft"} + ) + assert len(draft_docs) == 1 + assert draft_docs[0].meta["category"] == "B" diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py index 65b1555f8..086bdbdcf 100644 --- a/integrations/opensearch/tests/test_document_store_async.py +++ b/integrations/opensearch/tests/test_document_store_async.py @@ -302,3 +302,57 @@ async def test_delete_all_documents_no_index_recreation(self, document_store: Op results = await document_store.filter_documents_async() assert len(results) == 1 assert results[0].content == "New document after delete all" + + async def test_delete_by_filter_async(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Delete documents with category="A" + deleted_count = await document_store.delete_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + time.sleep(2) # wait for deletion to be reflected + assert deleted_count == 2 + assert await document_store.count_documents_async() == 1 + + # Verify only category B remains + remaining_docs = await document_store.filter_documents_async() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + async def test_update_by_filter_async(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Update status for category="A" documents + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + time.sleep(2) # wait for update to be reflected + assert updated_count == 2 + + # Verify the updates + published_docs = await document_store.filter_documents_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + + # Verify category B still has draft status + draft_docs = await document_store.filter_documents_async( + filters={"field": "meta.status", "operator": "==", "value": "draft"} + ) + assert len(draft_docs) == 1 + assert draft_docs[0].meta["category"] == "B"