Commit e557749

Fix Milvus document counting to count unique documents not chunks
Changed count_documents(), count_documents_in_collection(), and get_collection_info() to query unique document_ids instead of using get_collection_stats(), which returned the total chunk count. Added support for legacy chunks without a document_id by generating synthetic IDs from URLs for backwards compatibility.

Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Parent: 821190e
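The new counting path scans chunk records and deduplicates on document_id, falling back to a URL-derived synthetic ID for legacy chunks. A minimal standalone sketch of that logic follows; the helper name, the sample chunk dictionaries, and the assumption that query results are dicts with "url" and "metadata" keys are illustrative, not code from this commit.

    def count_unique_documents(chunks: list[dict]) -> int:
        """Count distinct documents among chunk records (illustrative helper)."""
        unique_doc_ids: set[str] = set()
        for chunk in chunks:
            metadata = chunk.get("metadata", {})
            if not isinstance(metadata, dict):
                metadata = {}
            doc_id = metadata.get("document_id")
            # Legacy chunks carry no document_id; derive a synthetic one from the
            # URL so each legacy source is still counted exactly once.
            if not doc_id:
                url = chunk.get("url", "")
                if url:
                    doc_id = f"legacy_{url}"
            if doc_id:
                unique_doc_ids.add(doc_id)
        return len(unique_doc_ids)

    # Two chunks of doc-1, one chunk of doc-2, and one legacy chunk -> 3 documents
    chunks = [
        {"url": "https://example.com/a", "metadata": {"document_id": "doc-1"}},
        {"url": "https://example.com/a", "metadata": {"document_id": "doc-1"}},
        {"url": "https://example.com/b", "metadata": {"document_id": "doc-2"}},
        {"url": "https://example.com/c", "metadata": {}},  # legacy, no document_id
    ]
    assert count_unique_documents(chunks) == 3

The trade-off, visible in the diff below, is that each count now requires a full query of the collection (with a high limit of 16384 rows) instead of a cheap row-count lookup, in exchange for reporting documents rather than chunks.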

1 file changed: +114 -23 lines

src/db/vector_db_milvus.py

@@ -912,11 +912,21 @@ async def list_documents(
             if not isinstance(metadata, dict):
                 metadata = {}
             doc_id = metadata.get("document_id")
+
+            # Handle legacy chunks without document_id
             if not doc_id:
-                logger.warning(
-                    f"DEBUG: Chunk without document_id: {chunk.get('url', 'N/A')}"
-                )
-                continue  # Skip chunks without document_id
+                # Generate synthetic document_id from URL for legacy data
+                url = chunk.get("url", "")
+                if url:
+                    # Use URL as synthetic document_id for legacy chunks
+                    doc_id = f"legacy_{url}"
+                    logger.debug(
+                        f"Generated synthetic doc_id for legacy chunk: {doc_id}"
+                    )
+                else:
+                    # Skip chunks with no document_id and no URL
+                    logger.warning("Chunk has no document_id and no URL, skipping")
+                    continue

             if doc_id not in docs_by_id:
                 docs_by_id[doc_id] = {
@@ -975,7 +985,7 @@ async def list_documents(
             return []

     async def count_documents(self) -> int:
-        """Get the current count of documents in the collection."""
+        """Get the current count of unique documents in the collection."""
         self._ensure_client()
         if self.client is None:
             warnings.warn("Milvus client is not available. Returning 0.")
@@ -987,11 +997,38 @@ async def count_documents(self) -> int:
             return 0

         try:
-            # Get collection statistics
-            stats = await self.client.get_collection_stats(self.collection_name)
-            return stats.get("row_count", 0)
+            # Ensure collection is loaded
+            try:
+                if hasattr(self.client, "load_collection"):
+                    await self.client.load_collection(self.collection_name)
+            except Exception:
+                pass
+
+            # Query all chunks and count unique document_ids
+            results = await self.client.query(
+                self.collection_name,
+                filter="id > 0",  # PK-indexed filter - reliable and efficient
+                output_fields=["url", "metadata"],
+                limit=16384,  # High limit to get all chunks
+            )
+
+            # Count unique document_ids (including legacy chunks)
+            unique_doc_ids = set()
+            for chunk in results:
+                metadata = chunk.get("metadata", {})
+                if isinstance(metadata, dict):
+                    doc_id = metadata.get("document_id")
+                    # Handle legacy chunks without document_id
+                    if not doc_id:
+                        url = chunk.get("url", "")
+                        if url:
+                            doc_id = f"legacy_{url}"
+                    if doc_id:
+                        unique_doc_ids.add(doc_id)
+
+            return len(unique_doc_ids)
         except Exception as e:
-            warnings.warn(f"Could not get collection stats: {e}")
+            warnings.warn(f"Could not count documents: {e}")
             return 0

     async def list_collections(self) -> list[str]:
@@ -1064,7 +1101,7 @@ async def list_documents_in_collection(
             return []

     async def count_documents_in_collection(self, collection_name: str) -> int:
-        """Get the current count of documents in a specific collection in Milvus."""
+        """Get the current count of unique documents in a specific collection in Milvus."""
         self._ensure_client()
         if self.client is None:
             warnings.warn("Milvus client is not available. Returning 0.")
@@ -1075,13 +1112,43 @@ async def count_documents_in_collection(self, collection_name: str) -> int:
             if not await self.client.has_collection(collection_name):
                 return 0

-            # Get collection statistics for the specific collection
-            stats = await self.client.get_collection_stats(collection_name)
-            return stats.get("row_count", 0)
-        except Exception as e:
-            warnings.warn(
-                f"Could not get collection stats for '{collection_name}': {e}"
+            # Ensure collection is loaded
+            try:
+                if hasattr(self.client, "load_collection"):
+                    await self.client.load_collection(collection_name)
+            except Exception:
+                pass
+
+            # Query all chunks and count unique document_ids
+            results = await self.client.query(
+                collection_name,
+                filter="id > 0",  # PK-indexed filter - reliable and efficient
+                output_fields=["url", "metadata"],
+                limit=16384,  # High limit to get all chunks
             )
+
+            # Count unique document_ids (including legacy chunks)
+            unique_doc_ids = set()
+            for chunk in results:
+                metadata = chunk.get("metadata", {})
+                if isinstance(metadata, dict):
+                    doc_id = metadata.get("document_id")
+                    # Handle legacy chunks without document_id
+                    if not doc_id:
+                        url = chunk.get("url", "")
+                        if url:
+                            doc_id = f"legacy_{url}"
+                    if doc_id:
+                        unique_doc_ids.add(doc_id)
+                    url = chunk.get("url", "")
+                    if url:
+                        doc_id = f"legacy_{url}"
+                    if doc_id:
+                        unique_doc_ids.add(doc_id)
+
+            return len(unique_doc_ids)
+        except Exception as e:
+            warnings.warn(f"Could not count documents for '{collection_name}': {e}")
             return 0

     async def get_collection_info(
@@ -1172,14 +1239,38 @@ async def get_collection_info(
                 "metadata": {"error": "Collection does not exist"},
             }

-        # Get collection statistics
-        stats = await self.client.get_collection_stats(target_collection)
+        # Count unique documents instead of total chunks
         try:
-            if isinstance(stats, dict):
-                document_count = stats.get("row_count", 0)
-            else:
-                # Some clients may return an object; try attribute access
-                document_count = getattr(stats, "row_count", 0)
+            # Ensure collection is loaded
+            try:
+                if hasattr(self.client, "load_collection"):
+                    await self.client.load_collection(target_collection)
+            except Exception:
+                pass
+
+            # Query all chunks and count unique document_ids
+            results = await self.client.query(
+                target_collection,
+                filter="id > 0",  # PK-indexed filter - reliable and efficient
+                output_fields=["url", "metadata"],
+                limit=16384,  # High limit to get all chunks
+            )
+
+            # Count unique document_ids (including legacy chunks)
+            unique_doc_ids = set()
+            for chunk in results:
+                metadata = chunk.get("metadata", {})
+                if isinstance(metadata, dict):
+                    doc_id = metadata.get("document_id")
+                    # Handle legacy chunks without document_id
+                    if not doc_id:
+                        url = chunk.get("url", "")
+                        if url:
+                            doc_id = f"legacy_{url}"
+                    if doc_id:
+                        unique_doc_ids.add(doc_id)
+
+            document_count = len(unique_doc_ids)
         except Exception:
             document_count = 0
