Skip to content

Commit 86b6ae2

Browse files
cryo-zdrootfs
and authored
fix: keep existing InMemory HNSW nodes searchable after eviction (#722)
Signed-off-by: cryo <[email protected]>
Co-authored-by: Huamin Chen <[email protected]>
1 parent 13db404 commit 86b6ae2

File tree

2 files changed

+119
-24
lines changed

2 files changed

+119
-24
lines changed

src/semantic-router/pkg/cache/cache_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,44 @@ development:
759759
Expect(stats.HitCount).To(Equal(int64(0)))
760760
Expect(stats.MissCount).To(Equal(int64(0)))
761761
})
762+
763+
It("should keep existing HNSW nodes searchable after eviction", func() {
764+
cacheWithHNSW := NewInMemoryCache(InMemoryCacheOptions{
765+
Enabled: true,
766+
SimilarityThreshold: 0.1,
767+
MaxEntries: 2,
768+
TTLSeconds: 60, // Set TTL long enough to avoid expiration during test
769+
EvictionPolicy: FIFOEvictionPolicyType,
770+
UseHNSW: true,
771+
HNSWM: 4,
772+
HNSWEfConstruction: 8,
773+
HNSWEfSearch: 8,
774+
EmbeddingModel: "bert",
775+
})
776+
defer cacheWithHNSW.Close()
777+
778+
err := cacheWithHNSW.AddEntry("req-1", "test-model", "first query text", []byte("request-1"), []byte("response-1"))
779+
Expect(err).NotTo(HaveOccurred())
780+
781+
err = cacheWithHNSW.AddEntry("req-2", "test-model", "second query text", []byte("request-2"), []byte("response-2"))
782+
Expect(err).NotTo(HaveOccurred())
783+
784+
// Sanity check: the second entry should be retrievable before any eviction occurs.
785+
resp, found, err := cacheWithHNSW.FindSimilar("test-model", "second query text")
786+
Expect(err).NotTo(HaveOccurred())
787+
Expect(found).To(BeTrue())
788+
Expect(resp).To(Equal([]byte("response-2")))
789+
790+
// Adding a third entry triggers eviction (max entries = 2).
791+
err = cacheWithHNSW.AddEntry("req-3", "test-model", "third query text", []byte("request-3"), []byte("response-3"))
792+
Expect(err).NotTo(HaveOccurred())
793+
794+
// Entry 2 should still be searchable even after eviction reshuffles the slice.
795+
resp, found, err = cacheWithHNSW.FindSimilar("test-model", "second query text")
796+
Expect(err).NotTo(HaveOccurred())
797+
Expect(found).To(BeTrue())
798+
Expect(resp).To(Equal([]byte("response-2")))
799+
})
762800
})
763801

764802
Describe("Cache Backend Types", func() {

src/semantic-router/pkg/cache/inmemory_cache.go

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type InMemoryCache struct {
4949
evictionPolicy EvictionPolicy
5050
hnswIndex *HNSWIndex
5151
useHNSW bool
52+
hnswNeedsRebuild bool // true while the HNSW graph is stale relative to entries
5253
hnswEfSearch int // Search-time ef parameter
5354
embeddingModel string // "bert", "qwen3", or "gemma"
5455
}
@@ -177,8 +178,8 @@ func (c *InMemoryCache) AddPendingRequest(requestID string, model string, query
177178
c.mu.Lock()
178179
defer c.mu.Unlock()
179180

180-
// Remove expired entries to maintain cache hygiene
181-
c.cleanupExpiredEntries()
181+
// Remove expired entries to maintain cache hygiene, but defer the HNSW rebuild to the insertion below if HNSW is enabled.
182+
c.cleanupExpiredEntriesDeferred()
182183

183184
// Check if eviction is needed before adding the new entry
184185
if c.maxEntries > 0 && len(c.entries) >= c.maxEntries {
@@ -201,10 +202,8 @@ func (c *InMemoryCache) AddPendingRequest(requestID string, model string, query
201202
c.entries = append(c.entries, entry)
202203
entryIndex := len(c.entries) - 1
203204

204-
// Add to HNSW index if enabled
205-
if c.useHNSW && c.hnswIndex != nil {
206-
c.hnswIndex.addNode(entryIndex, embedding, c.entries)
207-
}
205+
// Add to HNSW index if enabled. Do not call c.hnswIndex.addNode directly to keep in sync with entries slice when evictions/cleanups occurred.
206+
c.addEntryToHNSWIndex(entryIndex, embedding)
208207

209208
logging.Debugf("InMemoryCache.AddPendingRequest: added pending entry (total entries: %d, embedding_dim: %d, useHNSW: %t)",
210209
len(c.entries), len(embedding), c.useHNSW)
@@ -269,8 +268,8 @@ func (c *InMemoryCache) AddEntry(requestID string, model string, query string, r
269268
c.mu.Lock()
270269
defer c.mu.Unlock()
271270

272-
// Clean up expired entries before adding new one
273-
c.cleanupExpiredEntries()
271+
// Remove expired entries to maintain cache hygiene, but defer the HNSW rebuild to the insertion below if HNSW is enabled.
272+
c.cleanupExpiredEntriesDeferred()
274273

275274
// Check if eviction is needed before adding the new entry
276275
if c.maxEntries > 0 && len(c.entries) >= c.maxEntries {
@@ -293,10 +292,8 @@ func (c *InMemoryCache) AddEntry(requestID string, model string, query string, r
293292
c.entries = append(c.entries, entry)
294293
entryIndex := len(c.entries) - 1
295294

296-
// Add to HNSW index if enabled
297-
if c.useHNSW && c.hnswIndex != nil {
298-
c.hnswIndex.addNode(entryIndex, embedding, c.entries)
299-
}
295+
// Add to HNSW index if enabled. Do not call c.hnswIndex.addNode directly to keep in sync with entries slice when evictions/cleanups occurred.
296+
c.addEntryToHNSWIndex(entryIndex, embedding)
300297

301298
logging.Debugf("InMemoryCache.AddEntry: added complete entry (total entries: %d, request_size: %d, response_size: %d, useHNSW: %t)",
302299
len(c.entries), len(requestBody), len(responseBody), c.useHNSW)
@@ -353,7 +350,20 @@ func (c *InMemoryCache) FindSimilarWithThreshold(model string, query string, thr
353350
now := time.Now()
354351

355352
// Use HNSW index for fast search if enabled
356-
if c.useHNSW && c.hnswIndex != nil && len(c.hnswIndex.nodes) > 0 {
353+
if c.useHNSW && c.hnswIndex != nil {
354+
// Defensive check and rebuild HNSW index if marked as needing rebuild.
355+
if c.hnswNeedsRebuild {
356+
logging.Debugf("InMemoryCache.FindSimilar: HNSW index marked as needing rebuild, rebuilding now")
357+
// Usually this is not reachable since we rebuild during insertions.
358+
c.mu.RUnlock()
359+
c.mu.Lock()
360+
if c.hnswNeedsRebuild { // Double-check under write lock
361+
c.rebuildHNSWIndex()
362+
}
363+
c.mu.Unlock()
364+
c.mu.RLock()
365+
}
366+
357367
// Search using HNSW index with configured ef parameter
358368
candidateIndices := c.hnswIndex.searchKNN(queryEmbedding, 10, c.hnswEfSearch, c.entries)
359369

@@ -538,9 +548,25 @@ func (c *InMemoryCache) GetStats() CacheStats {
538548
return stats
539549
}
540550

541-
// cleanupExpiredEntries removes entries that have exceeded their TTL and updates the cache entry count metric to keep metrics in sync.
542-
// Caller must hold a write lock
551+
// cleanupExpiredEntries removes entries that have exceeded their TTL and immediately rebuilds HNSW if HNSW is enabled and cleanup occurs.
552+
//
553+
// Caller must hold a write lock.
543554
func (c *InMemoryCache) cleanupExpiredEntries() {
555+
c.cleanupExpiredEntriesInternal(false)
556+
}
557+
558+
// cleanupExpiredEntriesDeferred removes expired entries.
559+
//
560+
// If HNSW is enabled and cleanup occurs, it marks HNSW as needing rebuild but defers the rebuild until next call to addEntryToHNSWIndex or rebuildHNSWIndex.
561+
// This is used in write paths that already plan to mutate the slice again (evictions, appends) so we only rebuild once per batch.
562+
//
563+
// Caller must hold a write lock.
564+
func (c *InMemoryCache) cleanupExpiredEntriesDeferred() {
565+
c.cleanupExpiredEntriesInternal(true)
566+
}
567+
568+
// cleanupExpiredEntriesInternal optionally postpones HNSW rebuild until the caller finishes batching updates.
569+
func (c *InMemoryCache) cleanupExpiredEntriesInternal(deferRebuild bool) {
544570
if c.ttlSeconds <= 0 {
545571
return
546572
}
@@ -572,9 +598,14 @@ func (c *InMemoryCache) cleanupExpiredEntries() {
572598
cleanupTime := time.Now()
573599
c.lastCleanupTime = &cleanupTime
574600

575-
// Rebuild HNSW index if entries were removed
601+
// Rebuild HNSW index if entries were removed and deferRebuild is false
576602
if expiredCount > 0 && c.useHNSW && c.hnswIndex != nil {
577-
c.rebuildHNSWIndex()
603+
logging.Debugf("InMemoryCache: TTL cleanup removed entries, marking HNSW index as needing rebuild")
604+
c.hnswNeedsRebuild = true
605+
c.hnswIndex.markStale()
606+
if !deferRebuild {
607+
c.rebuildHNSWIndex()
608+
}
578609
}
579610

580611
// Update metrics after cleanup
@@ -609,7 +640,28 @@ func (c *InMemoryCache) updateAccessInfo(entryIndex int, target CacheEntry) {
609640
}
610641
}
611642

612-
// evictOne removes one entry based on the configured eviction policy
643+
// addEntryToHNSWIndex adds a new entry to the HNSW index, rebuilding if hnswNeedsRebuild is true.
644+
// If HNSW is disabled, this is a no-op.
645+
//
646+
// Caller must hold a write lock.
647+
func (c *InMemoryCache) addEntryToHNSWIndex(entryIndex int, embedding []float32) {
648+
if !c.useHNSW || c.hnswIndex == nil {
649+
return
650+
}
651+
652+
if c.hnswNeedsRebuild {
653+
logging.Debugf("InMemoryCache.addEntryToHNSWIndex: HNSW index marked as needing rebuild, rebuilding now")
654+
c.rebuildHNSWIndex() // Rebuild HNSW index if stale
655+
} else {
656+
logging.Debugf("InMemoryCache.addEntryToHNSWIndex: adding new node to HNSW index for entryIndex=%d", entryIndex)
657+
c.hnswIndex.addNode(entryIndex, embedding, c.entries) // Not stale, just add the new node
658+
}
659+
}
660+
661+
// evictOne removes one entry based on the configured eviction policy.
662+
// It marks HNSW as needing rebuild if HNSW is enabled and an eviction occurs. HNSW will be rebuilt on next call to addEntryToHNSWIndex or rebuildHNSWIndex.
663+
//
664+
// Caller must hold a write lock.
613665
func (c *InMemoryCache) evictOne() {
614666
if len(c.entries) == 0 {
615667
return
@@ -625,8 +677,9 @@ func (c *InMemoryCache) evictOne() {
625677
// If using HNSW, we need to rebuild the index after eviction
626678
// For simplicity, we'll mark that a rebuild is needed
627679
if c.useHNSW && c.hnswIndex != nil {
628-
// Remove the node from HNSW index
629-
// Note: HNSW doesn't support efficient deletion, so we'll rebuild on next search if needed
680+
logging.Debugf("InMemoryCache.evictOne: HNSW index marked as needing rebuild due to eviction")
681+
// Note: HNSW doesn't support efficient deletion, leave the rebuild for the next insertion so we only rebuild once for eviction + append.
682+
c.hnswNeedsRebuild = true
630683
c.hnswIndex.markStale()
631684
}
632685

@@ -642,10 +695,11 @@ func (c *InMemoryCache) evictOne() {
642695

643696
// ===== HNSW Index Implementation =====
644697

645-
// rebuildHNSWIndex rebuilds the HNSW index from scratch
646-
// Caller must hold a write lock
698+
// rebuildHNSWIndex rebuilds the HNSW index from scratch.
699+
// Caller must hold a write lock.
647700
func (c *InMemoryCache) rebuildHNSWIndex() {
648-
if c.hnswIndex == nil {
701+
if !c.useHNSW || c.hnswIndex == nil {
702+
c.hnswNeedsRebuild = false
649703
return
650704
}
651705

@@ -665,6 +719,7 @@ func (c *InMemoryCache) rebuildHNSWIndex() {
665719
}
666720

667721
logging.Debugf("InMemoryCache: HNSW index rebuilt with %d nodes", len(c.hnswIndex.nodes))
722+
c.hnswNeedsRebuild = false
668723
}
669724

670725
// newHNSWIndex creates a new HNSW index
@@ -698,7 +753,9 @@ func (h *HNSWIndex) selectLevel() int {
698753
return int(r * h.ml)
699754
}
700755

701-
// addNode adds a new node to the HNSW index
756+
// addNode adds a new node to the HNSW index.
757+
//
758+
// For InMemoryCache, it is called via addEntryToHNSWIndex to keep in sync with entries slice.
702759
func (h *HNSWIndex) addNode(entryIndex int, embedding []float32, entries []CacheEntry) {
703760
level := h.selectLevel()
704761

0 commit comments

Comments
 (0)