Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from chromadb.api.types import GetResult, QueryResult
from haystack import default_from_dict, default_to_dict, logging
from haystack.dataclasses import Document
from haystack.document_stores.errors import DocumentStoreError
from haystack.document_stores.types import DuplicatePolicy

from .filters import _convert_filters
Expand Down Expand Up @@ -113,6 +114,8 @@ def _ensure_initialized(self):
# Local persistent storage
client = chromadb.PersistentClient(path=self._persist_path)

self._client = client # store client for potential future use

self._metadata = self._metadata or {}
if "hnsw:space" not in self._metadata:
self._metadata["hnsw:space"] = self._distance_function
Expand Down Expand Up @@ -149,6 +152,8 @@ async def _ensure_initialized_async(self):
port=self._port,
)

self._async_client = client # store client for potential future use

self._metadata = self._metadata or {}
if "hnsw:space" not in self._metadata:
self._metadata["hnsw:space"] = self._distance_function
Expand Down Expand Up @@ -408,6 +413,82 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:

await self._async_collection.delete(ids=document_ids)

def delete_all_documents(self, *, recreate_index: bool = False) -> None:
"""
Deletes all documents in the document store.

A fast way to clear all documents from the document store while preserving any collection settings and mappings.
:param recreate_index: Whether to recreate the index after deleting all documents.
"""
self._ensure_initialized() # _ensure_initialized ensures _client is not None and an collection exists
assert self._collection is not None

if recreate_index:
# Store existing collection metadata and embedding function
metadata = self._collection.metadata
embedding_function = self._collection._embedding_function
collection_name = self._collection_name

# Delete the collection
self._client.delete_collection(name=collection_name)

# Recreate the collection with previous metadata
self._collection = self._client.create_collection(
name=collection_name,
metadata=metadata,
embedding_function=embedding_function,
)

else:
collection = self._collection.get()
ids = collection.get("ids", [])
self._collection.delete(ids=ids) # type: ignore
logger.info(
"Deleted all the {n_docs} documents from the collection '{name}'.",
name=self._collection_name,
n_docs=len(ids),
)

async def delete_all_documents_async(self, *, recreate_index: bool = False) -> None:
"""
Asynchronously deletes all documents in the document store.

A fast way to clear all documents from the document store while preserving any collection settings and mappings.
:param recreate_index: Whether to recreate the index after deleting all documents.
"""
await self._ensure_initialized_async() # ensures _async_client is not None
assert self._async_collection is not None

try:
if recreate_index:
# Store existing collection metadata and embedding function
metadata = self._async_collection.metadata
embedding_function = self._async_collection._embedding_function
collection_name = self._collection_name

# Delete the collection
await self._async_client.delete_collection(name=collection_name)

# Recreate the collection with previous metadata
self._async_collection = await self._async_client.create_collection(
name=collection_name,
metadata=metadata,
embedding_function=embedding_function,
)
else:
collection = await self._async_collection.get()
ids = collection.get("ids", [])
await self._async_collection.delete(ids=ids) # type: ignore
logger.info(
"Deleted all the {n_docs} documents from the collection '{name}'.",
name=self._collection_name,
n_docs=len(ids),
)

except Exception as e:
msg = f"Failed to delete all documents from ChromaDB: {e!s}"
raise DocumentStoreError(msg) from e

def search(
self,
queries: List[str],
Expand Down
39 changes: 39 additions & 0 deletions integrations/chroma/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import operator
import time
import uuid
from typing import List
from unittest import mock
Expand Down Expand Up @@ -381,3 +382,41 @@ def test_search(self):
# check that empty filters behave as no filters
result_empty_filters = document_store.search(["Third"], filters={}, top_k=1)
assert result == result_empty_filters

def test_delete_all_documents_index_recreation(self, document_store: ChromaDocumentStore):
# write some documents
docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]
document_store.write_documents(docs)

# get the current document_store config
config_before = document_store._collection.get(document_store._collection_name)

# delete all documents with recreating the index
document_store.delete_all_documents(recreate_index=True)
assert document_store.count_documents() == 0

# assure that with the same config
config_after = document_store._collection.get(document_store._collection_name)

assert config_before == config_after

# ensure the collection still exists by writing documents again
document_store.write_documents(docs)
assert document_store.count_documents() == 2

def test_delete_all_documents_no_index_recreation(self, document_store: ChromaDocumentStore):
docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]
document_store.write_documents(docs)
assert document_store.count_documents() == 2

document_store.delete_all_documents()
time.sleep(2) # need to wait for the deletion to be reflected in count_documents
assert document_store.count_documents() == 0

new_doc = Document(id="3", content="New document after delete all")
document_store.write_documents([new_doc])
assert document_store.count_documents() == 1

results = document_store.filter_documents()
assert len(results) == 1
assert results[0].content == "New document after delete all"
50 changes: 50 additions & 0 deletions integrations/chroma/tests/test_document_store_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,53 @@ async def test_search_async(self):
# check that empty filters behave as no filters
result_empty_filters = document_store.search(["Third"], filters={}, top_k=1)
assert result == result_empty_filters

@pytest.mark.asyncio
async def test_delete_all_documents_index_recreation(self, document_store: ChromaDocumentStore):
# write some documents
docs = [
Document(id="1", content="First document", meta={"category": "test"}),
Document(id="2", content="Second document", meta={"category": "test"}),
Document(id="3", content="Third document", meta={"category": "other"}),
]
await document_store.write_documents_async(docs)

# get the current document_store config
config_before = await document_store._async_collection.get(document_store._collection_name)

# delete all documents with recreating the index
await document_store.delete_all_documents_async(recreate_index=True)
assert await document_store.count_documents_async() == 0

# assure that with the same config
config_after = await document_store._async_collection.get(document_store._collection_name)

assert config_before == config_after

# ensure the collection still exists by writing documents again
await document_store.write_documents_async(docs)
assert await document_store.count_documents_async() == 3

@pytest.mark.asyncio
async def test_delete_all_documents_async(self, document_store):
docs = [
Document(id="1", content="First document", meta={"category": "test"}),
Document(id="2", content="Second document", meta={"category": "test"}),
Document(id="3", content="Third document", meta={"category": "other"}),
]
await document_store.write_documents_async(docs)
assert await document_store.count_documents_async() == 3

# delete all documents
await document_store.delete_all_documents_async()
assert await document_store.count_documents_async() == 0

# verify index still exists and can accept new documents and retrieve
new_doc = Document(id="4", content="New document after delete all")
await document_store.write_documents_async([new_doc])
assert await document_store.count_documents_async() == 1

results = await document_store.filter_documents_async()
assert len(results) == 1
assert results[0].id == "4"
assert results[0].content == "New document after delete all"