Skip to content

Commit 973281c

Browse files
committed
review feedback
Signed-off-by: Huamin Chen <[email protected]>
1 parent c0d7918 commit 973281c

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed

src/semantic-router/pkg/cache/hybrid_cache.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,129 @@ func (h *HybridCache) FindSimilar(model string, query string) ([]byte, bool, err
583583
return nil, false, nil
584584
}
585585

586+
// FindSimilarWithThreshold searches for semantically similar cached requests using a specific threshold
587+
func (h *HybridCache) FindSimilarWithThreshold(model string, query string, threshold float32) ([]byte, bool, error) {
588+
start := time.Now()
589+
590+
if !h.enabled {
591+
return nil, false, nil
592+
}
593+
594+
queryPreview := query
595+
if len(query) > 50 {
596+
queryPreview = query[:50] + "..."
597+
}
598+
observability.Debugf("HybridCache.FindSimilarWithThreshold: searching for model='%s', query='%s', threshold=%.3f",
599+
model, queryPreview, threshold)
600+
601+
// Generate query embedding
602+
queryEmbedding, err := candle_binding.GetEmbedding(query, 0)
603+
if err != nil {
604+
metrics.RecordCacheOperation("hybrid", "find_similar_threshold", "error", time.Since(start).Seconds())
605+
return nil, false, fmt.Errorf("failed to generate embedding: %w", err)
606+
}
607+
608+
// Search HNSW index for candidates above similarity threshold
609+
// For semantic cache, we only need the first match, so search with k=1
610+
// and stop early when finding a match above threshold
611+
h.mu.RLock()
612+
candidates := h.searchKNNHybridWithThreshold(queryEmbedding, 1, 20, threshold)
613+
h.mu.RUnlock()
614+
615+
// Filter by similarity threshold before fetching from Milvus
616+
var qualifiedCandidates []searchResult
617+
for _, candidate := range candidates {
618+
if candidate.similarity >= threshold {
619+
qualifiedCandidates = append(qualifiedCandidates, candidate)
620+
}
621+
}
622+
623+
// Map qualified candidates to Milvus IDs (need lock for idMap access)
624+
type candidateWithID struct {
625+
milvusID string
626+
similarity float32
627+
index int
628+
}
629+
630+
h.mu.RLock()
631+
candidatesWithIDs := make([]candidateWithID, 0, len(qualifiedCandidates))
632+
for _, candidate := range qualifiedCandidates {
633+
if milvusID, ok := h.idMap[candidate.index]; ok {
634+
candidatesWithIDs = append(candidatesWithIDs, candidateWithID{
635+
milvusID: milvusID,
636+
similarity: candidate.similarity,
637+
index: candidate.index,
638+
})
639+
}
640+
}
641+
h.mu.RUnlock()
642+
643+
if len(candidatesWithIDs) == 0 {
644+
atomic.AddInt64(&h.missCount, 1)
645+
if len(candidates) > 0 {
646+
observability.Debugf("HybridCache.FindSimilarWithThreshold: %d candidates found but none above threshold %.3f",
647+
len(candidates), threshold)
648+
} else {
649+
observability.Debugf("HybridCache.FindSimilarWithThreshold: no candidates found in HNSW")
650+
}
651+
metrics.RecordCacheOperation("hybrid", "find_similar_threshold", "miss", time.Since(start).Seconds())
652+
metrics.RecordCacheMiss()
653+
return nil, false, nil
654+
}
655+
656+
observability.Debugf("HybridCache.FindSimilarWithThreshold: HNSW returned %d candidates, %d above threshold",
657+
len(candidates), len(candidatesWithIDs))
658+
659+
// Fetch document from Milvus for qualified candidates
660+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
661+
defer cancel()
662+
663+
// Try candidates in order (already sorted by similarity from HNSW)
664+
for _, candidate := range candidatesWithIDs {
665+
// Fetch document from Milvus by ID (direct lookup by primary key)
666+
fetchCtx, fetchCancel := context.WithTimeout(ctx, 2*time.Second)
667+
responseBody, err := h.milvusCache.GetByID(fetchCtx, candidate.milvusID)
668+
fetchCancel()
669+
670+
if err != nil {
671+
observability.Debugf("HybridCache.FindSimilarWithThreshold: Milvus GetByID failed for %s: %v",
672+
candidate.milvusID, err)
673+
continue
674+
}
675+
676+
if responseBody != nil {
677+
atomic.AddInt64(&h.hitCount, 1)
678+
observability.Debugf("HybridCache.FindSimilarWithThreshold: MILVUS HIT - similarity=%.4f (threshold=%.3f)",
679+
candidate.similarity, threshold)
680+
observability.LogEvent("hybrid_cache_hit", map[string]interface{}{
681+
"backend": "hybrid",
682+
"source": "milvus",
683+
"similarity": candidate.similarity,
684+
"threshold": threshold,
685+
"model": model,
686+
"latency_ms": time.Since(start).Milliseconds(),
687+
})
688+
metrics.RecordCacheOperation("hybrid", "find_similar_threshold", "hit_milvus", time.Since(start).Seconds())
689+
metrics.RecordCacheHit()
690+
return responseBody, true, nil
691+
}
692+
}
693+
694+
// No match found above threshold
695+
atomic.AddInt64(&h.missCount, 1)
696+
observability.Debugf("HybridCache.FindSimilarWithThreshold: CACHE MISS - no match above threshold")
697+
observability.LogEvent("hybrid_cache_miss", map[string]interface{}{
698+
"backend": "hybrid",
699+
"threshold": threshold,
700+
"model": model,
701+
"candidates": len(candidatesWithIDs),
702+
})
703+
metrics.RecordCacheOperation("hybrid", "find_similar_threshold", "miss", time.Since(start).Seconds())
704+
metrics.RecordCacheMiss()
705+
706+
return nil, false, nil
707+
}
708+
586709
// Close releases all resources
587710
func (h *HybridCache) Close() error {
588711
if !h.enabled {

0 commit comments

Comments
 (0)