Commit b084802 (1 parent: a8b4f46)

website: update documentation

3 files changed: +335 −0

Lines changed: 319 additions & 0 deletions
---
title: "Milvus I/O connector"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

[Built-in I/O Transforms](/documentation/io/built-in/)

# Milvus I/O connector

The Beam SDKs include built-in transforms that can write data to [Milvus](https://milvus.io/) vector databases. Milvus is a high-performance, cloud-native vector database designed for machine learning and AI applications.

## Before you start

To use MilvusIO, you need to install the required dependencies. The Milvus I/O connector is part of the ML/RAG functionality in Apache Beam.

```
pip install apache-beam[milvus,gcp]
```
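
Before running a pipeline, you can sanity-check that the optional dependency resolved. The sketch below assumes the `milvus` extra pulls in the `pymilvus` client library, which is how such extras are typically wired:

```python
import importlib.util

def has_milvus_deps() -> bool:
    """Return True if the pymilvus client library is importable."""
    return importlib.util.find_spec("pymilvus") is not None

if not has_milvus_deps():
    print("pymilvus not found - run: pip install apache-beam[milvus,gcp]")
```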

**Additional resources:**

* [MilvusIO source code](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/rag/ingestion/milvus_search.py)
* [MilvusIO Pydoc](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.ingestion.milvus_search.html)
* [Milvus Documentation](https://milvus.io/docs)

## Overview

The Milvus I/O connector provides a sink for writing vector embeddings and
associated metadata to Milvus collections. This connector is specifically
designed for RAG (Retrieval-Augmented Generation) use cases where you need to
store document chunks with their vector embeddings for similarity search.

### Key Features

- **Vector Database Integration**: Write embeddings and metadata to Milvus collections
- **RAG-Optimized**: Built specifically for RAG workflows with document chunks
- **Batch Processing**: Efficient batched writes to optimize performance
- **Flexible Schema**: Configurable column mappings for different data schemas
- **Connection Management**: Proper connection lifecycle management with context managers
## Writing to Milvus

### Basic Usage

```python
import apache_beam as beam
from apache_beam.ml.rag.ingestion.milvus_search import (
    MilvusVectorWriterConfig,
    MilvusWriteConfig
)
from apache_beam.ml.rag.utils import MilvusConnectionConfig

# Configure connection to Milvus.
connection_config = MilvusConnectionConfig(
    uri="http://localhost:19530",  # Milvus server URI
    db_name="default"  # Database name
)

# Configure write settings.
write_config = MilvusWriteConfig(
    collection_name="document_embeddings",
    write_batch_size=1000
)

# Create the writer configuration.
milvus_config = MilvusVectorWriterConfig(
    connection_params=connection_config,
    write_config=write_config
)

# Use in a pipeline.
with beam.Pipeline() as pipeline:
    chunks = (
        pipeline
        | "Read Data" >> beam.io.ReadFromText("input.txt")
        # process_to_chunks is a user-defined function that turns each
        # line into a Chunk with an embedding (see the complete example).
        | "Process to Chunks" >> beam.Map(process_to_chunks)
    )

    # Write to Milvus.
    chunks | "Write to Milvus" >> milvus_config.create_write_transform()
```

### Configuration Options

#### Connection Configuration

```python
from apache_beam.ml.rag.utils import MilvusConnectionConfig

connection_config = MilvusConnectionConfig(
    uri="http://localhost:19530",  # Milvus server URI
    token="your_token",  # Authentication token (optional)
    db_name="vector_db",  # Database name
    timeout=30.0  # Connection timeout in seconds
)
```
106+
107+
#### Write Configuration
108+
109+
```python
110+
from apache_beam.ml.rag.ingestion.milvus_search import MilvusWriteConfig
111+
112+
write_config = MilvusWriteConfig(
113+
collection_name="embeddings", # Target collection name
114+
partition_name="", # Partition name (optional)
115+
timeout=60.0, # Write operation timeout
116+
write_batch_size=1000 # Number of records per batch
117+
)
118+
```

### Working with Chunks

The Milvus I/O connector is designed to work with `Chunk` objects that contain
document content and embeddings:

```python
from apache_beam.ml.rag.types import Chunk

def create_chunk_example():
    return Chunk(
        id="doc_1_chunk_1",
        content="This is the document content...",
        embedding=[0.1, 0.2, 0.3, 0.4, 0.5],  # Dense embedding vector
        sparse_embedding={"token_1": 0.5, "token_2": 0.3},  # Sparse embedding (optional)
        metadata={"source": "document.pdf", "page": 1}
    )
```
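
A Milvus collection's vector field has a fixed dimension, so every dense embedding you write must have the same length. A small pre-write check (plain Python, not part of MilvusIO) can surface mismatches before they reach the sink:

```python
def mismatched_dims(embeddings, expected_dim):
    """Return indices of embeddings whose length differs from expected_dim."""
    return [i for i, emb in enumerate(embeddings) if len(emb) != expected_dim]

embeddings = [[0.1] * 384, [0.2] * 384, [0.3] * 10]
bad = mismatched_dims(embeddings, 384)  # flags the third embedding
```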

### Custom Column Specifications

You can customize how chunk fields are mapped to Milvus collection fields:

```python
from apache_beam.ml.rag.ingestion.postgres_common import ColumnSpec

# Define custom column mappings.
custom_column_specs = [
    ColumnSpec(
        column_name="doc_id",
        value_fn=lambda chunk: chunk.id
    ),
    ColumnSpec(
        column_name="vector",
        value_fn=lambda chunk: list(chunk.embedding)
    ),
    ColumnSpec(
        column_name="text_content",
        value_fn=lambda chunk: chunk.content
    ),
    ColumnSpec(
        column_name="document_metadata",
        value_fn=lambda chunk: dict(chunk.metadata)
    )
]

# Use custom column specs.
milvus_config = MilvusVectorWriterConfig(
    connection_params=connection_config,
    write_config=write_config,
    column_specs=custom_column_specs
)
```
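
Conceptually, each column spec pairs a target field name with a function that extracts a value from a chunk, and the writer applies every spec to build one row per chunk. The self-contained sketch below illustrates that mapping; the `FakeChunk` and `FakeColumnSpec` classes are hypothetical stand-ins, not the real Beam types:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

@dataclass
class FakeChunk:  # hypothetical stand-in for apache_beam.ml.rag.types.Chunk
    id: str
    content: str
    embedding: List[float]
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class FakeColumnSpec:  # hypothetical stand-in for ColumnSpec
    column_name: str
    value_fn: Callable[[FakeChunk], Any]

def chunk_to_row(chunk: FakeChunk, specs: List[FakeColumnSpec]) -> Dict[str, Any]:
    """Apply every spec's value_fn to produce one row dict for the sink."""
    return {spec.column_name: spec.value_fn(chunk) for spec in specs}

specs = [
    FakeColumnSpec("doc_id", lambda c: c.id),
    FakeColumnSpec("vector", lambda c: list(c.embedding)),
    FakeColumnSpec("text_content", lambda c: c.content),
]
row = chunk_to_row(FakeChunk("c1", "hello", [0.1, 0.2]), specs)
# row == {"doc_id": "c1", "vector": [0.1, 0.2], "text_content": "hello"}
```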

## Complete Example

Here's a complete example that processes documents and writes them to Milvus:

```python
import apache_beam as beam
from apache_beam.ml.rag.ingestion.milvus_search import (
    MilvusVectorWriterConfig,
    MilvusWriteConfig
)
from apache_beam.ml.rag.utils import MilvusConnectionConfig
from apache_beam.ml.rag.types import Chunk
import numpy as np

def process_document(document_text):
    """Process a document into chunks with embeddings."""
    # This is a simplified example - in practice you would:
    # 1. Split document into chunks
    # 2. Generate embeddings using a model
    # 3. Extract metadata

    chunks = []
    sentences = document_text.split('.')

    for i, sentence in enumerate(sentences):
        if sentence.strip():
            # Generate mock embedding (replace with real embedding model).
            embedding = np.random.rand(384).tolist()  # 384-dimensional vector

            chunk = Chunk(
                # Note: include a document identifier in real pipelines so
                # ids stay unique across documents.
                id=f"doc_chunk_{i}",
                content=sentence.strip(),
                embedding=embedding,
                metadata={"chunk_index": i, "length": len(sentence)}
            )
            chunks.append(chunk)

    return chunks

def run_pipeline():
    # Configure Milvus connection.
    connection_config = MilvusConnectionConfig(
        uri="http://localhost:19530",
        db_name="rag_database"
    )

    # Configure write settings.
    write_config = MilvusWriteConfig(
        collection_name="document_chunks",
        write_batch_size=500
    )

    # Create writer configuration.
    milvus_config = MilvusVectorWriterConfig(
        connection_params=connection_config,
        write_config=write_config
    )

    # Define pipeline.
    with beam.Pipeline() as pipeline:
        documents = (
            pipeline
            | "Create Sample Documents" >> beam.Create([
                "First document content. It has multiple sentences.",
                "Second document with different content. More sentences here."
            ])
        )

        chunks = (
            documents
            | "Process Documents" >> beam.FlatMap(process_document)
        )

        # Write to Milvus.
        chunks | "Write to Milvus" >> milvus_config.create_write_transform()

if __name__ == "__main__":
    run_pipeline()
```

## Performance Considerations

### Batch Size Optimization

The write batch size significantly affects performance. Larger batches reduce
the number of network round-trips but consume more memory:

```python
# For high-throughput scenarios.
write_config = MilvusWriteConfig(
    collection_name="large_collection",
    write_batch_size=2000  # Larger batches for better throughput
)

# For memory-constrained environments.
write_config = MilvusWriteConfig(
    collection_name="small_collection",
    write_batch_size=100  # Smaller batches to reduce memory usage
)
```
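
Internally, batched writes boil down to grouping elements into fixed-size lists before each insert call. The effect of `write_batch_size` can be illustrated with a plain-Python sketch (not the connector's actual code):

```python
def batched(items, batch_size):
    """Yield consecutive lists of at most batch_size items."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

# 10 records with batch_size=3 -> 4 insert calls instead of 10.
sizes = [len(b) for b in batched(range(10), 3)]
# sizes == [3, 3, 3, 1]
```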

### Production Configuration

For production deployments, consider using appropriate timeout settings and
connection parameters:

```python
connection_config = MilvusConnectionConfig(
    uri="http://milvus-cluster:19530",
    timeout=120.0,  # Longer timeout for production workloads
    db_name="production_db",
    token="your_production_token"  # Use authentication in production
)
```

## Error Handling

The connector includes built-in error handling and logging. Monitor your
pipeline logs for any connection or write failures:

```python
import logging

# Enable debug logging to see detailed operation information.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# In your processing function.
def safe_process_document(document):
    try:
        return process_document(document)
    except Exception as e:
        logger.error(f"Failed to process document: {e}")
        return []  # Return empty list on failure
```
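
Transient connection errors are often worth retrying before giving up. A generic retry-with-backoff helper (illustrative only, not part of MilvusIO) could wrap any flaky call:

```python
import time

def with_retries(fn, attempts=3, base_delay=0.0):
    """Call fn, retrying failed attempts with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(base_delay * (2 ** attempt))

calls = {"n": 0}

def flaky_write():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = with_retries(flaky_write)  # succeeds on the third attempt
```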

## Notebook example

<a href="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_vector_ingestion_and_search.ipynb" target="_blank">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab" width="150" height="auto" style="max-width: 100%"/>
</a>

## Related transforms

- [Milvus Enrichment Handler in Apache Beam](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/rag/enrichment/milvus_search.py)

website/www/site/content/en/documentation/io/connectors.md

Lines changed: 15 additions & 0 deletions

@@ -1049,6 +1049,21 @@ This table provides a consolidated, at-a-glance overview of the available built-
     <td class="present">✔</td>
     <td class="absent">✘</td>
   </tr>
+  <tr>
+    <td>MilvusIO (<a href="/documentation/io/built-in/milvus/">guide</a>)</td>
+    <td class="absent">✘</td>
+    <td class="present">✔</td>
+    <td>Not available</td>
+    <td class="present">
+      <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.ingestion.milvus_search.html">native</a>
+    </td>
+    <td>Not available</td>
+    <td>Not available</td>
+    <td class="present">✔</td>
+    <td class="absent">✘</td>
+    <td class="absent">✘</td>
+  </tr>
   <tr>
     <td>
       Iceberg (Managed I/O)

website/www/site/layouts/partials/section-menu/en/documentation.html

Lines changed: 1 addition & 0 deletions

@@ -73,6 +73,7 @@
   <li><a href="/documentation/io/built-in/hadoop/">Hadoop Input/Output Format IO</a></li>
   <li><a href="/documentation/io/built-in/hcatalog/">HCatalog IO</a></li>
   <li><a href="/documentation/io/built-in/google-bigquery/">Google BigQuery I/O connector</a></li>
+  <li><a href="/documentation/io/built-in/milvus/">Milvus I/O connector</a></li>
   <li><a href="/documentation/io/built-in/snowflake/">Snowflake I/O connector</a></li>
   <li><a href="/documentation/io/built-in/cdap/">CDAP I/O connector</a></li>
   <li><a href="/documentation/io/built-in/sparkreceiver/">Spark Receiver I/O connector</a></li>
