Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9525d5f
exmaples+website+sdks/python: update docs and exmaples for milvus tra…
mohamedawnallah Jun 27, 2025
f746a78
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Jun 30, 2025
2e1ae71
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Aug 16, 2025
d482464
examples: update jupyter notebook example
mohamedawnallah Aug 16, 2025
104868e
CHANGES.md: add release note
mohamedawnallah Aug 16, 2025
9951901
sdks/python: update import err exception
mohamedawnallah Aug 16, 2025
62c6a5e
sdks/python: experiment with setting milvus as extra dependency this way
mohamedawnallah Aug 16, 2025
508a8ad
sdks/python: revert pytest marker to use test containers
mohamedawnallah Aug 16, 2025
bc79236
.github: trigger postcommit python
mohamedawnallah Aug 16, 2025
ab170a3
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Sep 16, 2025
435d9e8
sdks/python: undo `require_docker_in_docker` pytest marker
mohamedawnallah Sep 16, 2025
084687b
sdks/python: fix formatting issues
mohamedawnallah Sep 16, 2025
924282a
python: mark `test_enrichment_with_milvus` with require_docker_in_docker
mohamedawnallah Sep 16, 2025
c687fcf
sdks/python: test milvus example
mohamedawnallah Sep 16, 2025
733c233
sdks/python: update jupyter notebook example
mohamedawnallah Sep 16, 2025
2c78633
CHANGES.md: update release notes
mohamedawnallah Sep 16, 2025
e889882
sdks/python: fix linting issues
mohamedawnallah Sep 16, 2025
116a95c
sdks/python: properly skip milvus test on any container startup failures
mohamedawnallah Sep 24, 2025
3a3b03a
sdks/python: properly skip sql tests on any container startup failure
mohamedawnallah Sep 24, 2025
8bab624
sdks/python: fix linting issues
mohamedawnallah Sep 24, 2025
f8af037
examples: address comments on milvus jupyter notebook
mohamedawnallah Oct 18, 2025
c764314
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Oct 18, 2025
b7c064f
ml/rag: enforce running etcd in milvus itests in standalone mode
mohamedawnallah Oct 19, 2025
aa50907
examples: update jupyter notebook mainly to pin milvus db version
mohamedawnallah Oct 19, 2025
fa80a1f
website: remove `Related transforms` section
mohamedawnallah Oct 19, 2025
e30ab53
sdks/python: pin milvus db version in py examples
mohamedawnallah Oct 19, 2025
a0299e9
sdks/python: skip validation if there's no enrichment data
mohamedawnallah Oct 20, 2025
d96a884
sdks/python: pin milvus db version `v2.5.10`
mohamedawnallah Oct 20, 2025
98c5ef9
milvus: add descriptive comments about updating db version in tests
mohamedawnallah Oct 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) (Python) ([#35473](https://github.com/apache/beam/issues/36095)).
* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) (Python) ([#36095](https://github.com/apache/beam/issues/36095)).
* Support for batch mode execution in WriteToPubSub transform added (Python) ([#35990](https://github.com/apache/beam/issues/35990)).
* Python examples added for Milvus search enrichment handler on [Beam Website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-milvus/)
including jupyter notebook example (Python) ([#36176](https://github.com/apache/beam/issues/36176)).

## Breaking Changes

Expand Down
2,373 changes: 2,373 additions & 0 deletions examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,15 @@ def enrichment_with_google_cloudsql_pg():
where_clause_template=where_clause_template,
where_clause_fields=where_clause_fields)

cloudsql_handler = CloudSQLEnrichmentHandler(
handler = CloudSQLEnrichmentHandler(
connection_config=connection_config,
table_id=table_id,
query_config=query_config)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
|
"Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler)
| "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_google_cloudsql_pg]

Expand Down Expand Up @@ -327,3 +326,75 @@ def enrichment_with_external_sqlserver():
| "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_external_sqlserver]


def enrichment_with_milvus():
# [START enrichment_with_milvus]
import os
import apache_beam as beam
from apache_beam.ml.rag.types import Content
from apache_beam.ml.rag.types import Chunk
from apache_beam.ml.rag.types import Embedding
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.ml.rag.enrichment.milvus_search import (
MilvusSearchEnrichmentHandler,
MilvusConnectionParameters,
MilvusSearchParameters,
MilvusCollectionLoadParameters,
VectorSearchParameters,
VectorSearchMetrics)

uri = os.environ.get("MILVUS_VECTOR_DB_URI")
user = os.environ.get("MILVUS_VECTOR_DB_USER")
password = os.environ.get("MILVUS_VECTOR_DB_PASSWORD")
db_id = os.environ.get("MILVUS_VECTOR_DB_ID")
token = os.environ.get("MILVUS_VECTOR_DB_TOKEN")
collection_name = os.environ.get("MILVUS_VECTOR_DB_COLLECTION_NAME")

data = [
Chunk(
id="query1",
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
content=Content())
]

connection_parameters = MilvusConnectionParameters(
uri, user, password, db_id, token)

# The first condition (language == "en") excludes documents in other
# languages. Initially, this gives us two documents. After applying the second
# condition (cost < 50), only the first document returns in search results.
filter_expr = 'metadata["language"] == "en" AND cost < 50'

search_params = {"metric_type": VectorSearchMetrics.COSINE.value, "nprobe": 1}

vector_search_params = VectorSearchParameters(
anns_field="dense_embedding_cosine",
limit=3,
filter=filter_expr,
search_params=search_params)

search_parameters = MilvusSearchParameters(
collection_name=collection_name,
search_strategy=vector_search_params,
output_fields=["id", "content", "domain", "cost", "metadata"],
round_decimal=2)

# The collection load parameters are optional. They provide fine-graine
# control over how collections are loaded into memory. For simple use cases or
# when getting started, this parameter can be omitted to use default loading
# behavior. Consider using it in resource-constrained environments to optimize
# memory usage and query performance.
collection_load_parameters = MilvusCollectionLoadParameters()

milvus_search_handler = MilvusSearchEnrichmentHandler(
connection_parameters=connection_parameters,
search_parameters=search_parameters,
collection_load_parameters=collection_load_parameters)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
| "Enrich W/ Milvus" >> Enrichment(milvus_search_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_milvus]
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
enrichment_with_google_cloudsql_pg,
enrichment_with_external_pg,
enrichment_with_external_mysql,
enrichment_with_external_sqlserver)
enrichment_with_external_sqlserver,
enrichment_with_milvus)
from apache_beam.transforms.enrichment_handlers.cloudsql import (
DatabaseTypeAdapter)
from apache_beam.transforms.enrichment_handlers.cloudsql_it_test import (
Expand All @@ -51,9 +52,16 @@
ConnectionConfig,
CloudSQLConnectionConfig,
ExternalSQLDBConnectionConfig)
from apache_beam.ml.rag.enrichment.milvus_search import (
MilvusConnectionParameters)
from apache_beam.ml.rag.enrichment.milvus_search_it_test import (
MilvusEnrichmentTestHelper,
MilvusDBContainerInfo,
parse_chunk_strings,
assert_chunks_equivalent)
from apache_beam.io.requestresponse import RequestResponseIO
except ImportError as e:
raise unittest.SkipTest(f'RequestResponseIO dependencies not installed: {e}')
raise unittest.SkipTest(f'Examples dependencies are not installed: {str(e)}')


def validate_enrichment_with_bigtable():
Expand Down Expand Up @@ -119,6 +127,13 @@ def validate_enrichment_with_external_sqlserver():
return expected


def validate_enrichment_with_milvus():
expected = '''[START enrichment_with_milvus]
Chunk(content=Content(text=None), id='query1', index=0, metadata={'enrichment_data': defaultdict(<class 'list'>, {'id': [1], 'distance': [1.0], 'fields': [{'content': 'This is a test document', 'cost': 49, 'domain': 'medical', 'id': 1, 'metadata': {'language': 'en'}}]})}, embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3], sparse_embedding=None))
[END enrichment_with_milvus]'''.splitlines()[1:-1]
return expected


@mock.patch('sys.stdout', new_callable=StringIO)
@pytest.mark.uses_testcontainer
class EnrichmentTest(unittest.TestCase):
Expand Down Expand Up @@ -190,6 +205,19 @@ def test_enrichment_with_external_sqlserver(self, mock_stdout):
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")

def test_enrichment_with_milvus(self, mock_stdout):
with EnrichmentTestHelpers.milvus_test_context():
try:
enrichment_with_milvus()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_milvus()
self.maxDiff = None
output = parse_chunk_strings(output)
expected = parse_chunk_strings(expected)
assert_chunks_equivalent(output, expected)
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")


@dataclass
class CloudSQLEnrichmentTestDataConstruct:
Expand All @@ -200,6 +228,7 @@ class CloudSQLEnrichmentTestDataConstruct:


class EnrichmentTestHelpers:
@staticmethod
@contextmanager
def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter):
result: Optional[CloudSQLEnrichmentTestDataConstruct] = None
Expand All @@ -211,6 +240,17 @@ def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter):
if result:
EnrichmentTestHelpers.post_sql_enrichment_test(result)

@staticmethod
@contextmanager
def milvus_test_context():
db: Optional[MilvusDBContainerInfo] = None
try:
db = EnrichmentTestHelpers.pre_milvus_enrichment()
yield
finally:
if db:
EnrichmentTestHelpers.post_milvus_enrichment(db)

@staticmethod
def pre_sql_enrichment_test(
is_cloudsql: bool,
Expand Down Expand Up @@ -310,6 +350,41 @@ def post_sql_enrichment_test(res: CloudSQLEnrichmentTestDataConstruct):
os.environ.pop('GOOGLE_CLOUD_SQL_DB_PASSWORD', None)
os.environ.pop('GOOGLE_CLOUD_SQL_DB_TABLE_ID', None)

@staticmethod
def pre_milvus_enrichment() -> MilvusDBContainerInfo:
db = MilvusEnrichmentTestHelper.start_db_container()

connection_params = MilvusConnectionParameters(
uri=db.uri,
user=db.user,
password=db.password,
db_id=db.id,
token=db.token)

collection_name = MilvusEnrichmentTestHelper.initialize_db_with_data(
connection_params)

# Setup environment variables for db and collection configuration. This will
# be used downstream by the milvus enrichment handler.
os.environ['MILVUS_VECTOR_DB_URI'] = db.uri
os.environ['MILVUS_VECTOR_DB_USER'] = db.user
os.environ['MILVUS_VECTOR_DB_PASSWORD'] = db.password
os.environ['MILVUS_VECTOR_DB_ID'] = db.id
os.environ['MILVUS_VECTOR_DB_TOKEN'] = db.token
os.environ['MILVUS_VECTOR_DB_COLLECTION_NAME'] = collection_name

return db

@staticmethod
def post_milvus_enrichment(db: MilvusDBContainerInfo):
MilvusEnrichmentTestHelper.stop_db_container(db)
os.environ.pop('MILVUS_VECTOR_DB_URI', None)
os.environ.pop('MILVUS_VECTOR_DB_USER', None)
os.environ.pop('MILVUS_VECTOR_DB_PASSWORD', None)
os.environ.pop('MILVUS_VECTOR_DB_ID', None)
os.environ.pop('MILVUS_VECTOR_DB_TOKEN', None)
os.environ.pop('MILVUS_VECTOR_DB_COLLECTION_NAME', None)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def __init__(
class MilvusEnrichmentTestHelper:
@staticmethod
def start_db_container(
image="milvusdb/milvus:v2.3.9",
image="milvusdb/milvus:v2.5.10",
max_vec_fields=5,
vector_client_max_retries=3,
tc_max_retries=TC_MAX_TRIES) -> Optional[MilvusDBContainerInfo]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
title: "Enrichment with Milvus"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Use Milvus to enrich data

{{< localstorage language language-py >}}

<table>
<tr>
<td>
<a>
{{< button-pydoc path="apache_beam.ml.rag.enrichment.milvus_search" class="MilvusSearchEnrichmentHandler" >}}
</a>
</td>
</tr>
</table>

In Apache Beam 2.67.0 and later versions, the enrichment transform includes
a built-in enrichment handler for
[Milvus](https://milvus.io/).
The following example demonstrates how to create a pipeline that use the enrichment transform with the [`MilvusSearchEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.enrichment.milvus_search.html#apache_beam.ml.rag.enrichment.milvus_search.MilvusSearchEnrichmentHandler) handler.

The data in the Milvus instance collection `docs_catalog` follows this format:

{{< table >}}
| id | content | domain | cost | metadata | dense_embedding | sparse_embedding |
|:--:|:-------:|:------:|:----:|:--------:|:--------------:|:----------------:|
| 1 | This is a test document | medical | 49 | {"language": "en"} | [0.1, 0.2, 0.3] | [auto-generated by Milvus] |
| 2 | Another test document | legal | 75 | {"language": "en"} | [0.2, 0.3, 0.4] | [auto-generated by Milvus] |
| 3 | وثيقة اختبار | financial | 149 | {"language": "ar"} | [0.3, 0.4, 0.5] | [auto-generated by Milvus] |
{{< /table >}}


{{< highlight language="py" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_milvus >}}
{{</ highlight >}}

{{< paragraph class="notebook-skip" >}}
Output:
{{< /paragraph >}}
{{< highlight class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_milvus >}}
{{< /highlight >}}

## Notebook exmaple

<a href="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb" target="_blank">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab" width="150" height="auto" style="max-width: 100%"/>
</a>

## Related transforms

Not applicable.

{{< button-pydoc path="apache_beam.ml.rag.enrichment.milvus_search" class="MilvusSearchEnrichmentHandler" >}}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The following examples demonstrate how to create a pipeline that use the enrichm
| Service | Example |
|:-----------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Cloud Bigtable | [Enrichment with Bigtable](/documentation/transforms/python/elementwise/enrichment-bigtable/#example) |
| Milvus | [Enrichment with Milvus](/documentation/transforms/python/elementwise/enrichment-milvus/#example) |
| Cloud SQL (PostgreSQL, MySQL, SQLServer) | [Enrichment with CloudSQL](/documentation/transforms/python/elementwise/enrichment-cloudsql) |
| Vertex AI Feature Store | [Enrichment with Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-1-enrichment-with-vertex-ai-feature-store) |
| Vertex AI Feature Store (Legacy) | [Enrichment with Legacy Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-2-enrichment-with-vertex-ai-feature-store-legacy) |
Expand Down Expand Up @@ -100,4 +101,4 @@ enriched_data = (input_data

Not applicable.

{{< button-pydoc path="apache_beam.transforms.enrichment" class="Enrichment" >}}
{{< button-pydoc path="apache_beam.transforms.enrichment" class="Enrichment" >}}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@
<ul class="section-nav-list">
<li><a href="/documentation/transforms/python/elementwise/enrichment/">Overview</a></li>
<li><a href="/documentation/transforms/python/elementwise/enrichment-bigtable/">Bigtable example</a></li>
<li><a href="/documentation/transforms/python/elementwise/enrichment-milvus/">Milvus example</a></li>
<li><a href="/documentation/transforms/python/elementwise/enrichment-cloudsql/">CloudSQL example</a></li>
<li><a href="/documentation/transforms/python/elementwise/enrichment-vertexai/">Vertex AI Feature Store examples</a></li>
</ul>
Expand Down
Loading