Skip to content

Commit 6e79ddb

Browse files
authored
fix: use request id to locate the correct cache entry to update (#154)
* bugfix: use request id to locate the correct cache entry to update Signed-off-by: Alex Wang <[email protected]> * update docs Signed-off-by: Alex Wang <[email protected]> * fix if condition Signed-off-by: Alex Wang <[email protected]> * fix unit test failure Signed-off-by: Alex Wang <[email protected]> * trigger ci Signed-off-by: Alex Wang <[email protected]> --------- Signed-off-by: Alex Wang <[email protected]>
1 parent 4a58c08 commit 6e79ddb

14 files changed

+100
-100
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ The test suite includes:
205205
- **Envoy logs:** Check the terminal running `make run-envoy` for detailed request/response logs
206206
- **Router logs:** Check the terminal running `make run-router` for classification and routing decisions
207207
- **Rust library:** Use `RUST_LOG=debug` environment variable for detailed Rust logs
208+
- **Go library:** Use `SR_LOG_LEVEL=debug` environment variable for detailed Go logs
208209

209210
## Code Style and Standards
210211

src/semantic-router/pkg/cache/cache_interface.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import "time"
44

55
// CacheEntry represents a complete cached request-response pair with associated metadata
66
type CacheEntry struct {
7+
RequestID string
78
RequestBody []byte
89
ResponseBody []byte
910
Model string
@@ -18,14 +19,13 @@ type CacheBackend interface {
1819
IsEnabled() bool
1920

2021
// AddPendingRequest stores a request awaiting its response
21-
// Returns the processed query string and any error
22-
AddPendingRequest(model string, query string, requestBody []byte) (string, error)
22+
AddPendingRequest(requestID string, model string, query string, requestBody []byte) error
2323

2424
// UpdateWithResponse completes a pending request with the received response
25-
UpdateWithResponse(query string, responseBody []byte) error
25+
UpdateWithResponse(requestID string, responseBody []byte) error
2626

2727
// AddEntry stores a complete request-response pair in the cache
28-
AddEntry(model string, query string, requestBody, responseBody []byte) error
28+
AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error
2929

3030
// FindSimilar searches for semantically similar cached requests
3131
// Returns the cached response, match status, and any error

src/semantic-router/pkg/cache/cache_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ development:
442442
})
443443

444444
It("should handle AddEntry operation with embeddings", func() {
445-
err := inMemoryCache.AddEntry("test-model", "test query", []byte("request"), []byte("response"))
445+
err := inMemoryCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response"))
446446
Expect(err).NotTo(HaveOccurred())
447447

448448
stats := inMemoryCache.GetStats()
@@ -451,7 +451,7 @@ development:
451451

452452
It("should handle FindSimilar operation with embeddings", func() {
453453
// First add an entry
454-
err := inMemoryCache.AddEntry("test-model", "test query", []byte("request"), []byte("response"))
454+
err := inMemoryCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response"))
455455
Expect(err).NotTo(HaveOccurred())
456456

457457
// Search for similar query
@@ -468,12 +468,11 @@ development:
468468
})
469469

470470
It("should handle AddPendingRequest and UpdateWithResponse", func() {
471-
query, err := inMemoryCache.AddPendingRequest("test-model", "test query", []byte("request"))
471+
err := inMemoryCache.AddPendingRequest("test-request-id", "test-model", "test query", []byte("request"))
472472
Expect(err).NotTo(HaveOccurred())
473-
Expect(query).To(Equal("test query"))
474473

475474
// Update with response
476-
err = inMemoryCache.UpdateWithResponse("test query", []byte("response"))
475+
err = inMemoryCache.UpdateWithResponse("test-request-id", []byte("response"))
477476
Expect(err).NotTo(HaveOccurred())
478477

479478
// Should now be able to find it
@@ -494,7 +493,7 @@ development:
494493
highThresholdCache := cache.NewInMemoryCache(highThresholdOptions)
495494
defer highThresholdCache.Close()
496495

497-
err := highThresholdCache.AddEntry("test-model", "machine learning", []byte("request"), []byte("ml response"))
496+
err := highThresholdCache.AddEntry("test-request-id", "test-model", "machine learning", []byte("request"), []byte("ml response"))
498497
Expect(err).NotTo(HaveOccurred())
499498

500499
// Exact match should work
@@ -512,7 +511,7 @@ development:
512511

513512
It("should track hit and miss statistics", func() {
514513
// Add an entry with a specific query
515-
err := inMemoryCache.AddEntry("test-model", "What is machine learning?", []byte("request"), []byte("ML is a subset of AI"))
514+
err := inMemoryCache.AddEntry("test-request-id", "test-model", "What is machine learning?", []byte("request"), []byte("ML is a subset of AI"))
516515
Expect(err).NotTo(HaveOccurred())
517516

518517
// Search for the exact cached query (should be a hit)
@@ -561,14 +560,13 @@ development:
561560

562561
// Disabled cache operations should not error but should be no-ops
563562
// They should NOT try to generate embeddings
564-
query, err := disabledCache.AddPendingRequest("model", "query", []byte("request"))
563+
err := disabledCache.AddPendingRequest("test-request-id", "test-model", "test query", []byte("request"))
565564
Expect(err).NotTo(HaveOccurred())
566-
Expect(query).To(Equal("query"))
567565

568-
err = disabledCache.UpdateWithResponse("query", []byte("response"))
566+
err = disabledCache.UpdateWithResponse("test-request-id", []byte("response"))
569567
Expect(err).NotTo(HaveOccurred())
570568

571-
err = disabledCache.AddEntry("model", "query", []byte("request"), []byte("response"))
569+
err = disabledCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response"))
572570
Expect(err).NotTo(HaveOccurred())
573571

574572
response, found, err := disabledCache.FindSimilar("model", "query")

src/semantic-router/pkg/cache/inmemory_cache.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,18 @@ func (c *InMemoryCache) IsEnabled() bool {
5555
}
5656

5757
// AddPendingRequest stores a request that is awaiting its response
58-
func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBody []byte) (string, error) {
58+
func (c *InMemoryCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error {
5959
start := time.Now()
6060

6161
if !c.enabled {
62-
return query, nil
62+
return nil
6363
}
6464

6565
// Generate semantic embedding for the query
6666
embedding, err := candle_binding.GetEmbedding(query, 0) // Auto-detect dimension
6767
if err != nil {
6868
metrics.RecordCacheOperation("memory", "add_pending", "error", time.Since(start).Seconds())
69-
return "", fmt.Errorf("failed to generate embedding: %w", err)
69+
return fmt.Errorf("failed to generate embedding: %w", err)
7070
}
7171

7272
c.mu.Lock()
@@ -77,6 +77,7 @@ func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBod
7777

7878
// Create cache entry for the pending request
7979
entry := CacheEntry{
80+
RequestID: requestID,
8081
RequestBody: requestBody,
8182
Model: model,
8283
Query: query,
@@ -110,11 +111,11 @@ func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBod
110111
metrics.RecordCacheOperation("memory", "add_pending", "success", time.Since(start).Seconds())
111112
metrics.UpdateCacheEntries("memory", len(c.entries))
112113

113-
return query, nil
114+
return nil
114115
}
115116

116117
// UpdateWithResponse completes a pending request by adding the response
117-
func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) error {
118+
func (c *InMemoryCache) UpdateWithResponse(requestID string, responseBody []byte) error {
118119
start := time.Now()
119120

120121
if !c.enabled {
@@ -129,7 +130,7 @@ func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) er
129130

130131
// Locate the pending request and complete it
131132
for i, entry := range c.entries {
132-
if entry.Query == query && entry.ResponseBody == nil {
133+
if entry.RequestID == requestID && entry.ResponseBody == nil {
133134
// Complete the cache entry with the response
134135
c.entries[i].ResponseBody = responseBody
135136
c.entries[i].Timestamp = time.Now()
@@ -144,11 +145,11 @@ func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) er
144145

145146
// No matching pending request found
146147
metrics.RecordCacheOperation("memory", "update_response", "error", time.Since(start).Seconds())
147-
return fmt.Errorf("no pending request found for query: %s", query)
148+
return fmt.Errorf("no pending request found for request ID: %s", requestID)
148149
}
149150

150151
// AddEntry stores a complete request-response pair in the cache
151-
func (c *InMemoryCache) AddEntry(model string, query string, requestBody, responseBody []byte) error {
152+
func (c *InMemoryCache) AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error {
152153
start := time.Now()
153154

154155
if !c.enabled {
@@ -163,6 +164,7 @@ func (c *InMemoryCache) AddEntry(model string, query string, requestBody, respon
163164
}
164165

165166
entry := CacheEntry{
167+
RequestID: requestID,
166168
RequestBody: requestBody,
167169
ResponseBody: responseBody,
168170
Model: model,

src/semantic-router/pkg/cache/milvus_cache.go

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,11 @@ func (c *MilvusCache) createCollection() error {
263263
PrimaryKey: true,
264264
TypeParams: map[string]string{"max_length": "64"},
265265
},
266+
{
267+
Name: "request_id",
268+
DataType: entity.FieldTypeVarChar,
269+
TypeParams: map[string]string{"max_length": "64"},
270+
},
266271
{
267272
Name: "model",
268273
DataType: entity.FieldTypeVarChar,
@@ -328,50 +333,45 @@ func (c *MilvusCache) IsEnabled() bool {
328333
}
329334

330335
// AddPendingRequest stores a request that is awaiting its response
331-
func (c *MilvusCache) AddPendingRequest(model string, query string, requestBody []byte) (string, error) {
336+
func (c *MilvusCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error {
332337
start := time.Now()
333338

334339
if !c.enabled {
335-
return query, nil
340+
return nil
336341
}
337342

338343
// Store incomplete entry for later completion with response
339-
result, err := c.addEntry(model, query, requestBody, nil)
344+
err := c.addEntry("", requestID, model, query, requestBody, nil)
340345

341346
if err != nil {
342347
metrics.RecordCacheOperation("milvus", "add_pending", "error", time.Since(start).Seconds())
343348
} else {
344349
metrics.RecordCacheOperation("milvus", "add_pending", "success", time.Since(start).Seconds())
345350
}
346351

347-
return result, err
352+
return err
348353
}
349354

350355
// UpdateWithResponse completes a pending request by adding the response
351-
func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) error {
356+
func (c *MilvusCache) UpdateWithResponse(requestID string, responseBody []byte) error {
352357
start := time.Now()
353358

354359
if !c.enabled {
355360
return nil
356361
}
357362

358-
queryPreview := query
359-
if len(query) > 50 {
360-
queryPreview = query[:50] + "..."
361-
}
362-
363-
observability.Debugf("MilvusCache.UpdateWithResponse: updating pending entry (query: %s, response_size: %d)",
364-
queryPreview, len(responseBody))
363+
observability.Debugf("MilvusCache.UpdateWithResponse: updating pending entry (request_id: %s, response_size: %d)",
364+
requestID, len(responseBody))
365365

366366
// Find the pending entry and complete it with the response
367367
// Query for the incomplete entry to retrieve its metadata
368368
ctx := context.Background()
369-
queryExpr := fmt.Sprintf("query == \"%s\" && response_body == \"\"", query)
369+
queryExpr := fmt.Sprintf("request_id == \"%s\" && response_body == \"\"", requestID)
370370

371371
observability.Debugf("MilvusCache.UpdateWithResponse: searching for pending entry with expr: %s", queryExpr)
372372

373373
results, err := c.client.Query(ctx, c.collectionName, []string{}, queryExpr,
374-
[]string{"model", "request_body"})
374+
[]string{"id", "model", "query", "request_body"})
375375

376376
if err != nil {
377377
observability.Debugf("MilvusCache.UpdateWithResponse: query failed: %v", err)
@@ -380,29 +380,27 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro
380380
}
381381

382382
if len(results) == 0 {
383-
observability.Debugf("MilvusCache.UpdateWithResponse: no pending entry found, adding as new complete entry")
384-
// Create new complete entry when no pending entry exists
385-
_, err := c.addEntry("unknown", query, []byte(""), responseBody)
386-
if err != nil {
387-
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
388-
} else {
389-
metrics.RecordCacheOperation("milvus", "update_response", "success", time.Since(start).Seconds())
390-
}
391-
return err
383+
observability.Debugf("MilvusCache.UpdateWithResponse: no pending entry found")
384+
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
385+
return fmt.Errorf("no pending entry found")
392386
}
393387

394388
// Get the model and request body from the pending entry
395-
modelColumn := results[0].(*entity.ColumnVarChar)
396-
requestColumn := results[1].(*entity.ColumnVarChar)
389+
idColumn := results[0].(*entity.ColumnVarChar)
390+
modelColumn := results[1].(*entity.ColumnVarChar)
391+
queryColumn := results[2].(*entity.ColumnVarChar)
392+
requestColumn := results[3].(*entity.ColumnVarChar)
397393

398-
if modelColumn.Len() > 0 {
394+
if idColumn.Len() > 0 {
395+
id := idColumn.Data()[0]
399396
model := modelColumn.Data()[0]
397+
query := queryColumn.Data()[0]
400398
requestBody := requestColumn.Data()[0]
401399

402-
observability.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (model: %s)", model)
400+
observability.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (id: %s, model: %s)", id, model)
403401

404402
// Create the complete entry with response data
405-
_, err := c.addEntry(model, query, []byte(requestBody), responseBody)
403+
err := c.addEntry(id, requestID, model, query, []byte(requestBody), responseBody)
406404
if err != nil {
407405
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
408406
return fmt.Errorf("failed to add complete entry: %w", err)
@@ -416,14 +414,14 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro
416414
}
417415

418416
// AddEntry stores a complete request-response pair in the cache
419-
func (c *MilvusCache) AddEntry(model string, query string, requestBody, responseBody []byte) error {
417+
func (c *MilvusCache) AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error {
420418
start := time.Now()
421419

422420
if !c.enabled {
423421
return nil
424422
}
425423

426-
_, err := c.addEntry(model, query, requestBody, responseBody)
424+
err := c.addEntry("", requestID, model, query, requestBody, responseBody)
427425

428426
if err != nil {
429427
metrics.RecordCacheOperation("milvus", "add_entry", "error", time.Since(start).Seconds())
@@ -435,20 +433,23 @@ func (c *MilvusCache) AddEntry(model string, query string, requestBody, response
435433
}
436434

437435
// addEntry handles the internal logic for storing entries in Milvus
438-
func (c *MilvusCache) addEntry(model string, query string, requestBody, responseBody []byte) (string, error) {
436+
func (c *MilvusCache) addEntry(id string, requestID string, model string, query string, requestBody, responseBody []byte) error {
439437
// Generate semantic embedding for the query
440438
embedding, err := candle_binding.GetEmbedding(query, 0) // Auto-detect dimension
441439
if err != nil {
442-
return "", fmt.Errorf("failed to generate embedding: %w", err)
440+
return fmt.Errorf("failed to generate embedding: %w", err)
443441
}
444442

445-
// Generate unique ID
446-
id := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s_%s_%d", model, query, time.Now().UnixNano()))))
443+
// Generate unique ID if not provided
444+
if id == "" {
445+
id = fmt.Sprintf("%x", md5.Sum(fmt.Appendf(nil, "%s_%s_%d", model, query, time.Now().UnixNano())))
446+
}
447447

448448
ctx := context.Background()
449449

450-
// Prepare data for insertion
450+
// Prepare data for upsert
451451
ids := []string{id}
452+
requestIDs := []string{requestID}
452453
models := []string{model}
453454
queries := []string{query}
454455
requestBodies := []string{string(requestBody)}
@@ -458,20 +459,21 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response
458459

459460
// Create columns
460461
idColumn := entity.NewColumnVarChar("id", ids)
462+
requestIDColumn := entity.NewColumnVarChar("request_id", requestIDs)
461463
modelColumn := entity.NewColumnVarChar("model", models)
462464
queryColumn := entity.NewColumnVarChar("query", queries)
463465
requestColumn := entity.NewColumnVarChar("request_body", requestBodies)
464466
responseColumn := entity.NewColumnVarChar("response_body", responseBodies)
465467
embeddingColumn := entity.NewColumnFloatVector(c.config.Collection.VectorField.Name, len(embedding), embeddings)
466468
timestampColumn := entity.NewColumnInt64("timestamp", timestamps)
467469

468-
// Insert the entry into the collection
469-
observability.Debugf("MilvusCache.addEntry: inserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)",
470+
// Upsert the entry into the collection
471+
observability.Debugf("MilvusCache.addEntry: upserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)",
470472
c.collectionName, len(embedding), len(requestBody), len(responseBody))
471-
_, err = c.client.Insert(ctx, c.collectionName, "", idColumn, modelColumn, queryColumn, requestColumn, responseColumn, embeddingColumn, timestampColumn)
473+
_, err = c.client.Upsert(ctx, c.collectionName, "", idColumn, requestIDColumn, modelColumn, queryColumn, requestColumn, responseColumn, embeddingColumn, timestampColumn)
472474
if err != nil {
473-
observability.Debugf("MilvusCache.addEntry: insert failed: %v", err)
474-
return "", fmt.Errorf("failed to insert cache entry: %w", err)
475+
observability.Debugf("MilvusCache.addEntry: upsert failed: %v", err)
476+
return fmt.Errorf("failed to upsert cache entry: %w", err)
475477
}
476478

477479
// Ensure data is persisted to storage
@@ -483,11 +485,12 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response
483485
observability.LogEvent("cache_entry_added", map[string]interface{}{
484486
"backend": "milvus",
485487
"collection": c.collectionName,
488+
"request_id": requestID,
486489
"query": query,
487490
"model": model,
488491
"embedding_dimension": len(embedding),
489492
})
490-
return query, nil
493+
return nil
491494
}
492495

493496
// FindSimilar searches for semantically similar cached requests

0 commit comments

Comments
 (0)