Skip to content

Commit 4ea39f4

Browse files
cryo-zdrootfs
authored andcommitted
feat: enforce milvus dial timeout if set (vllm-project#503)
Signed-off-by: cryo <[email protected]> Signed-off-by: Huamin Chen <[email protected]>
1 parent a82edd0 commit 4ea39f4

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

src/semantic-router/pkg/cache/cache_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cache_test
22

33
import (
4+
"fmt"
45
"os"
56
"path/filepath"
67
"strings"
@@ -182,6 +183,44 @@ development:
182183
})
183184
})
184185

186+
Context("Milvus connection timeouts", func() {
187+
It("should respect connection timeout when endpoint is unreachable", func() {
188+
unreachableConfigPath := filepath.Join(tempDir, "milvus-unreachable.yaml")
189+
unreachableHost := "10.255.255.1" // unroutable address to simulate a hanging dial
190+
unreachableConfig := fmt.Sprintf(`
191+
connection:
192+
host: "%s"
193+
port: 19530
194+
database: "test_cache"
195+
timeout: 1
196+
`, unreachableHost)
197+
198+
err := os.WriteFile(unreachableConfigPath, []byte(unreachableConfig), 0o644)
199+
Expect(err).NotTo(HaveOccurred())
200+
201+
done := make(chan struct{})
202+
var cacheErr error
203+
204+
go func() {
205+
defer GinkgoRecover()
206+
_, cacheErr = cache.NewMilvusCache(cache.MilvusCacheOptions{
207+
Enabled: true,
208+
SimilarityThreshold: 0.85,
209+
TTLSeconds: 60,
210+
ConfigPath: unreachableConfigPath,
211+
})
212+
close(done)
213+
}()
214+
215+
Eventually(done, 2*time.Second, 100*time.Millisecond).Should(BeClosed())
216+
Expect(cacheErr).To(HaveOccurred())
217+
Expect(cacheErr.Error()).To(Or(
218+
ContainSubstring("context deadline exceeded"),
219+
ContainSubstring("timeout"),
220+
))
221+
})
222+
})
223+
185224
Context("with unsupported backend type", func() {
186225
It("should return error for unsupported backend type", func() {
187226
config := cache.CacheConfig{

src/semantic-router/pkg/cache/milvus_cache.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,16 @@ func NewMilvusCache(options MilvusCacheOptions) (*MilvusCache, error) {
138138
// Establish connection to Milvus server
139139
connectionString := fmt.Sprintf("%s:%d", config.Connection.Host, config.Connection.Port)
140140
observability.Debugf("MilvusCache: connecting to Milvus at %s", connectionString)
141-
milvusClient, err := client.NewGrpcClient(context.Background(), connectionString)
141+
dialCtx := context.Background()
142+
var cancel context.CancelFunc
143+
if config.Connection.Timeout > 0 {
144+
// If a timeout is specified, apply it to the connection context
145+
timeout := time.Duration(config.Connection.Timeout) * time.Second
146+
dialCtx, cancel = context.WithTimeout(dialCtx, timeout)
147+
defer cancel()
148+
observability.Debugf("MilvusCache: connection timeout set to %s", timeout)
149+
}
150+
milvusClient, err := client.NewGrpcClient(dialCtx, connectionString)
142151
if err != nil {
143152
observability.Debugf("MilvusCache: failed to connect: %v", err)
144153
return nil, fmt.Errorf("failed to create Milvus client: %w", err)

0 commit comments

Comments
 (0)