diff --git a/src/semantic-router/pkg/cache/cache_test.go b/src/semantic-router/pkg/cache/cache_test.go index 11c1efa9..4f537728 100644 --- a/src/semantic-router/pkg/cache/cache_test.go +++ b/src/semantic-router/pkg/cache/cache_test.go @@ -576,6 +576,30 @@ development: Expect(stats.HitRatio).To(Equal(0.5)) }) + It("should skip expired entries during similarity search", func() { + ttlCache := cache.NewInMemoryCache(cache.InMemoryCacheOptions{ + Enabled: true, + SimilarityThreshold: 0.1, + MaxEntries: 10, + TTLSeconds: 1, + }) + defer ttlCache.Close() + + err := ttlCache.AddEntry("ttl-request-id", "ttl-model", "time-sensitive query", []byte("request"), []byte("response")) + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(1100 * time.Millisecond) + + response, found, err := ttlCache.FindSimilar("ttl-model", "time-sensitive query") + Expect(err).NotTo(HaveOccurred()) + Expect(found).To(BeFalse()) + Expect(response).To(BeNil()) + + stats := ttlCache.GetStats() + Expect(stats.HitCount).To(Equal(int64(0))) + Expect(stats.MissCount).To(Equal(int64(1))) + }) + It("should handle error when updating non-existent pending request", func() { err := inMemoryCache.UpdateWithResponse("non-existent-query", []byte("response")) Expect(err).To(HaveOccurred()) diff --git a/src/semantic-router/pkg/cache/inmemory_cache.go b/src/semantic-router/pkg/cache/inmemory_cache.go index c76e92e5..10386420 100644 --- a/src/semantic-router/pkg/cache/inmemory_cache.go +++ b/src/semantic-router/pkg/cache/inmemory_cache.go @@ -230,21 +230,21 @@ func (c *InMemoryCache) FindSimilar(model string, query string) ([]byte, bool, e } c.mu.RLock() - - // Check for expired entries during search - c.cleanupExpiredEntriesReadOnly() - var ( bestIndex = -1 bestEntry CacheEntry bestSimilarity float32 entriesChecked int + expiredCount int ) + // Capture the lookup time after acquiring the read lock so TTL checks aren’t skewed by embedding work or lock wait + now := time.Now() // Compare with completed entries for the same model, tracking only the best match for entryIndex, entry := range c.entries { + // Skip incomplete entries if entry.ResponseBody == nil { - continue // Skip incomplete entries + continue } // Only consider entries for the same model @@ -252,6 +252,12 @@ func (c *InMemoryCache) FindSimilar(model string, query string) ([]byte, bool, e continue } + // Skip entries that have expired before considering them + if c.isExpired(entry, now) { + expiredCount++ + continue + } + // Compute semantic similarity using dot product var dotProduct float32 for i := 0; i < len(queryEmbedding) && i < len(entry.Embedding); i++ { @@ -272,6 +278,17 @@ func (c *InMemoryCache) FindSimilar(model string, query string) ([]byte, bool, e // Unlock the read lock since we need the write lock to update the access info c.mu.RUnlock() + // Log if any expired entries were skipped + if expiredCount > 0 { + observability.Debugf("InMemoryCache: excluded %d expired entries during search (TTL: %ds)", + expiredCount, c.ttlSeconds) + observability.LogEvent("cache_expired_entries_found", map[string]interface{}{ + "backend": "memory", + "expired_count": expiredCount, + "ttl_seconds": c.ttlSeconds, + }) + } + // Handle case where no suitable entries exist if bestIndex < 0 { atomic.AddInt64(&c.missCount, 1) @@ -371,7 +388,7 @@ func (c *InMemoryCache) cleanupExpiredEntries() { for _, entry := range c.entries { // Retain entries that are still within their TTL based on last access - if now.Sub(entry.LastAccessAt).Seconds() < float64(c.ttlSeconds) { + if !c.isExpired(entry, now) { validEntries = append(validEntries, entry) } } @@ -397,31 +414,13 @@ func (c *InMemoryCache) cleanupExpiredEntries() { metrics.UpdateCacheEntries("memory", len(c.entries)) } -// cleanupExpiredEntriesReadOnly identifies expired entries without modifying the cache -// Used during read operations with only a read lock held -func (c *InMemoryCache) cleanupExpiredEntriesReadOnly() { +// isExpired checks if a cache entry has expired based on its last access time +func (c *InMemoryCache) isExpired(entry CacheEntry, now time.Time) bool { if c.ttlSeconds <= 0 { - return - } - - now := time.Now() - expiredCount := 0 - - for _, entry := range c.entries { - if now.Sub(entry.LastAccessAt).Seconds() >= float64(c.ttlSeconds) { - expiredCount++ - } + return false } - if expiredCount > 0 { - observability.Debugf("InMemoryCache: found %d expired entries during read (TTL: %ds)", - expiredCount, c.ttlSeconds) - observability.LogEvent("cache_expired_entries_found", map[string]interface{}{ - "backend": "memory", - "expired_count": expiredCount, - "ttl_seconds": c.ttlSeconds, - }) - } + return now.Sub(entry.LastAccessAt) >= time.Duration(c.ttlSeconds)*time.Second } // updateAccessInfo updates the access information for the given entry index