From 6bff830893bf34759fadcc62de17cf579e27c466 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Tue, 16 Sep 2025 13:12:27 -0700 Subject: [PATCH 1/5] bugfix: use request id to locate the correct cache entry to update Signed-off-by: Alex Wang --- .../pkg/cache/cache_interface.go | 8 +- src/semantic-router/pkg/cache/cache_test.go | 20 ++--- .../pkg/cache/inmemory_cache.go | 18 ++-- src/semantic-router/pkg/cache/milvus_cache.go | 85 ++++++++++--------- .../pkg/extproc/error_metrics_test.go | 3 - .../pkg/extproc/metrics_integration_test.go | 2 - .../pkg/extproc/request_handler.go | 7 +- .../pkg/extproc/response_handler.go | 14 +-- src/semantic-router/pkg/extproc/router.go | 6 -- .../pkg/extproc/test_utils_test.go | 3 - .../pkg/extproc/testing_helpers_test.go | 5 -- tools/make/milvus.mk | 21 +++++ 12 files changed, 93 insertions(+), 99 deletions(-) diff --git a/src/semantic-router/pkg/cache/cache_interface.go b/src/semantic-router/pkg/cache/cache_interface.go index b1940f10..ceb18c26 100644 --- a/src/semantic-router/pkg/cache/cache_interface.go +++ b/src/semantic-router/pkg/cache/cache_interface.go @@ -4,6 +4,7 @@ import "time" // CacheEntry represents a complete cached request-response pair with associated metadata type CacheEntry struct { + RequestID string RequestBody []byte ResponseBody []byte Model string @@ -18,14 +19,13 @@ type CacheBackend interface { IsEnabled() bool // AddPendingRequest stores a request awaiting its response - // Returns the processed query string and any error - AddPendingRequest(model string, query string, requestBody []byte) (string, error) + AddPendingRequest(requestID string, model string, query string, requestBody []byte) error // UpdateWithResponse completes a pending request with the received response - UpdateWithResponse(query string, responseBody []byte) error + UpdateWithResponse(requestID string, responseBody []byte) error // AddEntry stores a complete request-response pair in the cache - AddEntry(model string, query string, requestBody, responseBody []byte) error + AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error // FindSimilar searches for semantically similar cached requests // Returns the cached response, match status, and any error diff --git a/src/semantic-router/pkg/cache/cache_test.go b/src/semantic-router/pkg/cache/cache_test.go index 31b9379f..e41d2e0c 100644 --- a/src/semantic-router/pkg/cache/cache_test.go +++ b/src/semantic-router/pkg/cache/cache_test.go @@ -442,7 +442,7 @@ development: }) It("should handle AddEntry operation with embeddings", func() { - err := inMemoryCache.AddEntry("test-model", "test query", []byte("request"), []byte("response")) + err := inMemoryCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response")) Expect(err).NotTo(HaveOccurred()) stats := inMemoryCache.GetStats() @@ -451,7 +451,7 @@ development: It("should handle FindSimilar operation with embeddings", func() { // First add an entry - err := inMemoryCache.AddEntry("test-model", "test query", []byte("request"), []byte("response")) + err := inMemoryCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response")) Expect(err).NotTo(HaveOccurred()) // Search for similar query @@ -468,12 +468,11 @@ development: }) It("should handle AddPendingRequest and UpdateWithResponse", func() { - query, err := inMemoryCache.AddPendingRequest("test-model", "test query", []byte("request")) + err := inMemoryCache.AddPendingRequest("test-request-id", "test-model", "test 
query", []byte("request")) Expect(err).NotTo(HaveOccurred()) - Expect(query).To(Equal("test query")) // Update with response - err = inMemoryCache.UpdateWithResponse("test query", []byte("response")) + err = inMemoryCache.UpdateWithResponse("test-request-id", []byte("response")) Expect(err).NotTo(HaveOccurred()) // Should now be able to find it @@ -494,7 +493,7 @@ development: highThresholdCache := cache.NewInMemoryCache(highThresholdOptions) defer highThresholdCache.Close() - err := highThresholdCache.AddEntry("test-model", "machine learning", []byte("request"), []byte("ml response")) + err := highThresholdCache.AddEntry("test-request-id", "test-model", "machine learning", []byte("request"), []byte("ml response")) Expect(err).NotTo(HaveOccurred()) // Exact match should work @@ -512,7 +511,7 @@ development: It("should track hit and miss statistics", func() { // Add an entry with a specific query - err := inMemoryCache.AddEntry("test-model", "What is machine learning?", []byte("request"), []byte("ML is a subset of AI")) + err := inMemoryCache.AddEntry("test-request-id", "test-model", "What is machine learning?", []byte("request"), []byte("ML is a subset of AI")) Expect(err).NotTo(HaveOccurred()) // Search for the exact cached query (should be a hit) @@ -561,14 +560,13 @@ development: // Disabled cache operations should not error but should be no-ops // They should NOT try to generate embeddings - query, err := disabledCache.AddPendingRequest("model", "query", []byte("request")) + err := disabledCache.AddPendingRequest("test-request-id", "test-model", "test query", []byte("request")) Expect(err).NotTo(HaveOccurred()) - Expect(query).To(Equal("query")) - err = disabledCache.UpdateWithResponse("query", []byte("response")) + err = disabledCache.UpdateWithResponse("test-request-id", []byte("response")) Expect(err).NotTo(HaveOccurred()) - err = disabledCache.AddEntry("model", "query", []byte("request"), []byte("response")) + err = disabledCache.AddEntry("test-request-id", "test-model", "test query", []byte("request"), []byte("response")) Expect(err).NotTo(HaveOccurred()) response, found, err := disabledCache.FindSimilar("model", "query") diff --git a/src/semantic-router/pkg/cache/inmemory_cache.go b/src/semantic-router/pkg/cache/inmemory_cache.go index 9928683d..61fb8773 100644 --- a/src/semantic-router/pkg/cache/inmemory_cache.go +++ b/src/semantic-router/pkg/cache/inmemory_cache.go @@ -55,18 +55,18 @@ func (c *InMemoryCache) IsEnabled() bool { } // AddPendingRequest stores a request that is awaiting its response -func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBody []byte) (string, error) { +func (c *InMemoryCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error { start := time.Now() if !c.enabled { - return query, nil + return nil } // Generate semantic embedding for the query embedding, err := candle_binding.GetEmbedding(query, 0) // Auto-detect dimension if err != nil { metrics.RecordCacheOperation("memory", "add_pending", "error", time.Since(start).Seconds()) - return "", fmt.Errorf("failed to generate embedding: %w", err) + return fmt.Errorf("failed to generate embedding: %w", err) } c.mu.Lock() @@ -77,6 +77,7 @@ func (c *InMemoryCache) AddPendingRequest(model string, query string, requestBod // Create cache entry for the pending request entry := CacheEntry{ + RequestID: requestID, RequestBody: requestBody, Model: model, Query: query, @@ -110,11 +111,11 @@ func (c *InMemoryCache) AddPendingRequest(model string, query 
string, requestBod metrics.RecordCacheOperation("memory", "add_pending", "success", time.Since(start).Seconds()) metrics.UpdateCacheEntries("memory", len(c.entries)) - return query, nil + return nil } // UpdateWithResponse completes a pending request by adding the response -func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) error { +func (c *InMemoryCache) UpdateWithResponse(requestID string, responseBody []byte) error { start := time.Now() if !c.enabled { @@ -129,7 +130,7 @@ func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) er // Locate the pending request and complete it for i, entry := range c.entries { - if entry.Query == query && entry.ResponseBody == nil { + if entry.RequestID == requestID && entry.ResponseBody == nil { // Complete the cache entry with the response c.entries[i].ResponseBody = responseBody c.entries[i].Timestamp = time.Now() @@ -144,11 +145,11 @@ func (c *InMemoryCache) UpdateWithResponse(query string, responseBody []byte) er // No matching pending request found metrics.RecordCacheOperation("memory", "update_response", "error", time.Since(start).Seconds()) - return fmt.Errorf("no pending request found for query: %s", query) + return fmt.Errorf("no pending request found for request ID: %s", requestID) } // AddEntry stores a complete request-response pair in the cache -func (c *InMemoryCache) AddEntry(model string, query string, requestBody, responseBody []byte) error { +func (c *InMemoryCache) AddEntry(requestID string, model string, query string, requestBody, responseBody []byte) error { start := time.Now() if !c.enabled { @@ -163,6 +164,7 @@ func (c *InMemoryCache) AddEntry(model string, query string, requestBody, respon } entry := CacheEntry{ + RequestID: requestID, RequestBody: requestBody, ResponseBody: responseBody, Model: model, diff --git a/src/semantic-router/pkg/cache/milvus_cache.go b/src/semantic-router/pkg/cache/milvus_cache.go index a4edcde3..725f711b 100644 --- a/src/semantic-router/pkg/cache/milvus_cache.go +++ b/src/semantic-router/pkg/cache/milvus_cache.go @@ -263,6 +263,11 @@ func (c *MilvusCache) createCollection() error { PrimaryKey: true, TypeParams: map[string]string{"max_length": "64"}, }, + { + Name: "request_id", + DataType: entity.FieldTypeVarChar, + TypeParams: map[string]string{"max_length": "64"}, + }, { Name: "model", DataType: entity.FieldTypeVarChar, @@ -328,15 +333,15 @@ func (c *MilvusCache) IsEnabled() bool { } // AddPendingRequest stores a request that is awaiting its response -func (c *MilvusCache) AddPendingRequest(model string, query string, requestBody []byte) (string, error) { +func (c *MilvusCache) AddPendingRequest(requestID string, model string, query string, requestBody []byte) error { start := time.Now() if !c.enabled { - return query, nil + return nil } // Store incomplete entry for later completion with response - result, err := c.addEntry(model, query, requestBody, nil) + err := c.addEntry("", requestID, model, query, requestBody, nil) if err != nil { metrics.RecordCacheOperation("milvus", "add_pending", "error", time.Since(start).Seconds()) @@ -344,34 +349,29 @@ func (c *MilvusCache) AddPendingRequest(model string, query string, requestBody metrics.RecordCacheOperation("milvus", "add_pending", "success", time.Since(start).Seconds()) } - return result, err + return err } // UpdateWithResponse completes a pending request by adding the response -func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) error { +func (c *MilvusCache) 
UpdateWithResponse(requestID string, responseBody []byte) error { start := time.Now() if !c.enabled { return nil } - queryPreview := query - if len(query) > 50 { - queryPreview = query[:50] + "..." - } - - observability.Debugf("MilvusCache.UpdateWithResponse: updating pending entry (query: %s, response_size: %d)", - queryPreview, len(responseBody)) + observability.Debugf("MilvusCache.UpdateWithResponse: updating pending entry (request_id: %s, response_size: %d)", + requestID, len(responseBody)) // Find the pending entry and complete it with the response // Query for the incomplete entry to retrieve its metadata ctx := context.Background() - queryExpr := fmt.Sprintf("query == \"%s\" && response_body == \"\"", query) + queryExpr := fmt.Sprintf("request_id == \"%s\" && response_body == \"\"", requestID) observability.Debugf("MilvusCache.UpdateWithResponse: searching for pending entry with expr: %s", queryExpr) results, err := c.client.Query(ctx, c.collectionName, []string{}, queryExpr, - []string{"model", "request_body"}) + []string{"id", "model", "query", "request_body"}) if err != nil { observability.Debugf("MilvusCache.UpdateWithResponse: query failed: %v", err) @@ -380,29 +380,27 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro } if len(results) == 0 { - observability.Debugf("MilvusCache.UpdateWithResponse: no pending entry found, adding as new complete entry") - // Create new complete entry when no pending entry exists - _, err := c.addEntry("unknown", query, []byte(""), responseBody) - if err != nil { - metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds()) - } else { - metrics.RecordCacheOperation("milvus", "update_response", "success", time.Since(start).Seconds()) - } - return err + observability.Debugf("MilvusCache.UpdateWithResponse: no pending entry found") + metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds()) + return fmt.Errorf("no pending entry found") } // Get the model and request body from the pending entry - modelColumn := results[0].(*entity.ColumnVarChar) - requestColumn := results[1].(*entity.ColumnVarChar) + idColumn := results[0].(*entity.ColumnVarChar) + modelColumn := results[1].(*entity.ColumnVarChar) + queryColumn := results[2].(*entity.ColumnVarChar) + requestColumn := results[3].(*entity.ColumnVarChar) - if modelColumn.Len() > 0 { + if idColumn.Len() > 0 { + id := idColumn.Data()[0] model := modelColumn.Data()[0] + query := queryColumn.Data()[0] requestBody := requestColumn.Data()[0] - observability.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (model: %s)", model) + observability.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (id: %s, model: %s)", id, model) // Create the complete entry with response data - _, err := c.addEntry(model, query, []byte(requestBody), responseBody) + err := c.addEntry(id, requestID, model, query, []byte(requestBody), responseBody) if err != nil { metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds()) return fmt.Errorf("failed to add complete entry: %w", err) @@ -416,14 +414,14 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro } // AddEntry stores a complete request-response pair in the cache -func (c *MilvusCache) AddEntry(model string, query string, requestBody, responseBody []byte) error { +func (c *MilvusCache) AddEntry(requestID string, model string, query string, 
requestBody, responseBody []byte) error { start := time.Now() if !c.enabled { return nil } - _, err := c.addEntry(model, query, requestBody, responseBody) + err := c.addEntry("", requestID, model, query, requestBody, responseBody) if err != nil { metrics.RecordCacheOperation("milvus", "add_entry", "error", time.Since(start).Seconds()) @@ -435,20 +433,23 @@ func (c *MilvusCache) AddEntry(model string, query string, requestBody, response } // addEntry handles the internal logic for storing entries in Milvus -func (c *MilvusCache) addEntry(model string, query string, requestBody, responseBody []byte) (string, error) { +func (c *MilvusCache) addEntry(id string, requestID string, model string, query string, requestBody, responseBody []byte) error { // Generate semantic embedding for the query embedding, err := candle_binding.GetEmbedding(query, 0) // Auto-detect dimension if err != nil { - return "", fmt.Errorf("failed to generate embedding: %w", err) + return fmt.Errorf("failed to generate embedding: %w", err) } - // Generate unique ID - id := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s_%s_%d", model, query, time.Now().UnixNano())))) + // Generate unique ID if not provided + if id == "" { + id = fmt.Sprintf("%x", md5.Sum(fmt.Appendf(nil, "%s_%s_%d", model, query, time.Now().UnixNano()))) + } ctx := context.Background() - // Prepare data for insertion + // Prepare data for upsert ids := []string{id} + requestIDs := []string{requestID} models := []string{model} queries := []string{query} requestBodies := []string{string(requestBody)} @@ -458,6 +459,7 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response // Create columns idColumn := entity.NewColumnVarChar("id", ids) + requestIDColumn := entity.NewColumnVarChar("request_id", requestIDs) modelColumn := entity.NewColumnVarChar("model", models) queryColumn := entity.NewColumnVarChar("query", queries) requestColumn := entity.NewColumnVarChar("request_body", requestBodies) @@ -465,13 +467,13 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response embeddingColumn := entity.NewColumnFloatVector(c.config.Collection.VectorField.Name, len(embedding), embeddings) timestampColumn := entity.NewColumnInt64("timestamp", timestamps) - // Insert the entry into the collection - observability.Debugf("MilvusCache.addEntry: inserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)", + // Upsert the entry into the collection + observability.Debugf("MilvusCache.addEntry: upserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)", c.collectionName, len(embedding), len(requestBody), len(responseBody)) - _, err = c.client.Insert(ctx, c.collectionName, "", idColumn, modelColumn, queryColumn, requestColumn, responseColumn, embeddingColumn, timestampColumn) + _, err = c.client.Upsert(ctx, c.collectionName, "", idColumn, requestIDColumn, modelColumn, queryColumn, requestColumn, responseColumn, embeddingColumn, timestampColumn) if err != nil { - observability.Debugf("MilvusCache.addEntry: insert failed: %v", err) - return "", fmt.Errorf("failed to insert cache entry: %w", err) + observability.Debugf("MilvusCache.addEntry: upsert failed: %v", err) + return fmt.Errorf("failed to upsert cache entry: %w", err) } // Ensure data is persisted to storage @@ -483,11 +485,12 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response observability.LogEvent("cache_entry_added", map[string]interface{}{ "backend": "milvus", 
"collection": c.collectionName, + "request_id": requestID, "query": query, "model": model, "embedding_dimension": len(embedding), }) - return query, nil + return nil } // FindSimilar searches for semantically similar cached requests diff --git a/src/semantic-router/pkg/extproc/error_metrics_test.go b/src/semantic-router/pkg/extproc/error_metrics_test.go index 01303c71..b7544804 100644 --- a/src/semantic-router/pkg/extproc/error_metrics_test.go +++ b/src/semantic-router/pkg/extproc/error_metrics_test.go @@ -43,7 +43,6 @@ func getCounterValue(metricName string, want map[string]string) float64 { func TestRequestParseErrorIncrementsErrorCounter(t *testing.T) { r := &OpenAIRouter{} - r.InitializeForTesting() ctx := &RequestContext{} // Invalid JSON triggers parse error @@ -65,7 +64,6 @@ func TestRequestParseErrorIncrementsErrorCounter(t *testing.T) { func TestResponseParseErrorIncrementsErrorCounter(t *testing.T) { r := &OpenAIRouter{} - r.InitializeForTesting() ctx := &RequestContext{RequestModel: "model-a"} // Invalid JSON triggers parse error in response body handler @@ -86,7 +84,6 @@ func TestResponseParseErrorIncrementsErrorCounter(t *testing.T) { func TestUpstreamStatusIncrements4xx5xxCounters(t *testing.T) { r := &OpenAIRouter{} - r.InitializeForTesting() ctx := &RequestContext{RequestModel: "m"} diff --git a/src/semantic-router/pkg/extproc/metrics_integration_test.go b/src/semantic-router/pkg/extproc/metrics_integration_test.go index 662d6d78..c16dd4ca 100644 --- a/src/semantic-router/pkg/extproc/metrics_integration_test.go +++ b/src/semantic-router/pkg/extproc/metrics_integration_test.go @@ -45,8 +45,6 @@ var _ = Describe("Metrics recording", func() { BeforeEach(func() { // Use a minimal router that doesn't require external models router = &OpenAIRouter{} - // Initialize internal maps used by handlers - router.InitializeForTesting() }) It("records TTFT on response headers", func() { diff --git a/src/semantic-router/pkg/extproc/request_handler.go b/src/semantic-router/pkg/extproc/request_handler.go index 25a21fd8..a1ced99d 100644 --- a/src/semantic-router/pkg/extproc/request_handler.go +++ b/src/semantic-router/pkg/extproc/request_handler.go @@ -276,13 +276,10 @@ func (r *OpenAIRouter) handleCaching(ctx *RequestContext) (*ext_proc.ProcessingR } // Cache miss, store the request for later - cacheID, err := r.Cache.AddPendingRequest(requestModel, requestQuery, ctx.OriginalRequestBody) + err = r.Cache.AddPendingRequest(ctx.RequestID, requestModel, requestQuery, ctx.OriginalRequestBody) if err != nil { observability.Errorf("Error adding pending request to cache: %v", err) - } else { - r.pendingRequestsLock.Lock() - r.pendingRequests[ctx.RequestID] = []byte(cacheID) - r.pendingRequestsLock.Unlock() + // Continue without caching } } diff --git a/src/semantic-router/pkg/extproc/response_handler.go b/src/semantic-router/pkg/extproc/response_handler.go index 4edef993..6e79f2a0 100644 --- a/src/semantic-router/pkg/extproc/response_handler.go +++ b/src/semantic-router/pkg/extproc/response_handler.go @@ -145,17 +145,9 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response } } - // Check if this request has a pending cache entry - r.pendingRequestsLock.Lock() - cacheID, exists := r.pendingRequests[ctx.RequestID] - if exists { - delete(r.pendingRequests, ctx.RequestID) - } - r.pendingRequestsLock.Unlock() - - // If we have a pending request, update the cache - if exists && ctx.RequestQuery != "" && responseBody != nil { - err := r.Cache.UpdateWithResponse(string(cacheID), 
responseBody) + // Update the cache + if ctx.RequestQuery != "" && responseBody != nil { + err := r.Cache.UpdateWithResponse(ctx.RequestID, responseBody) if err != nil { observability.Errorf("Error updating cache: %v", err) // Continue even if cache update fails diff --git a/src/semantic-router/pkg/extproc/router.go b/src/semantic-router/pkg/extproc/router.go index 004cf5d4..e333cd48 100644 --- a/src/semantic-router/pkg/extproc/router.go +++ b/src/semantic-router/pkg/extproc/router.go @@ -2,7 +2,6 @@ package extproc import ( "fmt" - "sync" ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -25,10 +24,6 @@ type OpenAIRouter struct { PIIChecker *pii.PolicyChecker Cache cache.CacheBackend ToolsDatabase *tools.ToolsDatabase - - // Map to track pending requests and their unique IDs - pendingRequests map[string][]byte - pendingRequestsLock sync.Mutex } // Ensure OpenAIRouter implements the ext_proc calls @@ -161,7 +156,6 @@ func NewOpenAIRouter(configPath string) (*OpenAIRouter, error) { PIIChecker: piiChecker, Cache: semanticCache, ToolsDatabase: toolsDatabase, - pendingRequests: make(map[string][]byte), } // Log reasoning configuration after router is created diff --git a/src/semantic-router/pkg/extproc/test_utils_test.go b/src/semantic-router/pkg/extproc/test_utils_test.go index 23d38776..924acfe3 100644 --- a/src/semantic-router/pkg/extproc/test_utils_test.go +++ b/src/semantic-router/pkg/extproc/test_utils_test.go @@ -246,8 +246,5 @@ func CreateTestRouter(cfg *config.RouterConfig) (*extproc.OpenAIRouter, error) { ToolsDatabase: toolsDatabase, } - // Initialize internal fields for testing - router.InitializeForTesting() - return router, nil } diff --git a/src/semantic-router/pkg/extproc/testing_helpers_test.go b/src/semantic-router/pkg/extproc/testing_helpers_test.go index afc48f07..492ca099 100644 --- a/src/semantic-router/pkg/extproc/testing_helpers_test.go +++ b/src/semantic-router/pkg/extproc/testing_helpers_test.go @@ -25,8 +25,3 @@ func (r *OpenAIRouter) HandleResponseHeaders(v *ext_proc.ProcessingRequest_Respo func (r *OpenAIRouter) HandleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) { return r.handleResponseBody(v, ctx) } - -// InitializeForTesting initializes the internal maps and mutexes for testing -func (r *OpenAIRouter) InitializeForTesting() { - r.pendingRequests = make(map[string][]byte) -} diff --git a/tools/make/milvus.mk b/tools/make/milvus.mk index 075792a9..d435dd22 100644 --- a/tools/make/milvus.mk +++ b/tools/make/milvus.mk @@ -66,3 +66,24 @@ test-semantic-router-milvus: build-router start-milvus @export LD_LIBRARY_PATH=$${PWD}/candle-binding/target/release && \ cd src/semantic-router && CGO_ENABLED=1 go test -tags=milvus -v ./... @echo "Consider running 'make stop-milvus' when done testing" + +# Milvus UI (Attu) management +start-milvus-ui: ## Start Attu UI to browse Milvus data + @$(LOG_TARGET) + @echo "Starting Attu (Milvus UI) with $(CONTAINER_RUNTIME)..." + @$(CONTAINER_RUNTIME) run -d \ + --name milvus-ui \ + --add-host=host.docker.internal:host-gateway \ + -e MILVUS_URL=host.docker.internal:19530 \ + -p 18000:3000 \ + zilliz/attu:v2.3.5 + @echo "Waiting for Attu to be ready..." + @sleep 3 + @echo "Open UI: http://localhost:18000 (Milvus at host.docker.internal:19530)" + +stop-milvus-ui: + @$(LOG_TARGET) + @echo "Stopping Attu (Milvus UI) container..." 
+ @$(CONTAINER_RUNTIME) stop milvus-ui || true + @$(CONTAINER_RUNTIME) rm milvus-ui || true + @echo "Attu container stopped and removed" \ No newline at end of file From 61500ee1e3c5b873f03bb7a0c265913c17602da2 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Tue, 16 Sep 2025 21:09:19 -0700 Subject: [PATCH 2/5] update docs Signed-off-by: Alex Wang --- CONTRIBUTING.md | 1 + tools/make/common.mk | 2 ++ 2 files changed, 3 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 01f91297..8280fc6d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -205,6 +205,7 @@ The test suite includes: - **Envoy logs:** Check the terminal running `make run-envoy` for detailed request/response logs - **Router logs:** Check the terminal running `make run-router` for classification and routing decisions - **Rust library:** Use `RUST_LOG=debug` environment variable for detailed Rust logs +- **Go library:** Use `SR_LOG_LEVEL=debug` environment variable for detailed Go logs ## Code Style and Standards diff --git a/tools/make/common.mk b/tools/make/common.mk index 9faa3c49..1bd527fd 100644 --- a/tools/make/common.mk +++ b/tools/make/common.mk @@ -63,6 +63,8 @@ help: @echo " clean-milvus - Stop container and clean data" @echo " test-milvus-cache - Test cache with Milvus backend" @echo " test-semantic-router-milvus - Test router with Milvus cache" + @echo " start-milvus-ui - Start Milvus UI to browse data" + @echo " stop-milvus-ui - Stop and remove Milvus UI container" @echo " Example: CONTAINER_RUNTIME=podman make start-milvus" @echo "" @echo " Demo targets:" From 2e17ed0aecb4912b487d5d6a5c7e9408d11f7023 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Tue, 16 Sep 2025 21:15:40 -0700 Subject: [PATCH 3/5] fix if condition Signed-off-by: Alex Wang --- src/semantic-router/pkg/extproc/response_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/semantic-router/pkg/extproc/response_handler.go b/src/semantic-router/pkg/extproc/response_handler.go index 6e79f2a0..5fbe9711 100644 --- a/src/semantic-router/pkg/extproc/response_handler.go +++ b/src/semantic-router/pkg/extproc/response_handler.go @@ -146,7 +146,7 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response } // Update the cache - if ctx.RequestQuery != "" && responseBody != nil { + if ctx.RequestID != "" && responseBody != nil { err := r.Cache.UpdateWithResponse(ctx.RequestID, responseBody) if err != nil { observability.Errorf("Error updating cache: %v", err) From bf1a38c27b41f054ad14138bccd34b9db86e237f Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Tue, 16 Sep 2025 21:52:49 -0700 Subject: [PATCH 4/5] fix unit test failure Signed-off-by: Alex Wang --- src/semantic-router/pkg/extproc/metrics_integration_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/semantic-router/pkg/extproc/metrics_integration_test.go b/src/semantic-router/pkg/extproc/metrics_integration_test.go index c16dd4ca..397318a8 100644 --- a/src/semantic-router/pkg/extproc/metrics_integration_test.go +++ b/src/semantic-router/pkg/extproc/metrics_integration_test.go @@ -11,6 +11,7 @@ import ( ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/cache" ) func getHistogramSampleCount(metricName, model string) uint64 { @@ -44,7 +45,9 @@ var _ = Describe("Metrics recording", func() { BeforeEach(func() { // Use a minimal router 
that doesn't require external models - router = &OpenAIRouter{} + router = &OpenAIRouter{ + Cache: cache.NewInMemoryCache(cache.InMemoryCacheOptions{Enabled: false}), + } }) It("records TTFT on response headers", func() { From 7835da24304a095a1968c8368832ba25374ae66b Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Wed, 17 Sep 2025 09:23:00 -0700 Subject: [PATCH 5/5] trigger ci Signed-off-by: Alex Wang
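
Reviewer sketch (illustrative only, not part of this series): PATCH 1/5 changes the CacheBackend contract so pending entries are keyed by request ID instead of query text. The minimal Go sketch below shows the intended call flow against the updated interface, assuming the in-memory backend from pkg/cache and an initialized candle embedding binding (LD_LIBRARY_PATH set as in the Makefile); the request ID, model name, and JSON payloads are made-up values.

package main

import (
	"fmt"

	"github.com/vllm-project/semantic-router/src/semantic-router/pkg/cache"
)

func main() {
	// Construct the in-memory backend the same way the tests do.
	// Enabled: true assumes the candle embedding binding is loadable.
	c := cache.NewInMemoryCache(cache.InMemoryCacheOptions{Enabled: true})
	defer c.Close()

	requestID := "req-123" // in the router this comes from ctx.RequestID

	// Request path: store the pending entry keyed by request ID.
	if err := c.AddPendingRequest(requestID, "test-model", "What is machine learning?", []byte(`{"model":"test-model"}`)); err != nil {
		fmt.Println("add pending:", err)
	}

	// Response path: complete the same entry via its request ID rather than
	// its query text, which is the lookup this series corrects.
	if err := c.UpdateWithResponse(requestID, []byte(`{"choices":[]}`)); err != nil {
		fmt.Println("update:", err)
	}

	// Later, semantically similar requests can hit the completed entry.
	response, found, err := c.FindSimilar("test-model", "What is machine learning?")
	fmt.Printf("hit=%v err=%v response_bytes=%d\n", found, err, len(response))
}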