Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ def get_batches_from_generator(iterable: List, n: int) -> Generator:

class QdrantDocumentStore:
"""
A QdrantDocumentStore implementation that you
can use with any Qdrant instance: in-memory, disk-persisted, Docker-based,
and Qdrant Cloud Cluster deployments.
A QdrantDocumentStore implementation that you can use with any Qdrant instance: in-memory, disk-persisted,
Docker-based, and Qdrant Cloud Cluster deployments.

Usage example by creating an in-memory instance:

Expand All @@ -65,7 +64,8 @@ class QdrantDocumentStore:

document_store = QdrantDocumentStore(
":memory:",
recreate_index=True
recreate_index=True,
embedding_dim=5
)
document_store.write_documents([
Document(content="This is first", embedding=[0.0]*5),
Expand Down Expand Up @@ -135,6 +135,8 @@ def __init__(
payload_fields_to_index: Optional[List[dict]] = None,
) -> None:
"""
Initializes a QdrantDocumentStore.

:param location:
If `":memory:"` - use in-memory Qdrant instance.
If `str` - use it as a URL parameter.
Expand Down Expand Up @@ -350,7 +352,7 @@ def filter_documents(
# No need to initialize client here as _get_documents_generator
# will handle client initialization internally

self._validate_filters(filters)
QdrantDocumentStore._validate_filters(filters)
return list(
self._get_documents_generator(
filters,
Expand All @@ -367,7 +369,7 @@ async def filter_documents_async(
# No need to initialize client here as _get_documents_generator_async
# will handle client initialization internally

self._validate_filters(filters)
QdrantDocumentStore._validate_filters(filters)
return [doc async for doc in self._get_documents_generator_async(filters)]

def write_documents(
Expand Down Expand Up @@ -521,6 +523,108 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
"Called QdrantDocumentStore.delete_documents_async() on a non-existing ID",
)

def delete_all_documents(self, recreate_index: bool = False) -> None:
    """
    Deletes all documents from the document store.

    :param recreate_index:
        If `True`, the collection is recreated using the configuration read back from the existing
        collection. If `False`, every point is deleted and the collection itself is kept; an error
        raised by that deletion is logged as a warning instead of being propagated.
    """
    self._initialize_client()
    assert self._client is not None

    if recreate_index:
        # Snapshot the live collection configuration as plain Python data.
        collection_info = self._client.get_collection(collection_name=self.index)
        info_json = collection_info.model_dump()
        params = info_json["config"]["params"]

        # Sparse embeddings are in use when the collection declares sparse vectors.
        # Forward a real bool rather than the raw configuration mapping.
        sparse_vectors = params.get("sparse_vectors") or {}
        use_sparse_embeddings = bool(sparse_vectors)

        # Qdrant records IDF as the `modifier` of a sparse vector, not in the HNSW settings.
        sparse_idf = any((cfg or {}).get("modifier") == "idf" for cfg in sparse_vectors.values())

        # `vectors` is either a single unnamed vector config or a mapping of vector name -> config.
        vectors = params["vectors"]
        dense_vectors = vectors if "size" in vectors else next(iter(vectors.values()))

        # NOTE(review): `similarity`, `on_disk` and `payload_fields_to_index` are forwarded as read from
        # Qdrant - confirm that `_set_up_collection` accepts them in this form.
        self._set_up_collection(
            collection_name=self.index,
            embedding_dim=dense_vectors["size"],
            recreate_collection=True,
            similarity=dense_vectors["distance"].lower(),
            use_sparse_embeddings=use_sparse_embeddings,
            sparse_idf=sparse_idf,
            on_disk=info_json["config"]["hnsw_config"]["on_disk"],
            payload_fields_to_index=info_json["payload_schema"],
        )

    else:
        try:
            # A filter without conditions matches every point in the collection.
            self._client.delete(
                collection_name=self.index,
                points_selector=rest.FilterSelector(
                    filter=rest.Filter(
                        must=[],
                    )
                ),
                wait=self.wait_result_from_api,
            )
        except Exception as e:
            # Best effort: report the failure without interrupting the caller.
            logger.warning(
                f"Error {e} when calling QdrantDocumentStore.delete_all_documents()",
            )

async def delete_all_documents_async(self, recreate_index: bool = False) -> None:
    """
    Asynchronously deletes all documents from the document store.

    :param recreate_index:
        If `True`, the collection is recreated using the configuration read back from the existing
        collection. If `False`, every point is deleted and the collection itself is kept; an error
        raised by that deletion is logged as a warning instead of being propagated.
    """
    await self._initialize_async_client()
    assert self._async_client is not None

    if recreate_index:
        # Snapshot the live collection configuration as plain Python data.
        collection_info = await self._async_client.get_collection(collection_name=self.index)
        info_json = collection_info.model_dump()
        params = info_json["config"]["params"]

        # Sparse embeddings are in use when the collection declares sparse vectors.
        # Forward a real bool rather than the raw configuration mapping.
        sparse_vectors = params.get("sparse_vectors") or {}
        use_sparse_embeddings = bool(sparse_vectors)

        # Qdrant records IDF as the `modifier` of a sparse vector, not in the HNSW settings.
        sparse_idf = any((cfg or {}).get("modifier") == "idf" for cfg in sparse_vectors.values())

        # `vectors` is either a single unnamed vector config or a mapping of vector name -> config.
        vectors = params["vectors"]
        dense_vectors = vectors if "size" in vectors else next(iter(vectors.values()))

        # NOTE(review): `similarity`, `on_disk` and `payload_fields_to_index` are forwarded as read from
        # Qdrant - confirm that `_set_up_collection_async` accepts them in this form.
        await self._set_up_collection_async(
            collection_name=self.index,
            embedding_dim=dense_vectors["size"],
            recreate_collection=True,
            similarity=dense_vectors["distance"].lower(),
            use_sparse_embeddings=use_sparse_embeddings,
            sparse_idf=sparse_idf,
            on_disk=info_json["config"]["hnsw_config"]["on_disk"],
            payload_fields_to_index=info_json["payload_schema"],
        )

    else:
        try:
            # A filter without conditions matches every point in the collection.
            await self._async_client.delete(
                collection_name=self.index,
                points_selector=rest.FilterSelector(
                    filter=rest.Filter(
                        must=[],
                    )
                ),
                wait=self.wait_result_from_api,
            )
        except Exception as e:
            # Best effort: report the failure without interrupting the caller.
            logger.warning(
                f"Error {e} when calling QdrantDocumentStore.delete_all_documents_async()",
            )

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "QdrantDocumentStore":
"""
Expand Down Expand Up @@ -1214,7 +1318,8 @@ def get_distance(self, similarity: str) -> rest.Distance:

def _create_payload_index(self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None) -> None:
"""
Create payload index for the collection if payload_fields_to_index is provided
Create payload index for the collection if payload_fields_to_index is provided.

See: https://qdrant.tech/documentation/concepts/indexing/#payload-index
"""
if payload_fields_to_index is not None:
Expand All @@ -1233,7 +1338,8 @@ async def _create_payload_index_async(
self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None
) -> None:
"""
Asynchronously create payload index for the collection if payload_fields_to_index is provided
Asynchronously create payload index for the collection if payload_fields_to_index is provided.

See: https://qdrant.tech/documentation/concepts/indexing/#payload-index
"""
if payload_fields_to_index is not None:
Expand Down Expand Up @@ -1261,6 +1367,7 @@ def _set_up_collection(
) -> None:
"""
Sets up the Qdrant collection with the specified parameters.

:param collection_name:
The name of the collection to set up.
:param embedding_dim:
Expand Down Expand Up @@ -1317,6 +1424,7 @@ async def _set_up_collection_async(
) -> None:
"""
Asynchronously sets up the Qdrant collection with the specified parameters.

:param collection_name:
The name of the collection to set up.
:param embedding_dim:
Expand Down Expand Up @@ -1601,7 +1709,8 @@ def _prepare_collection_config(

return vectors_config, sparse_vectors_config

def _validate_filters(self, filters: Optional[Union[Dict[str, Any], rest.Filter]] = None) -> None:
@staticmethod
def _validate_filters(filters: Optional[Union[Dict[str, Any], rest.Filter]] = None) -> None:
"""
Validates the filters provided for querying.

Expand Down
38 changes: 38 additions & 0 deletions integrations/qdrant/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,41 @@ def test_set_up_collection_with_dimension_mismatch(self):
):
with pytest.raises(ValueError, match="different vector size"):
document_store._set_up_collection("test_collection", 768, False, "cosine", False, False)

def test_delete_all_documents_no_index_recreation(self, document_store):
    """Deleting without index recreation empties the collection but keeps it writable."""
    document_store._initialize_client()

    # seed the store with five documents
    documents = [Document(id=str(doc_number)) for doc_number in range(5)]
    document_store.write_documents(documents)

    # wipe the documents, leaving the existing index in place
    document_store.delete_all_documents(recreate_index=False)
    remaining = document_store.count_documents()
    assert remaining == 0

    # writing again only works if the collection is still there
    document_store.write_documents(documents)
    restored = document_store.count_documents()
    assert restored == 5

def test_delete_all_documents_index_recreation(self, document_store):
    """Deleting with index recreation empties the collection and keeps its configuration."""
    document_store._initialize_client()

    # seed the store with five documents
    documents = [Document(id=str(doc_number)) for doc_number in range(5)]
    document_store.write_documents(documents)

    # remember the collection configuration before the wipe
    config_before = document_store._client.get_collection(document_store.index)

    # wipe the documents and rebuild the index
    document_store.delete_all_documents(recreate_index=True)
    remaining = document_store.count_documents()
    assert remaining == 0

    # the recreated collection must carry the same configuration
    config_after = document_store._client.get_collection(document_store.index)
    assert config_before.config == config_after.config

    # writing again only works if the collection exists
    document_store.write_documents(documents)
    restored = document_store.count_documents()
    assert restored == 5
40 changes: 40 additions & 0 deletions integrations/qdrant/tests/test_document_store_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,43 @@ async def test_set_up_collection_with_distance_mismatch_async(self):
):
with pytest.raises(ValueError, match="different similarity"):
await document_store._set_up_collection_async("test_collection", 768, False, "cosine", False, False)

@pytest.mark.asyncio
async def test_delete_all_documents_async_no_index_recreation(self, document_store):
    """Async deletion without index recreation empties the collection but keeps it writable."""
    await document_store._initialize_async_client()

    # seed the store with five documents
    documents = [Document(id=str(doc_number)) for doc_number in range(5)]
    await document_store.write_documents_async(documents)

    # wipe the documents, leaving the existing index in place
    await document_store.delete_all_documents_async(recreate_index=False)
    remaining = await document_store.count_documents_async()
    assert remaining == 0

    # writing again only works if the collection is still there
    await document_store.write_documents_async(documents)
    restored = await document_store.count_documents_async()
    assert restored == 5

@pytest.mark.asyncio
async def test_delete_all_documents_async_index_recreation(self, document_store):
    """Async deletion with index recreation empties the collection and keeps its configuration."""
    await document_store._initialize_async_client()

    # seed the store with five documents
    documents = [Document(id=str(doc_number)) for doc_number in range(5)]
    await document_store.write_documents_async(documents)

    # remember the collection configuration before the wipe
    config_before = await document_store._async_client.get_collection(document_store.index)

    # wipe the documents and rebuild the index
    await document_store.delete_all_documents_async(recreate_index=True)
    remaining = await document_store.count_documents_async()
    assert remaining == 0

    # the recreated collection must carry the same configuration
    config_after = await document_store._async_client.get_collection(document_store.index)
    assert config_before.config == config_after.config

    # writing again only works if the collection exists
    await document_store.write_documents_async(documents)
    restored = await document_store.count_documents_async()
    assert restored == 5