From f5ecaf20c7862a627239d80d616f6128424eca78 Mon Sep 17 00:00:00 2001 From: Chinmay Bansal Date: Sun, 19 Oct 2025 23:21:02 -0700 Subject: [PATCH 1/4] add delete by filter and update by filer to OpenSearchDocumentStore --- .../opensearch/document_store.py | 116 ++++++++++++++++++ .../opensearch/tests/test_document_store.py | 55 +++++++++ .../tests/test_document_store_async.py | 54 ++++++++ 3 files changed, 225 insertions(+) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index c48864134..04ef55ad6 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -608,6 +608,122 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None msg = f"Failed to delete all documents from OpenSearch: {e!s}" raise DocumentStoreError(msg) from e + def delete_by_filter(self, filters: Dict[str, Any]) -> int: + """ + Deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + self._ensure_initialized() + assert self._client is not None + + try: + normalized_filters = normalize_filters(filters) + body = {"query": {"bool": {"filter": normalized_filters}}} + result = self._client.delete_by_query(index=self._index, body=body) + deleted_count = result.get("deleted", 0) + logger.info( + "Deleted {n_docs} documents from index '{index}' using filters.", + n_docs=deleted_count, + index=self._index, + ) + return deleted_count + except Exception as e: + msg = f"Failed to delete documents by filter from OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + + async def delete_by_filter_async(self, filters: Dict[str, Any]) -> int: + """ + Asynchronously deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + self._ensure_initialized() + assert self._async_client is not None + + try: + normalized_filters = normalize_filters(filters) + body = {"query": {"bool": {"filter": normalized_filters}}} + result = await self._async_client.delete_by_query(index=self._index, body=body) + deleted_count = result.get("deleted", 0) + logger.info( + "Deleted {n_docs} documents from index '{index}' using filters.", + n_docs=deleted_count, + index=self._index, + ) + return deleted_count + except Exception as e: + msg = f"Failed to delete documents by filter from OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + + def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int: + """ + Updates the metadata of all documents that match the provided filters. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. + :returns: The number of documents updated. + """ + self._ensure_initialized() + assert self._client is not None + + try: + normalized_filters = normalize_filters(filters) + # Build the update script to modify metadata fields + update_script = "".join([f"ctx._source.metadata.{key} = params.{key}; " for key in meta.keys()]) + body = { + "query": {"bool": {"filter": normalized_filters}}, + "script": {"source": update_script, "params": meta, "lang": "painless"}, + } + result = self._client.update_by_query(index=self._index, body=body) + updated_count = result.get("updated", 0) + logger.info( + "Updated {n_docs} documents in index '{index}' using filters.", + n_docs=updated_count, + index=self._index, + ) + return updated_count + except Exception as e: + msg = f"Failed to update documents by filter in OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + + async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int: + """ + Asynchronously updates the metadata of all documents that match the provided filters. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. + :returns: The number of documents updated. + """ + self._ensure_initialized() + assert self._async_client is not None + + try: + normalized_filters = normalize_filters(filters) + # Build the update script to modify metadata fields + update_script = "".join([f"ctx._source.metadata.{key} = params.{key}; " for key in meta.keys()]) + body = { + "query": {"bool": {"filter": normalized_filters}}, + "script": {"source": update_script, "params": meta, "lang": "painless"}, + } + result = await self._async_client.update_by_query(index=self._index, body=body) + updated_count = result.get("updated", 0) + logger.info( + "Updated {n_docs} documents in index '{index}' using filters.", + n_docs=updated_count, + index=self._index, + ) + return updated_count + except Exception as e: + msg = f"Failed to update documents by filter in OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + def _prepare_bm25_search_request( self, *, diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index 72c587922..a652e29e3 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -523,3 +523,58 @@ def test_delete_all_documents_no_index_recreation(self, document_store: OpenSear results = document_store.filter_documents() assert len(results) == 1 assert results[0].content == "New document after delete all" + + def test_delete_by_filter(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Delete documents with category="A" + deleted_count = document_store.delete_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + time.sleep(2) # wait for deletion to be reflected + # TEMPORARY: Intentionally failing to verify CI/CD runs integration tests + assert deleted_count == 999 # Should be 2, but set to 999 to make it fail + assert document_store.count_documents() == 1 + + # Verify only category B remains + remaining_docs = document_store.filter_documents() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + def test_update_by_filter(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update status for category="A" documents + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + time.sleep(2) # wait for update to be reflected + assert updated_count == 2 + + # Verify the updates + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + + # Verify category B still has draft status + draft_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "draft"} + ) + assert len(draft_docs) == 1 + assert draft_docs[0].meta["category"] == "B" diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py index 65b1555f8..086bdbdcf 100644 --- a/integrations/opensearch/tests/test_document_store_async.py +++ b/integrations/opensearch/tests/test_document_store_async.py @@ -302,3 +302,57 @@ async def test_delete_all_documents_no_index_recreation(self, document_store: Op results = await document_store.filter_documents_async() assert len(results) == 1 assert results[0].content == "New document after delete all" + + async def test_delete_by_filter_async(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Delete documents with category="A" + deleted_count = await document_store.delete_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + time.sleep(2) # wait for deletion to be reflected + assert deleted_count == 2 + assert await document_store.count_documents_async() == 1 + + # Verify only category B remains + remaining_docs = await document_store.filter_documents_async() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + async def test_update_by_filter_async(self, document_store: OpenSearchDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A", "status": "draft"}), + Document(content="Doc 2", meta={"category": "B", "status": "draft"}), + Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Update status for category="A" documents + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + time.sleep(2) # wait for update to be reflected + assert updated_count == 2 + + # Verify the updates + published_docs = await document_store.filter_documents_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["category"] == "A" + assert doc.meta["status"] == "published" + + # Verify category B still has draft status + draft_docs = await document_store.filter_documents_async( + filters={"field": "meta.status", "operator": "==", "value": "draft"} + ) + assert len(draft_docs) == 1 + assert draft_docs[0].meta["category"] == "B" From 481b3310b80269812ead236e382f1f4888f79641 Mon Sep 17 00:00:00 2001 From: Chinmay Bansal Date: Sun, 19 Oct 2025 23:29:35 -0700 Subject: [PATCH 2/4] fix temp bug --- integrations/opensearch/tests/test_document_store.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index a652e29e3..fdb8e3f41 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -538,8 +538,7 @@ def test_delete_by_filter(self, document_store: OpenSearchDocumentStore): filters={"field": "meta.category", "operator": "==", "value": "A"} ) time.sleep(2) # wait for deletion to be reflected - # TEMPORARY: Intentionally failing to verify CI/CD runs integration tests - assert deleted_count == 999 # Should be 2, but set to 999 to make it fail + assert deleted_count == 2 assert document_store.count_documents() == 1 # Verify only category B remains From 2e341d7045a3fa9c3f1f353162db50f4983eb561 Mon Sep 17 00:00:00 2001 From: Chinmay Bansal Date: Sun, 19 Oct 2025 23:38:08 -0700 Subject: [PATCH 3/4] metadata field exist --- .../document_stores/opensearch/document_store.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 04ef55ad6..b06d29e4e 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -675,7 +675,12 @@ def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int try: normalized_filters = normalize_filters(filters) # Build the update script to modify metadata fields - update_script = "".join([f"ctx._source.metadata.{key} = params.{key}; " for key in meta.keys()]) + # Ensure metadata object exists before updating fields + update_script_lines = ["if (ctx._source.metadata == null) { ctx._source.metadata = [:]; }"] + for key in meta.keys(): + update_script_lines.append(f"ctx._source.metadata.{key} = params.{key};") + update_script = " ".join(update_script_lines) + body = { "query": {"bool": {"filter": normalized_filters}}, "script": {"source": update_script, "params": meta, "lang": "painless"}, @@ -707,7 +712,12 @@ async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, try: normalized_filters = normalize_filters(filters) # Build the update script to modify metadata fields - update_script = "".join([f"ctx._source.metadata.{key} = params.{key}; " for key in meta.keys()]) + # Ensure metadata object exists before updating fields + update_script_lines = ["if (ctx._source.metadata == null) { ctx._source.metadata = [:]; }"] + for key in meta.keys(): + update_script_lines.append(f"ctx._source.metadata.{key} = params.{key};") + update_script = " ".join(update_script_lines) + body = { "query": {"bool": {"filter": normalized_filters}}, "script": {"source": update_script, "params": meta, "lang": "painless"}, From f113a5e59835efef0bb915ec5238a02a5a1ba8d7 Mon Sep 17 00:00:00 2001 From: Chinmay Bansal Date: Sun, 19 Oct 2025 23:47:33 -0700 Subject: [PATCH 4/4] fix metadata --- .../document_stores/opensearch/document_store.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index b06d29e4e..8d58ba6c8 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -675,10 +675,10 @@ def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int try: normalized_filters = normalize_filters(filters) # Build the update script to modify metadata fields - # Ensure metadata object exists before updating fields - update_script_lines = ["if (ctx._source.metadata == null) { ctx._source.metadata = [:]; }"] + # Documents are stored with flattened metadata, so update fields directly in ctx._source + update_script_lines = [] for key in meta.keys(): - update_script_lines.append(f"ctx._source.metadata.{key} = params.{key};") + update_script_lines.append(f"ctx._source.{key} = params.{key};") update_script = " ".join(update_script_lines) body = { @@ -712,10 +712,10 @@ async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, try: normalized_filters = normalize_filters(filters) # Build the update script to modify metadata fields - # Ensure metadata object exists before updating fields - update_script_lines = ["if (ctx._source.metadata == null) { ctx._source.metadata = [:]; }"] + # Documents are stored with flattened metadata, so update fields directly in ctx._source + update_script_lines = [] for key in meta.keys(): - update_script_lines.append(f"ctx._source.metadata.{key} = params.{key};") + update_script_lines.append(f"ctx._source.{key} = params.{key};") update_script = " ".join(update_script_lines) body = {