Skip to content

Commit 390e69f

Browse files
sdks/python: integrate milvus sink I/O
1 parent fa6d2f0 commit 390e69f

File tree

4 files changed

+110
-320
lines changed

4 files changed

+110
-320
lines changed

sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,8 +57,8 @@
5757
from apache_beam.ml.rag.enrichment.milvus_search_it_test import (
5858
MilvusEnrichmentTestHelper,
5959
MilvusDBContainerInfo,
60-
parse_chunk_strings,
6160
assert_chunks_equivalent)
61+
from apache_beam.ml.rag.utils import parse_chunk_strings
6262
from apache_beam.io.requestresponse import RequestResponseIO
6363
except ImportError as e:
6464
raise unittest.SkipTest(f'Examples dependencies are not installed: {str(e)}')

sdks/python/apache_beam/ml/rag/enrichment/milvus_search.py

Lines changed: 5 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
from typing import Optional
2626
from typing import Tuple
2727
from typing import Union
28+
import uuid
2829

2930
from google.protobuf.json_format import MessageToDict
3031
from pymilvus import AnnSearchRequest
@@ -35,6 +36,7 @@
3536

3637
from apache_beam.ml.rag.types import Chunk
3738
from apache_beam.ml.rag.types import Embedding
39+
from apache_beam.ml.rag.utils import MilvusHelpers, MilvusConnectionParameters
3840
from apache_beam.transforms.enrichment import EnrichmentSourceHandler
3941

4042

@@ -104,44 +106,6 @@ def __str__(self):
104106
return self.dict().__str__()
105107

106108

107-
@dataclass
108-
class MilvusConnectionParameters:
109-
"""Parameters for establishing connections to Milvus servers.
110-
111-
Args:
112-
uri: URI endpoint for connecting to Milvus server in the format
113-
"http(s)://hostname:port".
114-
user: Username for authentication. Required if authentication is enabled and
115-
not using token authentication.
116-
password: Password for authentication. Required if authentication is enabled
117-
and not using token authentication.
118-
db_id: Database ID to connect to. Specifies which Milvus database to use.
119-
Defaults to 'default'.
120-
token: Authentication token as an alternative to username/password.
121-
timeout: Connection timeout in seconds. Uses client default if None.
122-
max_retries: Maximum number of connection retry attempts. Defaults to 3.
123-
retry_delay: Initial delay between retries in seconds. Defaults to 1.0.
124-
retry_backoff_factor: Multiplier for retry delay after each attempt.
125-
Defaults to 2.0 (exponential backoff).
126-
kwargs: Optional keyword arguments for additional connection parameters.
127-
Enables forward compatibility.
128-
"""
129-
uri: str
130-
user: str = field(default_factory=str)
131-
password: str = field(default_factory=str)
132-
db_id: str = "default"
133-
token: str = field(default_factory=str)
134-
timeout: Optional[float] = None
135-
max_retries: int = 3
136-
retry_delay: float = 1.0
137-
retry_backoff_factor: float = 2.0
138-
kwargs: Dict[str, Any] = field(default_factory=dict)
139-
140-
def __post_init__(self):
141-
if not self.uri:
142-
raise ValueError("URI must be provided for Milvus connection")
143-
144-
145109
@dataclass
146110
class BaseSearchParameters:
147111
"""Base parameters for both vector and keyword search operations.
@@ -361,15 +325,15 @@ def __init__(
361325
**kwargs):
362326
"""
363327
Example Usage:
364-
connection_paramters = MilvusConnectionParameters(
328+
connection_parameters = MilvusConnectionParameters(
365329
uri="http://localhost:19530")
366330
search_parameters = MilvusSearchParameters(
367331
collection_name="my_collection",
368332
search_strategy=VectorSearchParameters(anns_field="embedding"))
369333
collection_load_parameters = MilvusCollectionLoadParameters(
370334
load_fields=["embedding", "metadata"]),
371335
milvus_handler = MilvusSearchEnrichmentHandler(
372-
connection_paramters,
336+
connection_parameters,
373337
search_parameters,
374338
collection_load_parameters=collection_load_parameters,
375339
min_batch_size=10,
@@ -534,10 +498,7 @@ def _get_keyword_search_data(self, chunk: Chunk):
534498
raise ValueError(
535499
f"Chunk {chunk.id} missing both text content and sparse embedding "
536500
"required for keyword search")
537-
538-
sparse_embedding = self.convert_sparse_embedding_to_milvus_format(
539-
chunk.sparse_embedding)
540-
501+
sparse_embedding = MilvusHelpers.sparse_embedding(chunk.sparse_embedding)
541502
return chunk.content.text or sparse_embedding
542503

543504
def _get_call_response(

0 commit comments

Comments
 (0)