Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ The test suite includes:
- **Envoy logs:** Check the terminal running `make run-envoy` for detailed request/response logs
- **Router logs:** Check the terminal running `make run-router` for classification and routing decisions
- **Rust library:** Set the `RUST_LOG=debug` environment variable for detailed Rust logs
- **Go library:** Set the `SR_LOG_LEVEL=debug` environment variable for detailed Go logs

## Code Style and Standards

Expand Down
8 changes: 4 additions & 4 deletions src/semantic-router/pkg/cache/cache_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// CacheEntry represents a complete cached request-response pair with associated metadata
type CacheEntry struct {
RequestID string
RequestBody []byte
ResponseBody []byte
Model string
Expand All @@ -18,14 +19,13 @@ type CacheBackend interface {
IsEnabled() bool

// AddPendingRequest stores a request awaiting its response
// Returns the processed query string and any error
AddPendingRequest(model string, query string, requestBody []byte) (string, error)
AddPendingRequest(requestID string, model string, query string, requestBody []byte) error

// UpdateWithResponse completes a pending request with the received response
UpdateWithResponse(query string, responseBody []byte) error
UpdateWithResponse(requestID string, responseBody []byte) error

// AddEntry stores a complete request-response pair in the cache
AddEntry(model string, query string, requestBody, responseBody []byte) error
AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error

// FindSimilar searches for semantically similar cached requests
// Returns the cached response, match status, and any error
Expand Down
20 changes: 9 additions & 11 deletions src/semantic-router/pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ development:
})

It("should handle AddEntry operation with embeddings", func() {
err := inMemoryCache.AddEntry("test-model", "test query", []byte("request"), []byte("response"))
err := inMemoryCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response"))
Expect(err).NotTo(HaveOccurred())

stats := inMemoryCache.GetStats()
Expand All @@ -451,7 +451,7 @@ development:

It("should handle FindSimilar operation with embeddings", func() {
// First add an entry
err := inMemoryCache.AddEntry("test-model", "test query", []byte("request"), []byte("response"))
err := inMemoryCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response"))
Expect(err).NotTo(HaveOccurred())

// Search for similar query
Expand All @@ -468,12 +468,11 @@ development:
})

It("should handle AddPendingRequest and UpdateWithResponse", func() {
query, err := inMemoryCache.AddPendingRequest("test-model", "test query", []byte("request"))
err := inMemoryCache.AddPendingRequest("test-request-id", "test-model", "test query", []byte("request"))
Expect(err).NotTo(HaveOccurred())
Expect(query).To(Equal("test query"))

// Update with response
err = inMemoryCache.UpdateWithResponse("test query", []byte("response"))
err = inMemoryCache.UpdateWithResponse("test-request-id", []byte("response"))
Expect(err).NotTo(HaveOccurred())

// Should now be able to find it
Expand All @@ -494,7 +493,7 @@ development:
highThresholdCache := cache.NewInMemoryCache(highThresholdOptions)
defer highThresholdCache.Close()

err := highThresholdCache.AddEntry("test-model", "machine learning", []byte("request"), []byte("ml response"))
err := highThresholdCache.AddEntry("test-request-id", "test-model", "machine learning", []byte("request"), []byte("ml response"))
Expect(err).NotTo(HaveOccurred())

// Exact match should work
Expand All @@ -512,7 +511,7 @@ development:

It("should track hit and miss statistics", func() {
// Add an entry with a specific query
err := inMemoryCache.AddEntry("test-model", "What is machine learning?", []byte("request"), []byte("ML is a subset of AI"))
err := inMemoryCache.AddEntry("test-request-id", "test-model", "What is machine learning?", []byte("request"), []byte("ML is a subset of AI"))
Expect(err).NotTo(HaveOccurred())

// Search for the exact cached query (should be a hit)
Expand Down Expand Up @@ -561,14 +560,13 @@ development:

// Disabled cache operations should not error but should be no-ops
// They should NOT try to generate embeddings
query, err := disabledCache.AddPendingRequest("model", "query", []byte("request"))
err := disabledCache.AddPendingRequest("test-request-id", "test-model", "test query", []byte("request"))
Expect(err).NotTo(HaveOccurred())
Expect(query).To(Equal("query"))

err = disabledCache.UpdateWithResponse("query", []byte("response"))
err = disabledCache.UpdateWithResponse("test-request-id", []byte("response"))
Expect(err).NotTo(HaveOccurred())

err = disabledCache.AddEntry("model", "query", []byte("request"), []byte("response"))
err = disabledCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response"))
Expect(err).NotTo(HaveOccurred())

response, found, err := disabledCache.FindSimilar("model", "query")
Expand Down
18 changes: 10 additions & 8 deletions src/semantic-router/pkg/cache/inmemory_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ func (c *InMemoryCache) IsEnabled() bool {
}

// AddPendingRequest stores a request that is awaiting its response
func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBody []byte) (string, error) {
func (c *InMemoryCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error {
start := time.Now()

if !c.enabled {
return query, nil
return nil
}

// Generate semantic embedding for the query
embedding, err := candle_binding.GetEmbedding(query, 0) // Auto-detect dimension
if err != nil {
metrics.RecordCacheOperation("memory", "add_pending", "error", time.Since(start).Seconds())
return "", fmt.Errorf("failed to generate embedding: %w", err)
return fmt.Errorf("failed to generate embedding: %w", err)
}

c.mu.Lock()
Expand All @@ -77,6 +77,7 @@ func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBod

// Create cache entry for the pending request
entry := CacheEntry{
RequestID: requestID,
RequestBody: requestBody,
Model: model,
Query: query,
Expand Down Expand Up @@ -110,11 +111,11 @@ func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBod
metrics.RecordCacheOperation("memory", "add_pending", "success", time.Since(start).Seconds())
metrics.UpdateCacheEntries("memory", len(c.entries))

return query, nil
return nil
}

// UpdateWithResponse completes a pending request by adding the response
func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) error {
func (c *InMemoryCache) UpdateWithResponse(requestID string, responseBody []byte) error {
start := time.Now()

if !c.enabled {
Expand All @@ -129,7 +130,7 @@ func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) er

// Locate the pending request and complete it
for i, entry := range c.entries {
if entry.Query == query && entry.ResponseBody == nil {
if entry.RequestID == requestID && entry.ResponseBody == nil {
// Complete the cache entry with the response
c.entries[i].ResponseBody = responseBody
c.entries[i].Timestamp = time.Now()
Expand All @@ -144,11 +145,11 @@ func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) er

// No matching pending request found
metrics.RecordCacheOperation("memory", "update_response", "error", time.Since(start).Seconds())
return fmt.Errorf("no pending request found for query: %s", query)
return fmt.Errorf("no pending request found for request ID: %s", requestID)
}

// AddEntry stores a complete request-response pair in the cache
func (c *InMemoryCache) AddEntry(model string, query string, requestBody, responseBody []byte) error {
func (c *InMemoryCache) AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error {
start := time.Now()

if !c.enabled {
Expand All @@ -163,6 +164,7 @@ func (c *InMemoryCache) AddEntry(model string, query string, requestBody, respon
}

entry := CacheEntry{
RequestID: requestID,
RequestBody: requestBody,
ResponseBody: responseBody,
Model: model,
Expand Down
85 changes: 44 additions & 41 deletions src/semantic-router/pkg/cache/milvus_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ func (c *MilvusCache) createCollection() error {
PrimaryKey: true,
TypeParams: map[string]string{"max_length": "64"},
},
{
Name: "request_id",
DataType: entity.FieldTypeVarChar,
TypeParams: map[string]string{"max_length": "64"},
},
{
Name: "model",
DataType: entity.FieldTypeVarChar,
Expand Down Expand Up @@ -328,50 +333,45 @@ func (c *MilvusCache) IsEnabled() bool {
}

// AddPendingRequest stores a request that is awaiting its response
func (c *MilvusCache) AddPendingRequest(model string, query string, requestBody []byte) (string, error) {
func (c *MilvusCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error {
start := time.Now()

if !c.enabled {
return query, nil
return nil
}

// Store incomplete entry for later completion with response
result, err := c.addEntry(model, query, requestBody, nil)
err := c.addEntry("", requestID, model, query, requestBody, nil)

if err != nil {
metrics.RecordCacheOperation("milvus", "add_pending", "error", time.Since(start).Seconds())
} else {
metrics.RecordCacheOperation("milvus", "add_pending", "success", time.Since(start).Seconds())
}

return result, err
return err
}

// UpdateWithResponse completes a pending request by adding the response
func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) error {
func (c *MilvusCache) UpdateWithResponse(requestID string, responseBody []byte) error {
start := time.Now()

if !c.enabled {
return nil
}

queryPreview := query
if len(query) > 50 {
queryPreview = query[:50] + "..."
}

observability.Debugf("MilvusCache.UpdateWithResponse: updating pending entry (query: %s, response_size: %d)",
queryPreview, len(responseBody))
observability.Debugf("MilvusCache.UpdateWithResponse: updating pending entry (request_id: %s, response_size: %d)",
requestID, len(responseBody))

// Find the pending entry and complete it with the response
// Query for the incomplete entry to retrieve its metadata
ctx := context.Background()
queryExpr := fmt.Sprintf("query == \"%s\" && response_body == \"\"", query)
queryExpr := fmt.Sprintf("request_id == \"%s\" && response_body == \"\"", requestID)

observability.Debugf("MilvusCache.UpdateWithResponse: searching for pending entry with expr: %s", queryExpr)

results, err := c.client.Query(ctx, c.collectionName, []string{}, queryExpr,
[]string{"model", "request_body"})
[]string{"id", "model", "query", "request_body"})

if err != nil {
observability.Debugf("MilvusCache.UpdateWithResponse: query failed: %v", err)
Expand All @@ -380,29 +380,27 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro
}

if len(results) == 0 {
observability.Debugf("MilvusCache.UpdateWithResponse: no pending entry found, adding as new complete entry")
// Create new complete entry when no pending entry exists
_, err := c.addEntry("unknown", query, []byte(""), responseBody)
if err != nil {
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
} else {
metrics.RecordCacheOperation("milvus", "update_response", "success", time.Since(start).Seconds())
}
return err
observability.Debugf("MilvusCache.UpdateWithResponse: no pending entry found")
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
return fmt.Errorf("no pending entry found")
}

// Get the ID, model, query, and request body from the pending entry
modelColumn := results[0].(*entity.ColumnVarChar)
requestColumn := results[1].(*entity.ColumnVarChar)
idColumn := results[0].(*entity.ColumnVarChar)
modelColumn := results[1].(*entity.ColumnVarChar)
queryColumn := results[2].(*entity.ColumnVarChar)
requestColumn := results[3].(*entity.ColumnVarChar)

if modelColumn.Len() > 0 {
if idColumn.Len() > 0 {
id := idColumn.Data()[0]
model := modelColumn.Data()[0]
query := queryColumn.Data()[0]
requestBody := requestColumn.Data()[0]

observability.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (model: %s)", model)
observability.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (id: %s, model: %s)", id, model)

// Create the complete entry with response data
_, err := c.addEntry(model, query, []byte(requestBody), responseBody)
err := c.addEntry(id, requestID, model, query, []byte(requestBody), responseBody)
if err != nil {
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
return fmt.Errorf("failed to add complete entry: %w", err)
Expand All @@ -416,14 +414,14 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro
}

// AddEntry stores a complete request-response pair in the cache
func (c *MilvusCache) AddEntry(model string, query string, requestBody, responseBody []byte) error {
func (c *MilvusCache) AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error {
start := time.Now()

if !c.enabled {
return nil
}

_, err := c.addEntry(model, query, requestBody, responseBody)
err := c.addEntry("", requestID, model, query, requestBody, responseBody)

if err != nil {
metrics.RecordCacheOperation("milvus", "add_entry", "error", time.Since(start).Seconds())
Expand All @@ -435,20 +433,23 @@ func (c *MilvusCache) AddEntry(model string, query string, requestBody, response
}

// addEntry handles the internal logic for storing entries in Milvus
func (c *MilvusCache) addEntry(model string, query string, requestBody, responseBody []byte) (string, error) {
func (c *MilvusCache) addEntry(id string, requestID string, model string, query string, requestBody, responseBody []byte) error {
// Generate semantic embedding for the query
embedding, err := candle_binding.GetEmbedding(query, 0) // Auto-detect dimension
if err != nil {
return "", fmt.Errorf("failed to generate embedding: %w", err)
return fmt.Errorf("failed to generate embedding: %w", err)
}

// Generate unique ID
id := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s_%s_%d", model, query, time.Now().UnixNano()))))
// Generate unique ID if not provided
if id == "" {
id = fmt.Sprintf("%x", md5.Sum(fmt.Appendf(nil, "%s_%s_%d", model, query, time.Now().UnixNano())))
}

ctx := context.Background()

// Prepare data for insertion
// Prepare data for upsert
ids := []string{id}
requestIDs := []string{requestID}
models := []string{model}
queries := []string{query}
requestBodies := []string{string(requestBody)}
Expand All @@ -458,20 +459,21 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response

// Create columns
idColumn := entity.NewColumnVarChar("id", ids)
requestIDColumn := entity.NewColumnVarChar("request_id", requestIDs)
modelColumn := entity.NewColumnVarChar("model", models)
queryColumn := entity.NewColumnVarChar("query", queries)
requestColumn := entity.NewColumnVarChar("request_body", requestBodies)
responseColumn := entity.NewColumnVarChar("response_body", responseBodies)
embeddingColumn := entity.NewColumnFloatVector(c.config.Collection.VectorField.Name, len(embedding), embeddings)
timestampColumn := entity.NewColumnInt64("timestamp", timestamps)

// Insert the entry into the collection
observability.Debugf("MilvusCache.addEntry: inserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)",
// Upsert the entry into the collection
observability.Debugf("MilvusCache.addEntry: upserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)",
c.collectionName, len(embedding), len(requestBody), len(responseBody))
_, err = c.client.Insert(ctx, c.collectionName, "", idColumn, modelColumn, queryColumn, requestColumn, responseColumn, embeddingColumn, timestampColumn)
_, err = c.client.Upsert(ctx, c.collectionName, "", idColumn, requestIDColumn, modelColumn, queryColumn, requestColumn, responseColumn, embeddingColumn, timestampColumn)
if err != nil {
observability.Debugf("MilvusCache.addEntry: insert failed: %v", err)
return "", fmt.Errorf("failed to insert cache entry: %w", err)
observability.Debugf("MilvusCache.addEntry: upsert failed: %v", err)
return fmt.Errorf("failed to upsert cache entry: %w", err)
}

// Ensure data is persisted to storage
Expand All @@ -483,11 +485,12 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response
observability.LogEvent("cache_entry_added", map[string]interface{}{
"backend": "milvus",
"collection": c.collectionName,
"request_id": requestID,
"query": query,
"model": model,
"embedding_dimension": len(embedding),
})
return query, nil
return nil
}

// FindSimilar searches for semantically similar cached requests
Expand Down
Loading
Loading