Commit d5663cc

hubert-rutkowski85, rbiseck3, Filip Knefel, and mpolomdeepsense authored
Fix: Milvus error handling (#292)
Add test and improve logging messages

Co-authored-by: Roman Isecke <[email protected]>
Co-authored-by: Filip Knefel <[email protected]>
Co-authored-by: mpolomdeepsense <[email protected]>
1 parent 2ac6c81 commit d5663cc

File tree

6 files changed: +32 -8 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
 ### Enhancements
 
 * **Migrate Vectara Destination Connector to v2**
+* **Improved Milvus error handling**
 
 ### Fixes
 * **Register Neo4j Upload Stager**

pyproject.toml

Lines changed: 3 additions & 0 deletions
@@ -74,3 +74,6 @@ omit=[
     "unstructured_ingest/ingest_backoff/*",
     "unstructured_ingest/enhanced_dataclass/*"
 ]
+
+[tool.pytest.ini_options]
+asyncio_default_fixture_loop_scope="function"
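The new [tool.pytest.ini_options] table pins pytest-asyncio's default event-loop scope for async fixtures. As a rough illustration of what the setting governs (a minimal sketch assuming pytest and pytest-asyncio are installed; the fixture and test names are made up, not from this repository): with asyncio_default_fixture_loop_scope="function", each async fixture is set up and torn down in the per-test event loop, and declaring the option explicitly also avoids the deprecation warning recent pytest-asyncio releases emit when it is left unset.

```python
# Illustrative only: the names below are hypothetical and not part of the commit.
import asyncio

import pytest
import pytest_asyncio


@pytest_asyncio.fixture
async def fake_client():
    # With asyncio_default_fixture_loop_scope="function", this fixture runs
    # in the event loop created for each individual test function.
    yield asyncio.get_running_loop()


@pytest.mark.asyncio
async def test_fixture_shares_test_loop(fake_client):
    # The fixture's loop should be the same loop the test body runs on.
    assert fake_client is asyncio.get_running_loop()
```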

test/integration/connectors/test_milvus.py

Lines changed: 13 additions & 0 deletions
@@ -174,6 +174,19 @@ def test_precheck_fails_on_nonexistent_collection(collection: str):
         uploader.precheck()
 
 
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG)
+def test_precheck_fails_on_nonexisting_db(collection: str):
+    uploader = MilvusUploader(
+        connection_config=MilvusConnectionConfig(uri=DB_URI),
+        upload_config=MilvusUploaderConfig(db_name="nonexisting_db", collection_name=collection),
+    )
+    with pytest.raises(
+        DestinationConnectionError,
+        match="database not found",
+    ):
+        uploader.precheck()
+
+
 @pytest.mark.parametrize("upload_file_str", ["upload_file_ndjson", "upload_file"])
 def test_milvus_stager(
     request: TopRequest,

unstructured_ingest/v2/processes/connectors/chroma.py

Lines changed: 0 additions & 1 deletion
@@ -138,7 +138,6 @@ def precheck(self) -> None:
 
     @DestinationConnectionError.wrap
     def upsert_batch(self, collection, batch):
-
         try:
             # Chroma wants lists even if there is only one element
             # Upserting to prevent duplicates

unstructured_ingest/v2/processes/connectors/milvus.py

Lines changed: 15 additions & 6 deletions
@@ -156,11 +156,18 @@ class MilvusUploader(Uploader):
 
     @DestinationConnectionError.wrap
     def precheck(self):
-        with self.get_client() as client:
-            if not client.has_collection(self.upload_config.collection_name):
-                raise DestinationConnectionError(
-                    f"Collection '{self.upload_config.collection_name}' does not exist"
-                )
+        from pymilvus import MilvusException
+
+        try:
+            with self.get_client() as client:
+                if not client.has_collection(self.upload_config.collection_name):
+                    raise DestinationConnectionError(
+                        f"Collection '{self.upload_config.collection_name}' does not exist"
+                    )
+        except MilvusException as milvus_exception:
+            raise DestinationConnectionError(
+                f"failed to precheck Milvus: {str(milvus_exception.message)}"
+            ) from milvus_exception
 
     @contextmanager
     def get_client(self) -> Generator["MilvusClient", None, None]:

@@ -197,7 +204,9 @@ def insert_results(self, data: Union[dict, list[dict]]):
         try:
             res = client.insert(collection_name=self.upload_config.collection_name, data=data)
         except MilvusException as milvus_exception:
-            raise WriteError("failed to upload records to milvus") from milvus_exception
+            raise WriteError(
+                f"failed to upload records to Milvus: {str(milvus_exception.message)}"
+            ) from milvus_exception
         if "err_count" in res and isinstance(res["err_count"], int) and res["err_count"] > 0:
             err_count = res["err_count"]
             raise WriteError(f"failed to upload {err_count} docs")
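For context on how the tightened error handling surfaces to a caller, here is a hedged usage sketch (not part of the commit): the URI, collection name, and the broad exception catch are assumptions, and the import path is inferred from the file location above. Any pymilvus failure during precheck, such as pointing at a database that does not exist, is now re-raised as a DestinationConnectionError whose message includes the original Milvus error text.

```python
# Illustrative sketch; the URI, names, and broad except clause are assumptions.
from unstructured_ingest.v2.processes.connectors.milvus import (
    MilvusConnectionConfig,
    MilvusUploader,
    MilvusUploaderConfig,
)

uploader = MilvusUploader(
    connection_config=MilvusConnectionConfig(uri="http://localhost:19530"),  # assumed local Milvus
    upload_config=MilvusUploaderConfig(
        db_name="nonexisting_db",        # deliberately missing database
        collection_name="ingest_test",   # assumed collection name
    ),
)

try:
    uploader.precheck()
except Exception as err:
    # In practice this is DestinationConnectionError; it is caught broadly here
    # to avoid guessing the error module's import path. With this commit the
    # message carries the underlying Milvus text, e.g.
    # "failed to precheck Milvus: database not found...".
    print(f"{type(err).__name__}: {err}")
```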

unstructured_ingest/v2/processes/connectors/qdrant/qdrant.py

Lines changed: 0 additions & 1 deletion
@@ -128,7 +128,6 @@ async def run_data_async(
         file_data: FileData,
         **kwargs: Any,
     ) -> None:
-
         batches = list(batch_generator(data, batch_size=self.upload_config.batch_size))
         logger.debug(
             "Elements split into %i batches of size %i.",
