Skip to content

Commit ae74fee

Browse files
committed
feat: optimize cache, add checkConnection
Signed-off-by: yuluo-yx <[email protected]>
1 parent 519d4d8 commit ae74fee

File tree

5 files changed

+98
-24
lines changed

5 files changed

+98
-24
lines changed

src/semantic-router/pkg/cache/cache_interface.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ type CacheBackend interface {
2020
// IsEnabled returns whether caching is currently active
2121
IsEnabled() bool
2222

23+
// CheckConnection verifies the cache backend connection is healthy
24+
// Returns nil if the connection is healthy, error otherwise
25+
// For local caches (in-memory), this may be a no-op
26+
CheckConnection() error
27+
2328
// AddPendingRequest stores a request awaiting its response
2429
AddPendingRequest(requestID string, model string, query string, requestBody []byte) error
2530

src/semantic-router/pkg/cache/hybrid_cache.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,21 @@ func (h *HybridCache) IsEnabled() bool {
191191
return h.enabled
192192
}
193193

194+
// CheckConnection verifies the cache backend connection is healthy
195+
// For hybrid cache, this checks the Milvus connection
196+
func (h *HybridCache) CheckConnection() error {
197+
if !h.enabled {
198+
return nil
199+
}
200+
201+
if h.milvusCache == nil {
202+
return fmt.Errorf("milvus cache is not initialized")
203+
}
204+
205+
// Delegate to Milvus cache connection check
206+
return h.milvusCache.CheckConnection()
207+
}
208+
194209
// RebuildFromMilvus rebuilds the in-memory HNSW index from persistent Milvus storage
195210
// This is called on startup to recover the index after a restart
196211
func (h *HybridCache) RebuildFromMilvus(ctx context.Context) error {

src/semantic-router/pkg/cache/inmemory_cache.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ func (c *InMemoryCache) IsEnabled() bool {
131131
return c.enabled
132132
}
133133

134+
// CheckConnection verifies the cache connection is healthy
135+
// For in-memory cache, this is always healthy (no external connection)
136+
func (c *InMemoryCache) CheckConnection() error {
137+
// In-memory cache has no external connection to check
138+
return nil
139+
}
140+
134141
// generateEmbedding generates an embedding using the configured model
135142
func (c *InMemoryCache) generateEmbedding(text string) ([]float32, error) {
136143
// Normalize to lowercase for case-insensitive comparison
@@ -1099,10 +1106,3 @@ func (h *maxHeap) bubbleDown(i int) {
10991106
i = largest
11001107
}
11011108
}
1102-
1103-
func min(a, b int) int {
1104-
if a < b {
1105-
return a
1106-
}
1107-
return b
1108-
}

src/semantic-router/pkg/cache/milvus_cache.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ func NewMilvusCache(options MilvusCacheOptions) (*MilvusCache, error) {
154154
logging.Debugf("MilvusCache: failed to connect: %v", err)
155155
return nil, fmt.Errorf("failed to create Milvus client: %w", err)
156156
}
157-
logging.Debugf("MilvusCache: successfully connected to Milvus")
158157

159158
cache := &MilvusCache{
160159
client: milvusClient,
@@ -165,6 +164,14 @@ func NewMilvusCache(options MilvusCacheOptions) (*MilvusCache, error) {
165164
enabled: options.Enabled,
166165
}
167166

167+
// Test connection using the new CheckConnection method
168+
if err := cache.CheckConnection(); err != nil {
169+
logging.Debugf("MilvusCache: connection check failed: %v", err)
170+
milvusClient.Close()
171+
return nil, err
172+
}
173+
logging.Debugf("MilvusCache: successfully connected to Milvus")
174+
168175
// Set up the collection for caching
169176
logging.Debugf("MilvusCache: initializing collection '%s'", config.Collection.Name)
170177
if err := cache.initializeCollection(); err != nil {
@@ -392,6 +399,37 @@ func (c *MilvusCache) IsEnabled() bool {
392399
return c.enabled
393400
}
394401

402+
// CheckConnection verifies the Milvus connection is healthy
403+
func (c *MilvusCache) CheckConnection() error {
404+
if !c.enabled {
405+
return nil
406+
}
407+
408+
if c.client == nil {
409+
return fmt.Errorf("milvus client is not initialized")
410+
}
411+
412+
ctx := context.Background()
413+
if c.config != nil && c.config.Connection.Timeout > 0 {
414+
timeout := time.Duration(c.config.Connection.Timeout) * time.Second
415+
var cancel context.CancelFunc
416+
ctx, cancel = context.WithTimeout(ctx, timeout)
417+
defer cancel()
418+
}
419+
420+
// Check if we can query the collection to verify connection
421+
hasCollection, err := c.client.HasCollection(ctx, c.collectionName)
422+
if err != nil {
423+
return fmt.Errorf("milvus connection check failed: %w", err)
424+
}
425+
426+
if !hasCollection {
427+
return fmt.Errorf("milvus collection '%s' does not exist", c.collectionName)
428+
}
429+
430+
return nil
431+
}
432+
395433
// AddPendingRequest stores a request that is awaiting its response
396434
func (c *MilvusCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error {
397435
start := time.Now()

src/semantic-router/pkg/cache/redis_cache.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,22 +115,6 @@ func NewRedisCache(options RedisCacheOptions) (*RedisCache, error) {
115115
Protocol: 2, // Use RESP2 protocol for compatibility
116116
})
117117

118-
// Test connection
119-
ctx := context.Background()
120-
if config.Connection.Timeout > 0 {
121-
timeout := time.Duration(config.Connection.Timeout) * time.Second
122-
var cancel context.CancelFunc
123-
ctx, cancel = context.WithTimeout(ctx, timeout)
124-
defer cancel()
125-
logging.Debugf("RedisCache: connection timeout set to %s", timeout)
126-
}
127-
128-
if err := redisClient.Ping(ctx).Err(); err != nil {
129-
logging.Debugf("RedisCache: failed to connect: %v", err)
130-
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
131-
}
132-
logging.Debugf("RedisCache: successfully connected to Redis")
133-
134118
cache := &RedisCache{
135119
client: redisClient,
136120
config: config,
@@ -140,6 +124,13 @@ func NewRedisCache(options RedisCacheOptions) (*RedisCache, error) {
140124
enabled: options.Enabled,
141125
}
142126

127+
// Test connection using the new CheckConnection method
128+
if err := cache.CheckConnection(); err != nil {
129+
logging.Debugf("RedisCache: failed to connect: %v", err)
130+
return nil, err
131+
}
132+
logging.Debugf("RedisCache: successfully connected to Redis")
133+
143134
// Set up the index for vector search
144135
logging.Debugf("RedisCache: initializing index '%s'", config.Index.Name)
145136
if err := cache.initializeIndex(); err != nil {
@@ -350,6 +341,31 @@ func (c *RedisCache) IsEnabled() bool {
350341
return c.enabled
351342
}
352343

344+
// CheckConnection verifies the Redis connection is healthy
345+
func (c *RedisCache) CheckConnection() error {
346+
if !c.enabled {
347+
return nil
348+
}
349+
350+
if c.client == nil {
351+
return fmt.Errorf("redis client is not initialized")
352+
}
353+
354+
ctx := context.Background()
355+
if c.config != nil && c.config.Connection.Timeout > 0 {
356+
timeout := time.Duration(c.config.Connection.Timeout) * time.Second
357+
var cancel context.CancelFunc
358+
ctx, cancel = context.WithTimeout(ctx, timeout)
359+
defer cancel()
360+
}
361+
362+
if err := c.client.Ping(ctx).Err(); err != nil {
363+
return fmt.Errorf("redis connection check failed: %w", err)
364+
}
365+
366+
return nil
367+
}
368+
353369
// AddPendingRequest stores a request that is awaiting its response
354370
func (c *RedisCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error {
355371
start := time.Now()

0 commit comments

Comments
 (0)