Skip to content

Commit 8ac2c42

Browse files
sdks/python: delegate auto-flushing to milvus backend
1 parent 46a03c8 commit 8ac2c42

File tree

1 file changed

+15
-28
lines changed

1 file changed

+15
-28
lines changed

sdks/python/apache_beam/ml/rag/ingestion/milvus_search.py

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def expand(self, pcoll: beam.PCollection[Chunk]):
181181
pcoll: PCollection of Chunk objects to write to Milvus.
182182
183183
Returns:
184-
PCollection of the same Chunk objects after writing to Milvus.
184+
PCollection of dictionaries representing the records written to Milvus.
185185
"""
186186
return (
187187
pcoll
@@ -292,33 +292,20 @@ def write(self, documents):
292292
documents: List of dictionaries representing Milvus records to write.
293293
Each dictionary should contain fields matching the collection schema.
294294
"""
295-
if not self._client:
296-
self._client = MilvusClient(
297-
**unpack_dataclass_with_kwargs(self._connection_params))
298-
299-
try:
300-
resp = self._client.upsert(
301-
collection_name=self._write_config.collection_name,
302-
partition_name=self._write_config.partition_name,
303-
data=documents,
304-
timeout=self._write_config.timeout,
305-
**self._write_config.kwargs)
306-
307-
# Try to flush, but handle connection issues gracefully.
308-
try:
309-
self._client.flush(self._write_config.collection_name)
310-
except Exception as e:
311-
# If flush fails due to connection issues, log but don't fail the write.
312-
_LOGGER.warning(
313-
"Flush operation failed, but upsert was successful: %s", e)
314-
315-
_LOGGER.debug(
316-
"Upserted into Milvus: upsert_count=%d, cost=%d",
317-
resp.get("upsert_count", 0),
318-
resp.get("cost", 0))
319-
except Exception as e:
320-
_LOGGER.error("Failed to write to Milvus: %s", e)
321-
raise
295+
self._client = MilvusClient(
296+
**unpack_dataclass_with_kwargs(self._connection_params))
297+
298+
resp = self._client.upsert(
299+
collection_name=self._write_config.collection_name,
300+
partition_name=self._write_config.partition_name,
301+
data=documents,
302+
timeout=self._write_config.timeout,
303+
**self._write_config.kwargs)
304+
305+
_LOGGER.debug(
306+
"Upserted into Milvus: upsert_count=%d, cost=%d",
307+
resp.get("upsert_count", 0),
308+
resp.get("cost", 0))
322309

323310
def __enter__(self):
324311
"""Enters the context manager and establishes Milvus connection.

0 commit comments

Comments
 (0)