Skip to content

Commit 2c7f6de

Browse files
sdks/python: handle flushing gracefully
1 parent bb84826 commit 2c7f6de

File tree

1 file changed

+25
-12
lines changed

1 file changed

+25
-12
lines changed

sdks/python/apache_beam/ml/rag/ingestion/milvus_search.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -288,18 +288,31 @@ def write(self, documents):
288288
if not self._client:
289289
self._client = MilvusClient(
290290
**unpack_dataclass_with_kwargs(self._connection_params))
291-
resp = self._client.upsert(
292-
collection_name=self._write_config.collection_name,
293-
partition_name=self._write_config.partition_name,
294-
data = documents,
295-
timeout=self._write_config.timeout,
296-
**self._write_config.kwargs)
297-
self._client.flush(self._write_config.collection_name)
298-
_LOGGER.debug(
299-
"Upserted into Milvus: upsert_count=%d, cost=%d",
300-
resp.get("upsert_count", 0),
301-
resp.get("cost", 0)
302-
)
291+
292+
try:
293+
resp = self._client.upsert(
294+
collection_name=self._write_config.collection_name,
295+
partition_name=self._write_config.partition_name,
296+
data = documents,
297+
timeout=self._write_config.timeout,
298+
**self._write_config.kwargs)
299+
300+
# Try to flush, but handle connection issues gracefully.
301+
try:
302+
self._client.flush(self._write_config.collection_name)
303+
except Exception as e:
304+
# If flush fails due to connection issues, log but don't fail the write.
305+
_LOGGER.warning(
306+
"Flush operation failed, but upsert was successful: %s",e)
307+
308+
_LOGGER.debug(
309+
"Upserted into Milvus: upsert_count=%d, cost=%d",
310+
resp.get("upsert_count", 0),
311+
resp.get("cost", 0)
312+
)
313+
except Exception as e:
314+
_LOGGER.error("Failed to write to Milvus: %s", e)
315+
raise
303316

304317
def __enter__(self):
305318
"""Enters the context manager and establishes Milvus connection.

0 commit comments

Comments
 (0)