diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py index 82be0e883f..af60dcdc76 100644 --- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py +++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py @@ -53,9 +53,8 @@ def get_batches_from_generator(iterable: List, n: int) -> Generator: class QdrantDocumentStore: """ - A QdrantDocumentStore implementation that you - can use with any Qdrant instance: in-memory, disk-persisted, Docker-based, - and Qdrant Cloud Cluster deployments. + A QdrantDocumentStore implementation that you can use with any Qdrant instance: in-memory, disk-persisted, + Docker-based, and Qdrant Cloud Cluster deployments. Usage example by creating an in-memory instance: @@ -65,7 +64,8 @@ class QdrantDocumentStore: document_store = QdrantDocumentStore( ":memory:", - recreate_index=True + recreate_index=True, + embedding_dim=5 ) document_store.write_documents([ Document(content="This is first", embedding=[0.0]*5), @@ -135,6 +135,8 @@ def __init__( payload_fields_to_index: Optional[List[dict]] = None, ) -> None: """ + Initializes a QdrantDocumentStore. + :param location: If `":memory:"` - use in-memory Qdrant instance. If `str` - use it as a URL parameter. @@ -350,7 +352,7 @@ def filter_documents( # No need to initialize client here as _get_documents_generator # will handle client initialization internally - self._validate_filters(filters) + QdrantDocumentStore._validate_filters(filters) return list( self._get_documents_generator( filters, @@ -367,7 +369,7 @@ async def filter_documents_async( # No need to initialize client here as _get_documents_generator_async # will handle client initialization internally - self._validate_filters(filters) + QdrantDocumentStore._validate_filters(filters) return [doc async for doc in self._get_documents_generator_async(filters)] def write_documents( @@ -521,6 +523,108 @@ async def delete_documents_async(self, document_ids: List[str]) -> None: "Called QdrantDocumentStore.delete_documents_async() on a non-existing ID", ) + def delete_all_documents(self, recreate_index: bool = False) -> None: + """ + Deletes all documents from the document store. + + :param recreate_index: Whether to recreate the index after deleting all documents. + """ + + self._initialize_client() + assert self._client is not None + + if recreate_index: + # get current collection config as json + collection_info = self._client.get_collection(collection_name=self.index) + info_json = collection_info.model_dump() + + # deal with the Optional use_sparse_embeddings + sparse_vectors = info_json["config"]["params"]["sparse_vectors"] + use_sparse_embeddings = True if sparse_vectors else False + + # deal with the Optional sparse_idf + hnsw_config = info_json["config"]["params"]["vectors"].get("config", {}).get("hnsw_config", None) + sparse_idf = True if use_sparse_embeddings and hnsw_config else False + + # recreate collection + self._set_up_collection( + collection_name=self.index, + embedding_dim=info_json["config"]["params"]["vectors"]["size"], + recreate_collection=True, + similarity=info_json["config"]["params"]["vectors"]["distance"].lower(), + use_sparse_embeddings=use_sparse_embeddings, + sparse_idf=sparse_idf, + on_disk=info_json["config"]["hnsw_config"]["on_disk"], + payload_fields_to_index=info_json["payload_schema"], + ) + + else: + try: + self._client.delete( + collection_name=self.index, + points_selector=rest.FilterSelector( + filter=rest.Filter( + must=[], + ) + ), + wait=self.wait_result_from_api, + ) + except Exception as e: + logger.warning( + f"Error {e} when calling QdrantDocumentStore.delete_all_documents()", + ) + + async def delete_all_documents_async(self, recreate_index: bool = False) -> None: + """ + Asynchronously deletes all documents from the document store. + + :param recreate_index: Whether to recreate the index after deleting all documents. + """ + + await self._initialize_async_client() + assert self._async_client is not None + + if recreate_index: + # get current collection config as json + collection_info = await self._async_client.get_collection(collection_name=self.index) + info_json = collection_info.model_dump() + + # deal with the Optional use_sparse_embeddings + sparse_vectors = info_json["config"]["params"]["sparse_vectors"] + use_sparse_embeddings = sparse_vectors if sparse_vectors else False + + # deal with the Optional sparse_idf + hnsw_config = info_json["config"]["params"]["vectors"].get("config", {}).get("hnsw_config", None) + sparse_idf = hnsw_config if use_sparse_embeddings and hnsw_config else False + + # recreate collection + await self._set_up_collection_async( + collection_name=self.index, + embedding_dim=info_json["config"]["params"]["vectors"]["size"], + recreate_collection=True, + similarity=info_json["config"]["params"]["vectors"]["distance"].lower(), + use_sparse_embeddings=use_sparse_embeddings, + sparse_idf=sparse_idf, + on_disk=info_json["config"]["hnsw_config"]["on_disk"], + payload_fields_to_index=info_json["payload_schema"], + ) + + else: + try: + await self._async_client.delete( + collection_name=self.index, + points_selector=rest.FilterSelector( + filter=rest.Filter( + must=[], + ) + ), + wait=self.wait_result_from_api, + ) + except Exception as e: + logger.warning( + f"Error {e} when calling QdrantDocumentStore.delete_all_documents_async()", + ) + @classmethod def from_dict(cls, data: Dict[str, Any]) -> "QdrantDocumentStore": """ @@ -1214,7 +1318,8 @@ def get_distance(self, similarity: str) -> rest.Distance: def _create_payload_index(self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None) -> None: """ - Create payload index for the collection if payload_fields_to_index is provided + Create payload index for the collection if payload_fields_to_index is provided. + See: https://qdrant.tech/documentation/concepts/indexing/#payload-index """ if payload_fields_to_index is not None: @@ -1233,7 +1338,8 @@ async def _create_payload_index_async( self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None ) -> None: """ - Asynchronously create payload index for the collection if payload_fields_to_index is provided + Asynchronously create payload index for the collection if payload_fields_to_index is provided. + See: https://qdrant.tech/documentation/concepts/indexing/#payload-index """ if payload_fields_to_index is not None: @@ -1261,6 +1367,7 @@ def _set_up_collection( ) -> None: """ Sets up the Qdrant collection with the specified parameters. + :param collection_name: The name of the collection to set up. :param embedding_dim: @@ -1317,6 +1424,7 @@ async def _set_up_collection_async( ) -> None: """ Asynchronously sets up the Qdrant collection with the specified parameters. + :param collection_name: The name of the collection to set up. :param embedding_dim: @@ -1601,7 +1709,8 @@ def _prepare_collection_config( return vectors_config, sparse_vectors_config - def _validate_filters(self, filters: Optional[Union[Dict[str, Any], rest.Filter]] = None) -> None: + @staticmethod + def _validate_filters(filters: Optional[Union[Dict[str, Any], rest.Filter]] = None) -> None: """ Validates the filters provided for querying. diff --git a/integrations/qdrant/tests/test_document_store.py b/integrations/qdrant/tests/test_document_store.py index 5335299f1b..004c815e79 100644 --- a/integrations/qdrant/tests/test_document_store.py +++ b/integrations/qdrant/tests/test_document_store.py @@ -297,3 +297,41 @@ def test_set_up_collection_with_dimension_mismatch(self): ): with pytest.raises(ValueError, match="different vector size"): document_store._set_up_collection("test_collection", 768, False, "cosine", False, False) + + def test_delete_all_documents_no_index_recreation(self, document_store): + document_store._initialize_client() + + # write some documents + docs = [Document(id=str(i)) for i in range(5)] + document_store.write_documents(docs) + + # delete all documents without recreating the index + document_store.delete_all_documents(recreate_index=False) + assert document_store.count_documents() == 0 + + # ensure the collection still exists by writing documents again + document_store.write_documents(docs) + assert document_store.count_documents() == 5 + + def test_delete_all_documents_index_recreation(self, document_store): + document_store._initialize_client() + + # write some documents + docs = [Document(id=str(i)) for i in range(5)] + document_store.write_documents(docs) + + # get the current document_store config + config_before = document_store._client.get_collection(document_store.index) + + # delete all documents with recreating the index + document_store.delete_all_documents(recreate_index=True) + assert document_store.count_documents() == 0 + + # assure that with the same config + config_after = document_store._client.get_collection(document_store.index) + + assert config_before.config == config_after.config + + # ensure the collection still exists by writing documents again + document_store.write_documents(docs) + assert document_store.count_documents() == 5 diff --git a/integrations/qdrant/tests/test_document_store_async.py b/integrations/qdrant/tests/test_document_store_async.py index fa141f8156..5fbdd8b304 100644 --- a/integrations/qdrant/tests/test_document_store_async.py +++ b/integrations/qdrant/tests/test_document_store_async.py @@ -218,3 +218,43 @@ async def test_set_up_collection_with_distance_mismatch_async(self): ): with pytest.raises(ValueError, match="different similarity"): await document_store._set_up_collection_async("test_collection", 768, False, "cosine", False, False) + + @pytest.mark.asyncio + async def test_delete_all_documents_async_no_index_recreation(self, document_store): + await document_store._initialize_async_client() + + # write some documents + docs = [Document(id=str(i)) for i in range(5)] + await document_store.write_documents_async(docs) + + # delete all documents without recreating the index + await document_store.delete_all_documents_async(recreate_index=False) + assert await document_store.count_documents_async() == 0 + + # ensure the collection still exists by writing documents again + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 5 + + @pytest.mark.asyncio + async def test_delete_all_documents_async_index_recreation(self, document_store): + await document_store._initialize_async_client() + + # write some documents + docs = [Document(id=str(i)) for i in range(5)] + await document_store.write_documents_async(docs) + + # get the current document_store config + config_before = await document_store._async_client.get_collection(document_store.index) + + # delete all documents with recreating the index + await document_store.delete_all_documents_async(recreate_index=True) + assert await document_store.count_documents_async() == 0 + + # assure that with the same config + config_after = await document_store._async_client.get_collection(document_store.index) + + assert config_before.config == config_after.config + + # ensure the collection still exists by writing documents again + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 5