feat(indexer): optimize the Milvus indexer to support batch processing and concurrency control #581
Open
What type of PR is this?
Check the PR title.
(Optional) Translate the PR title into Chinese.
feat(milvus): support concurrent batch insertion and performance optimization
(Optional) More detailed description for this PR(en: English/zh: Chinese).
en: This PR introduces a high-performance batch processing mechanism for the Milvus Indexer, significantly improving the efficiency of large-scale document insertions.
Key Changes:
Batch Processing & Concurrency:
Introduced BatchSize and MaxConcurrency parameters in IndexerConfig.
Implemented internal batching logic in the Store method.
Added semaphore-based concurrency control to process Embedding, Conversion, and InsertRows in parallel goroutines.
Optimized the workflow to perform a unified Flush operation only after all batches are processed, avoiding performance bottlenecks caused by frequent flushing.
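The batching, semaphore, and single-Flush behaviour listed above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `storeBatches`, `insertFunc`, `flushFunc`, and `runDemo` are hypothetical names, documents are reduced to strings, and the choices to skip the flush on error and to treat a non-positive batch size as "one batch" are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// insertFunc and flushFunc are hypothetical stand-ins for the
// embed -> convert -> InsertRows pipeline and for the Flush call.
type insertFunc func(ctx context.Context, batch []string) error
type flushFunc func(ctx context.Context) error

// storeBatches splits docs into batches, runs at most maxConcurrency
// batches at a time, and flushes once after every batch has finished.
func storeBatches(ctx context.Context, docs []string, batchSize, maxConcurrency int,
	insert insertFunc, flush flushFunc) error {
	if len(docs) == 0 {
		return nil
	}
	if batchSize <= 0 {
		batchSize = len(docs) // assumption: non-positive size means one batch
	}
	if maxConcurrency <= 0 {
		maxConcurrency = 1
	}

	sem := make(chan struct{}, maxConcurrency) // buffered channel used as a semaphore
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)

	for start := 0; start < len(docs); start += batchSize {
		end := start + batchSize
		if end > len(docs) {
			end = len(docs)
		}
		batch := docs[start:end]

		sem <- struct{}{} // blocks while maxConcurrency batches are in flight
		wg.Add(1)
		go func(batch []string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			if err := insert(ctx, batch); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(batch)
	}

	wg.Wait()
	if firstErr != nil {
		return firstErr // assumption: do not flush when a batch failed
	}
	return flush(ctx) // single flush after all batches
}

// runDemo stores 5 placeholder documents in batches of 2 with 2 workers.
func runDemo() (batches int64, flushes int, err error) {
	insert := func(_ context.Context, _ []string) error {
		atomic.AddInt64(&batches, 1)
		return nil
	}
	flush := func(_ context.Context) error {
		flushes++
		return nil
	}
	err = storeBatches(context.Background(), make([]string, 5), 2, 2, insert, flush)
	return batches, flushes, err
}

func main() {
	b, f, err := runDemo()
	fmt.Printf("batches=%d flushes=%d err=%v\n", b, f, err)
}
```

A buffered channel keeps the sketch dependency-free; a weighted semaphore or an error group with a concurrency limit would express the same bound.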
Refactoring:
Refactored the Store method to handle large document lists automatically, removing the need for callers to manually split data.
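From the caller's side, the change might look like the sketch below. The `storer` interface is a simplified, hypothetical stand-in for the indexer (the real Store method works on document structs and accepts options), and `countingStore`, `storeInChunks`, and `demoCalls` exist only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// storer is a hypothetical, simplified view of the indexer's Store method.
type storer interface {
	Store(ctx context.Context, docs []string) ([]string, error)
}

// countingStore is a fake indexer that records how often Store is called.
type countingStore struct{ calls int }

func (s *countingStore) Store(_ context.Context, docs []string) ([]string, error) {
	s.calls++
	ids := make([]string, len(docs))
	for i := range docs {
		ids[i] = fmt.Sprintf("id-%d", i)
	}
	return ids, nil
}

// storeInChunks is the caller-side loop needed before the refactor:
// split the documents manually, then store each chunk serially.
func storeInChunks(ctx context.Context, s storer, docs []string, size int) ([]string, error) {
	if size <= 0 {
		size = len(docs)
	}
	var ids []string
	for start := 0; start < len(docs); start += size {
		end := start + size
		if end > len(docs) {
			end = len(docs)
		}
		got, err := s.Store(ctx, docs[start:end])
		if err != nil {
			return ids, err
		}
		ids = append(ids, got...)
	}
	return ids, nil
}

// demoCalls compares the number of Store calls a caller makes in each style.
func demoCalls() (manualCalls, directCalls int) {
	ctx := context.Background()
	docs := make([]string, 146) // same document count as examples/test.md

	manual := &countingStore{}
	if _, err := storeInChunks(ctx, manual, docs, 50); err != nil {
		panic(err)
	}

	direct := &countingStore{}
	if _, err := direct.Store(ctx, docs); err != nil { // after: one call, batching is internal
		panic(err)
	}
	return manual.calls, direct.calls
}

func main() {
	m, d := demoCalls()
	fmt.Println(m, d) // prints "3 1"
}
```

The chunk size of 50 is arbitrary; the point is only that the splitting loop moves out of caller code.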
Dependency Upgrades:
Updated eino-ext and related indirect dependencies to enhance stability.
Examples & Testing:
Added examples/test.md (146 docs) and updated examples/main.go.
Introduced markdown splitter in the example to demonstrate the new capability.
Performance Comparison: Tested with examples/test.md (146 documents) in the same environment:
Before (Manual Serial Batching): ~2m 40s (160.3s)
After (Concurrent Batching): ~2.74s
Improvement: ~58x faster
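As a quick check of the arithmetic, the reported ratio can be recomputed from the two timings. The helper name `speedup` is only illustrative, and the per-document figures are simple averages over the 146 documents, not separately measured values.

```go
package main

import "fmt"

// speedup returns how many times faster the "after" run is than the "before" run.
func speedup(beforeSec, afterSec float64) float64 {
	return beforeSec / afterSec
}

func main() {
	const docs = 146
	before, after := 160.3, 2.74 // seconds, taken from the comparison above

	fmt.Printf("speedup: %.1fx\n", speedup(before, after))
	fmt.Printf("average per document: %.3fs before, %.4fs after\n", before/docs, after/docs)
}
```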
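zh(optional): This PR introduces a high-performance batch processing mechanism for the Milvus Indexer, greatly improving the efficiency of large-scale document insertion.
Main changes:
Batch processing and concurrency:
Added the BatchSize and MaxConcurrency options to IndexerConfig.
Implemented automatic batching inside the Store method.
Added semaphore-based concurrency control so that Embedding, data conversion, and InsertRows run in parallel.
Optimized the Flush logic so that a single Flush runs after all batches have been inserted, avoiding the stalls caused by frequent flushing.
Refactoring:
Refactored the Store method so that callers no longer need to split data manually and can pass in large document lists directly.
Dependency upgrades:
Updated the eino-ext component version and related indirect dependencies.
Examples and testing:
Added the examples/test.md test file (146 document fragments) and introduced a markdown splitter in examples/main.go for verification.
Performance comparison: Tested with examples/test.md (146 document fragments) in the same environment:
Before (manual serial batching): about 2 min 40 s (160.3s)
After (internal concurrent batching): about 2.74s
Improvement: about 58x faster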
(Optional) Which issue(s) this PR fixes:
Fixes #579
(optional) The PR that updates user documentation: