diff --git a/integrations/chroma/src/haystack_integrations/document_stores/chroma/document_store.py b/integrations/chroma/src/haystack_integrations/document_stores/chroma/document_store.py index f7fadbbed8..84c126dcc6 100644 --- a/integrations/chroma/src/haystack_integrations/document_stores/chroma/document_store.py +++ b/integrations/chroma/src/haystack_integrations/document_stores/chroma/document_store.py @@ -9,6 +9,7 @@ from chromadb.api.types import GetResult, QueryResult from haystack import default_from_dict, default_to_dict, logging from haystack.dataclasses import Document +from haystack.document_stores.errors import DocumentStoreError from haystack.document_stores.types import DuplicatePolicy from .filters import _convert_filters @@ -113,6 +114,8 @@ def _ensure_initialized(self): # Local persistent storage client = chromadb.PersistentClient(path=self._persist_path) + self._client = client # store client for potential future use + self._metadata = self._metadata or {} if "hnsw:space" not in self._metadata: self._metadata["hnsw:space"] = self._distance_function @@ -149,6 +152,8 @@ async def _ensure_initialized_async(self): port=self._port, ) + self._async_client = client # store client for potential future use + self._metadata = self._metadata or {} if "hnsw:space" not in self._metadata: self._metadata["hnsw:space"] = self._distance_function @@ -408,6 +413,82 @@ async def delete_documents_async(self, document_ids: List[str]) -> None: await self._async_collection.delete(ids=document_ids) + def delete_all_documents(self, *, recreate_index: bool = False) -> None: + """ + Deletes all documents in the document store. + + A fast way to clear all documents from the document store while preserving any collection settings and mappings. + :param recreate_index: Whether to recreate the index after deleting all documents. + """ + self._ensure_initialized() # _ensure_initialized ensures _client is not None and an collection exists + assert self._collection is not None + + if recreate_index: + # Store existing collection metadata and embedding function + metadata = self._collection.metadata + embedding_function = self._collection._embedding_function + collection_name = self._collection_name + + # Delete the collection + self._client.delete_collection(name=collection_name) + + # Recreate the collection with previous metadata + self._collection = self._client.create_collection( + name=collection_name, + metadata=metadata, + embedding_function=embedding_function, + ) + + else: + collection = self._collection.get() + ids = collection.get("ids", []) + self._collection.delete(ids=ids) # type: ignore + logger.info( + "Deleted all the {n_docs} documents from the collection '{name}'.", + name=self._collection_name, + n_docs=len(ids), + ) + + async def delete_all_documents_async(self, *, recreate_index: bool = False) -> None: + """ + Asynchronously deletes all documents in the document store. + + A fast way to clear all documents from the document store while preserving any collection settings and mappings. + :param recreate_index: Whether to recreate the index after deleting all documents. + """ + await self._ensure_initialized_async() # ensures _async_client is not None + assert self._async_collection is not None + + try: + if recreate_index: + # Store existing collection metadata and embedding function + metadata = self._async_collection.metadata + embedding_function = self._async_collection._embedding_function + collection_name = self._collection_name + + # Delete the collection + await self._async_client.delete_collection(name=collection_name) + + # Recreate the collection with previous metadata + self._async_collection = await self._async_client.create_collection( + name=collection_name, + metadata=metadata, + embedding_function=embedding_function, + ) + else: + collection = await self._async_collection.get() + ids = collection.get("ids", []) + await self._async_collection.delete(ids=ids) # type: ignore + logger.info( + "Deleted all the {n_docs} documents from the collection '{name}'.", + name=self._collection_name, + n_docs=len(ids), + ) + + except Exception as e: + msg = f"Failed to delete all documents from ChromaDB: {e!s}" + raise DocumentStoreError(msg) from e + def search( self, queries: List[str], diff --git a/integrations/chroma/tests/test_document_store.py b/integrations/chroma/tests/test_document_store.py index 9fa16cd952..c0d998551d 100644 --- a/integrations/chroma/tests/test_document_store.py +++ b/integrations/chroma/tests/test_document_store.py @@ -4,6 +4,7 @@ import logging import operator +import time import uuid from typing import List from unittest import mock @@ -381,3 +382,41 @@ def test_search(self): # check that empty filters behave as no filters result_empty_filters = document_store.search(["Third"], filters={}, top_k=1) assert result == result_empty_filters + + def test_delete_all_documents_index_recreation(self, document_store: ChromaDocumentStore): + # write some documents + docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] + document_store.write_documents(docs) + + # get the current document_store config + config_before = document_store._collection.get(document_store._collection_name) + + # delete all documents with recreating the index + document_store.delete_all_documents(recreate_index=True) + assert document_store.count_documents() == 0 + + # assure that with the same config + config_after = document_store._collection.get(document_store._collection_name) + + assert config_before == config_after + + # ensure the collection still exists by writing documents again + document_store.write_documents(docs) + assert document_store.count_documents() == 2 + + def test_delete_all_documents_no_index_recreation(self, document_store: ChromaDocumentStore): + docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] + document_store.write_documents(docs) + assert document_store.count_documents() == 2 + + document_store.delete_all_documents() + time.sleep(2) # need to wait for the deletion to be reflected in count_documents + assert document_store.count_documents() == 0 + + new_doc = Document(id="3", content="New document after delete all") + document_store.write_documents([new_doc]) + assert document_store.count_documents() == 1 + + results = document_store.filter_documents() + assert len(results) == 1 + assert results[0].content == "New document after delete all" diff --git a/integrations/chroma/tests/test_document_store_async.py b/integrations/chroma/tests/test_document_store_async.py index e66a926527..a929c4640e 100644 --- a/integrations/chroma/tests/test_document_store_async.py +++ b/integrations/chroma/tests/test_document_store_async.py @@ -120,3 +120,53 @@ async def test_search_async(self): # check that empty filters behave as no filters result_empty_filters = document_store.search(["Third"], filters={}, top_k=1) assert result == result_empty_filters + + @pytest.mark.asyncio + async def test_delete_all_documents_index_recreation(self, document_store: ChromaDocumentStore): + # write some documents + docs = [ + Document(id="1", content="First document", meta={"category": "test"}), + Document(id="2", content="Second document", meta={"category": "test"}), + Document(id="3", content="Third document", meta={"category": "other"}), + ] + await document_store.write_documents_async(docs) + + # get the current document_store config + config_before = await document_store._async_collection.get(document_store._collection_name) + + # delete all documents with recreating the index + await document_store.delete_all_documents_async(recreate_index=True) + assert await document_store.count_documents_async() == 0 + + # assure that with the same config + config_after = await document_store._async_collection.get(document_store._collection_name) + + assert config_before == config_after + + # ensure the collection still exists by writing documents again + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + @pytest.mark.asyncio + async def test_delete_all_documents_async(self, document_store): + docs = [ + Document(id="1", content="First document", meta={"category": "test"}), + Document(id="2", content="Second document", meta={"category": "test"}), + Document(id="3", content="Third document", meta={"category": "other"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # delete all documents + await document_store.delete_all_documents_async() + assert await document_store.count_documents_async() == 0 + + # verify index still exists and can accept new documents and retrieve + new_doc = Document(id="4", content="New document after delete all") + await document_store.write_documents_async([new_doc]) + assert await document_store.count_documents_async() == 1 + + results = await document_store.filter_documents_async() + assert len(results) == 1 + assert results[0].id == "4" + assert results[0].content == "New document after delete all"