@@ -263,6 +263,11 @@ func (c *MilvusCache) createCollection() error {
263263 PrimaryKey : true ,
264264 TypeParams : map [string ]string {"max_length" : "64" },
265265 },
266+ {
267+ Name : "request_id" ,
268+ DataType : entity .FieldTypeVarChar ,
269+ TypeParams : map [string ]string {"max_length" : "64" },
270+ },
266271 {
267272 Name : "model" ,
268273 DataType : entity .FieldTypeVarChar ,
@@ -328,50 +333,45 @@ func (c *MilvusCache) IsEnabled() bool {
328333}
329334
330335// AddPendingRequest stores a request that is awaiting its response
331- func (c * MilvusCache ) AddPendingRequest (model string , query string , requestBody []byte ) ( string , error ) {
336+ func (c * MilvusCache ) AddPendingRequest (requestID string , model string , query string , requestBody []byte ) error {
332337 start := time .Now ()
333338
334339 if ! c .enabled {
335- return query , nil
340+ return nil
336341 }
337342
338343 // Store incomplete entry for later completion with response
339- result , err := c .addEntry (model , query , requestBody , nil )
344+ err := c .addEntry ("" , requestID , model , query , requestBody , nil )
340345
341346 if err != nil {
342347 metrics .RecordCacheOperation ("milvus" , "add_pending" , "error" , time .Since (start ).Seconds ())
343348 } else {
344349 metrics .RecordCacheOperation ("milvus" , "add_pending" , "success" , time .Since (start ).Seconds ())
345350 }
346351
347- return result , err
352+ return err
348353}
349354
350355// UpdateWithResponse completes a pending request by adding the response
351- func (c * MilvusCache ) UpdateWithResponse (query string , responseBody []byte ) error {
356+ func (c * MilvusCache ) UpdateWithResponse (requestID string , responseBody []byte ) error {
352357 start := time .Now ()
353358
354359 if ! c .enabled {
355360 return nil
356361 }
357362
358- queryPreview := query
359- if len (query ) > 50 {
360- queryPreview = query [:50 ] + "..."
361- }
362-
363- observability .Debugf ("MilvusCache.UpdateWithResponse: updating pending entry (query: %s, response_size: %d)" ,
364- queryPreview , len (responseBody ))
363+ observability .Debugf ("MilvusCache.UpdateWithResponse: updating pending entry (request_id: %s, response_size: %d)" ,
364+ requestID , len (responseBody ))
365365
366366 // Find the pending entry and complete it with the response
367367 // Query for the incomplete entry to retrieve its metadata
368368 ctx := context .Background ()
369- queryExpr := fmt .Sprintf ("query == \" %s\" && response_body == \" \" " , query )
369+ queryExpr := fmt .Sprintf ("request_id == \" %s\" && response_body == \" \" " , requestID )
370370
371371 observability .Debugf ("MilvusCache.UpdateWithResponse: searching for pending entry with expr: %s" , queryExpr )
372372
373373 results , err := c .client .Query (ctx , c .collectionName , []string {}, queryExpr ,
374- []string {"model" , "request_body" })
374+ []string {"id" , " model" , "query " , "request_body" })
375375
376376 if err != nil {
377377 observability .Debugf ("MilvusCache.UpdateWithResponse: query failed: %v" , err )
@@ -380,29 +380,27 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro
380380 }
381381
382382 if len (results ) == 0 {
383- observability .Debugf ("MilvusCache.UpdateWithResponse: no pending entry found, adding as new complete entry" )
384- // Create new complete entry when no pending entry exists
385- _ , err := c .addEntry ("unknown" , query , []byte ("" ), responseBody )
386- if err != nil {
387- metrics .RecordCacheOperation ("milvus" , "update_response" , "error" , time .Since (start ).Seconds ())
388- } else {
389- metrics .RecordCacheOperation ("milvus" , "update_response" , "success" , time .Since (start ).Seconds ())
390- }
391- return err
383+ observability .Debugf ("MilvusCache.UpdateWithResponse: no pending entry found" )
384+ metrics .RecordCacheOperation ("milvus" , "update_response" , "error" , time .Since (start ).Seconds ())
385+ return fmt .Errorf ("no pending entry found" )
392386 }
393387
394388 // Get the model and request body from the pending entry
395- modelColumn := results [0 ].(* entity.ColumnVarChar )
396- requestColumn := results [1 ].(* entity.ColumnVarChar )
389+ idColumn := results [0 ].(* entity.ColumnVarChar )
390+ modelColumn := results [1 ].(* entity.ColumnVarChar )
391+ queryColumn := results [2 ].(* entity.ColumnVarChar )
392+ requestColumn := results [3 ].(* entity.ColumnVarChar )
397393
398- if modelColumn .Len () > 0 {
394+ if idColumn .Len () > 0 {
395+ id := idColumn .Data ()[0 ]
399396 model := modelColumn .Data ()[0 ]
397+ query := queryColumn .Data ()[0 ]
400398 requestBody := requestColumn .Data ()[0 ]
401399
402- observability .Debugf ("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (model: %s)" , model )
400+ observability .Debugf ("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (id: %s, model: %s)" , id , model )
403401
404402 // Create the complete entry with response data
405- _ , err := c .addEntry (model , query , []byte (requestBody ), responseBody )
403+ err := c .addEntry (id , requestID , model , query , []byte (requestBody ), responseBody )
406404 if err != nil {
407405 metrics .RecordCacheOperation ("milvus" , "update_response" , "error" , time .Since (start ).Seconds ())
408406 return fmt .Errorf ("failed to add complete entry: %w" , err )
@@ -416,14 +414,14 @@ func (c *MilvusCache) UpdateWithResponse(query string, responseBody []byte) erro
416414}
417415
418416// AddEntry stores a complete request-response pair in the cache
419- func (c * MilvusCache ) AddEntry (model string , query string , requestBody , responseBody []byte ) error {
417+ func (c * MilvusCache ) AddEntry (requestID string , model string , query string , requestBody , responseBody []byte ) error {
420418 start := time .Now ()
421419
422420 if ! c .enabled {
423421 return nil
424422 }
425423
426- _ , err := c .addEntry (model , query , requestBody , responseBody )
424+ err := c .addEntry ("" , requestID , model , query , requestBody , responseBody )
427425
428426 if err != nil {
429427 metrics .RecordCacheOperation ("milvus" , "add_entry" , "error" , time .Since (start ).Seconds ())
@@ -435,20 +433,23 @@ func (c *MilvusCache) AddEntry(model string, query string, requestBody, response
435433}
436434
437435// addEntry handles the internal logic for storing entries in Milvus
438- func (c * MilvusCache ) addEntry (model string , query string , requestBody , responseBody []byte ) ( string , error ) {
436+ func (c * MilvusCache ) addEntry (id string , requestID string , model string , query string , requestBody , responseBody []byte ) error {
439437 // Generate semantic embedding for the query
440438 embedding , err := candle_binding .GetEmbedding (query , 0 ) // Auto-detect dimension
441439 if err != nil {
442- return "" , fmt .Errorf ("failed to generate embedding: %w" , err )
440+ return fmt .Errorf ("failed to generate embedding: %w" , err )
443441 }
444442
445- // Generate unique ID
446- id := fmt .Sprintf ("%x" , md5 .Sum ([]byte (fmt .Sprintf ("%s_%s_%d" , model , query , time .Now ().UnixNano ()))))
443+ // Generate unique ID if not provided
444+ if id == "" {
445+ id = fmt .Sprintf ("%x" , md5 .Sum (fmt .Appendf (nil , "%s_%s_%d" , model , query , time .Now ().UnixNano ())))
446+ }
447447
448448 ctx := context .Background ()
449449
450- // Prepare data for insertion
450+ // Prepare data for upsert
451451 ids := []string {id }
452+ requestIDs := []string {requestID }
452453 models := []string {model }
453454 queries := []string {query }
454455 requestBodies := []string {string (requestBody )}
@@ -458,20 +459,21 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response
458459
459460 // Create columns
460461 idColumn := entity .NewColumnVarChar ("id" , ids )
462+ requestIDColumn := entity .NewColumnVarChar ("request_id" , requestIDs )
461463 modelColumn := entity .NewColumnVarChar ("model" , models )
462464 queryColumn := entity .NewColumnVarChar ("query" , queries )
463465 requestColumn := entity .NewColumnVarChar ("request_body" , requestBodies )
464466 responseColumn := entity .NewColumnVarChar ("response_body" , responseBodies )
465467 embeddingColumn := entity .NewColumnFloatVector (c .config .Collection .VectorField .Name , len (embedding ), embeddings )
466468 timestampColumn := entity .NewColumnInt64 ("timestamp" , timestamps )
467469
468- // Insert the entry into the collection
469- observability .Debugf ("MilvusCache.addEntry: inserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)" ,
470+ // Upsert the entry into the collection
471+ observability .Debugf ("MilvusCache.addEntry: upserting entry into collection '%s' (embedding_dim: %d, request_size: %d, response_size: %d)" ,
470472 c .collectionName , len (embedding ), len (requestBody ), len (responseBody ))
471- _ , err = c .client .Insert (ctx , c .collectionName , "" , idColumn , modelColumn , queryColumn , requestColumn , responseColumn , embeddingColumn , timestampColumn )
473+ _ , err = c .client .Upsert (ctx , c .collectionName , "" , idColumn , requestIDColumn , modelColumn , queryColumn , requestColumn , responseColumn , embeddingColumn , timestampColumn )
472474 if err != nil {
473- observability .Debugf ("MilvusCache.addEntry: insert failed: %v" , err )
474- return "" , fmt .Errorf ("failed to insert cache entry: %w" , err )
475+ observability .Debugf ("MilvusCache.addEntry: upsert failed: %v" , err )
476+ return fmt .Errorf ("failed to upsert cache entry: %w" , err )
475477 }
476478
477479 // Ensure data is persisted to storage
@@ -483,11 +485,12 @@ func (c *MilvusCache) addEntry(model string, query string, requestBody, response
483485 observability .LogEvent ("cache_entry_added" , map [string ]interface {}{
484486 "backend" : "milvus" ,
485487 "collection" : c .collectionName ,
488+ "request_id" : requestID ,
486489 "query" : query ,
487490 "model" : model ,
488491 "embedding_dimension" : len (embedding ),
489492 })
490- return query , nil
493+ return nil
491494}
492495
493496// FindSimilar searches for semantically similar cached requests
0 commit comments