Skip to content

Commit 88aa203

Browse files
o11y: Add request error counters (#132)
Signed-off-by: Jintao Zhang <[email protected]>
1 parent d3c6d91 commit 88aa203

File tree

6 files changed

+231
-5
lines changed

6 files changed

+231
-5
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package extproc
2+
3+
import (
4+
"testing"
5+
6+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
7+
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
8+
"github.com/prometheus/client_golang/prometheus"
9+
dto "github.com/prometheus/client_model/go"
10+
)
11+
12+
// getCounterValue returns the sum of a counter across metrics matching the given labels
13+
func getCounterValue(metricName string, want map[string]string) float64 {
14+
var sum float64
15+
mfs, _ := prometheus.DefaultGatherer.Gather()
16+
for _, fam := range mfs {
17+
if fam.GetName() != metricName || fam.GetType() != dto.MetricType_COUNTER {
18+
continue
19+
}
20+
for _, m := range fam.GetMetric() {
21+
labels := m.GetLabel()
22+
match := true
23+
for k, v := range want {
24+
found := false
25+
for _, l := range labels {
26+
if l.GetName() == k && l.GetValue() == v {
27+
found = true
28+
break
29+
}
30+
}
31+
if !found {
32+
match = false
33+
break
34+
}
35+
}
36+
if match && m.GetCounter() != nil {
37+
sum += m.GetCounter().GetValue()
38+
}
39+
}
40+
}
41+
return sum
42+
}
43+
44+
func TestRequestParseErrorIncrementsErrorCounter(t *testing.T) {
45+
r := &OpenAIRouter{}
46+
r.InitializeForTesting()
47+
48+
ctx := &RequestContext{}
49+
// Invalid JSON triggers parse error
50+
badBody := []byte("not-json")
51+
v := &ext_proc.ProcessingRequest_RequestBody{
52+
RequestBody: &ext_proc.HttpBody{Body: badBody},
53+
}
54+
55+
before := getCounterValue("llm_request_errors_total", map[string]string{"reason": "parse_error", "model": "unknown"})
56+
57+
// Use test helper wrapper to access unexported method
58+
_, _ = r.HandleRequestBody(v, ctx)
59+
60+
after := getCounterValue("llm_request_errors_total", map[string]string{"reason": "parse_error", "model": "unknown"})
61+
if !(after > before) {
62+
t.Fatalf("expected llm_request_errors_total(parse_error,unknown) to increase: before=%v after=%v", before, after)
63+
}
64+
}
65+
66+
func TestResponseParseErrorIncrementsErrorCounter(t *testing.T) {
67+
r := &OpenAIRouter{}
68+
r.InitializeForTesting()
69+
70+
ctx := &RequestContext{RequestModel: "model-a"}
71+
// Invalid JSON triggers parse error in response body handler
72+
badJSON := []byte("{invalid}")
73+
v := &ext_proc.ProcessingRequest_ResponseBody{
74+
ResponseBody: &ext_proc.HttpBody{Body: badJSON},
75+
}
76+
77+
before := getCounterValue("llm_request_errors_total", map[string]string{"reason": "parse_error", "model": "model-a"})
78+
79+
_, _ = r.HandleResponseBody(v, ctx)
80+
81+
after := getCounterValue("llm_request_errors_total", map[string]string{"reason": "parse_error", "model": "model-a"})
82+
if !(after > before) {
83+
t.Fatalf("expected llm_request_errors_total(parse_error,model-a) to increase: before=%v after=%v", before, after)
84+
}
85+
}
86+
87+
func TestUpstreamStatusIncrements4xx5xxCounters(t *testing.T) {
88+
r := &OpenAIRouter{}
89+
r.InitializeForTesting()
90+
91+
ctx := &RequestContext{RequestModel: "m"}
92+
93+
// 503 -> upstream_5xx
94+
hdrs5xx := &ext_proc.ProcessingRequest_ResponseHeaders{
95+
ResponseHeaders: &ext_proc.HttpHeaders{
96+
Headers: &core.HeaderMap{Headers: []*core.HeaderValue{{Key: ":status", Value: "503"}}},
97+
},
98+
}
99+
100+
before5xx := getCounterValue("llm_request_errors_total", map[string]string{"reason": "upstream_5xx", "model": "m"})
101+
_, _ = r.HandleResponseHeaders(hdrs5xx, ctx)
102+
after5xx := getCounterValue("llm_request_errors_total", map[string]string{"reason": "upstream_5xx", "model": "m"})
103+
if !(after5xx > before5xx) {
104+
t.Fatalf("expected upstream_5xx to increase for model m: before=%v after=%v", before5xx, after5xx)
105+
}
106+
107+
// 404 -> upstream_4xx
108+
hdrs4xx := &ext_proc.ProcessingRequest_ResponseHeaders{
109+
ResponseHeaders: &ext_proc.HttpHeaders{
110+
Headers: &core.HeaderMap{Headers: []*core.HeaderValue{{Key: ":status", Value: "404"}}},
111+
},
112+
}
113+
114+
before4xx := getCounterValue("llm_request_errors_total", map[string]string{"reason": "upstream_4xx", "model": "m"})
115+
_, _ = r.HandleResponseHeaders(hdrs4xx, ctx)
116+
after4xx := getCounterValue("llm_request_errors_total", map[string]string{"reason": "upstream_4xx", "model": "m"})
117+
if !(after4xx > before4xx) {
118+
t.Fatalf("expected upstream_4xx to increase for model m: before=%v after=%v", before4xx, after4xx)
119+
}
120+
}

src/semantic-router/pkg/extproc/processor.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log"
88

99
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
10+
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/metrics"
1011
"google.golang.org/grpc/codes"
1112
"google.golang.org/grpc/status"
1213
)
@@ -32,15 +33,26 @@ func (r *OpenAIRouter) Process(stream ext_proc.ExternalProcessor_ProcessServer)
3233
// Handle gRPC status-based cancellations/timeouts
3334
if s, ok := status.FromError(err); ok {
3435
switch s.Code() {
35-
case codes.Canceled, codes.DeadlineExceeded:
36+
case codes.Canceled:
3637
log.Println("Stream canceled gracefully")
38+
metrics.RecordRequestError(ctx.RequestModel, "cancellation")
39+
return nil
40+
case codes.DeadlineExceeded:
41+
log.Println("Stream deadline exceeded")
42+
metrics.RecordRequestError(ctx.RequestModel, "timeout")
3743
return nil
3844
}
3945
}
4046

4147
// Handle context cancellation from the server-side context
42-
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
48+
if errors.Is(err, context.Canceled) {
4349
log.Println("Stream canceled gracefully")
50+
metrics.RecordRequestError(ctx.RequestModel, "cancellation")
51+
return nil
52+
}
53+
if errors.Is(err, context.DeadlineExceeded) {
54+
log.Println("Stream deadline exceeded")
55+
metrics.RecordRequestError(ctx.RequestModel, "timeout")
4456
return nil
4557
}
4658

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,23 @@ func (r *OpenAIRouter) handleRequestBody(v *ext_proc.ProcessingRequest_RequestBo
164164
openAIRequest, err := parseOpenAIRequest(ctx.OriginalRequestBody)
165165
if err != nil {
166166
log.Printf("Error parsing OpenAI request: %v", err)
167+
// Attempt to determine model for labeling (may be unknown here)
168+
metrics.RecordRequestError(ctx.RequestModel, "parse_error")
169+
// Count this request as well, with unknown model if necessary
170+
metrics.RecordModelRequest(ctx.RequestModel)
167171
return nil, status.Errorf(codes.InvalidArgument, "invalid request body: %v", err)
168172
}
169173

170174
// Store the original model
171175
originalModel := string(openAIRequest.Model)
172176
log.Printf("Original model: %s", originalModel)
173177

174-
// Record the initial request to this model
178+
// Record the initial request to this model (count all requests)
175179
metrics.RecordModelRequest(originalModel)
180+
// Also set the model on context early so error metrics can label it
181+
if ctx.RequestModel == "" {
182+
ctx.RequestModel = originalModel
183+
}
176184

177185
// Get content from messages
178186
userContent, nonUserMessages := extractUserAndNonUserContent(openAIRequest)
@@ -202,6 +210,7 @@ func (r *OpenAIRouter) performSecurityChecks(ctx *RequestContext, userContent st
202210
if err != nil {
203211
log.Printf("Error performing jailbreak analysis: %v", err)
204212
// Continue processing despite jailbreak analysis error
213+
metrics.RecordRequestError(ctx.RequestModel, "classification_failed")
205214
} else if hasJailbreak {
206215
// Find the first jailbreak detection for response
207216
var jailbreakType string
@@ -224,6 +233,8 @@ func (r *OpenAIRouter) performSecurityChecks(ctx *RequestContext, userContent st
224233
"confidence": confidence,
225234
"request_id": ctx.RequestID,
226235
})
236+
// Count this as a blocked request
237+
metrics.RecordRequestError(ctx.RequestModel, "jailbreak_block")
227238
jailbreakResponse := http.CreateJailbreakViolationResponse(jailbreakType, confidence)
228239
return jailbreakResponse, true
229240
} else {
@@ -347,6 +358,7 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
347358
"model": matchedModel,
348359
"denied_pii": defaultDeniedPII,
349360
})
361+
metrics.RecordRequestError(matchedModel, "pii_policy_denied")
350362
piiResponse := http.CreatePIIViolationResponse(matchedModel, defaultDeniedPII)
351363
return piiResponse, nil
352364
}
@@ -359,6 +371,7 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
359371
"model": matchedModel,
360372
"denied_pii": deniedPII,
361373
})
374+
metrics.RecordRequestError(matchedModel, "pii_policy_denied")
362375
piiResponse := http.CreatePIIViolationResponse(matchedModel, deniedPII)
363376
return piiResponse, nil
364377
}
@@ -397,12 +410,14 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
397410
modifiedBody, err := serializeOpenAIRequest(openAIRequest)
398411
if err != nil {
399412
log.Printf("Error serializing modified request: %v", err)
413+
metrics.RecordRequestError(actualModel, "serialization_error")
400414
return nil, status.Errorf(codes.Internal, "error serializing modified request: %v", err)
401415
}
402416

403417
modifiedBody, err = r.setReasoningModeToRequestBody(modifiedBody, useReasoning, categoryName)
404418
if err != nil {
405419
log.Printf("Error setting reasoning mode %v to request: %v", useReasoning, err)
420+
metrics.RecordRequestError(actualModel, "serialization_error")
406421
return nil, status.Errorf(codes.Internal, "error setting reasoning mode: %v", err)
407422
}
408423

@@ -489,6 +504,7 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
489504
"model": originalModel,
490505
"denied_pii": deniedPII,
491506
})
507+
metrics.RecordRequestError(originalModel, "pii_policy_denied")
492508
piiResponse := http.CreatePIIViolationResponse(originalModel, deniedPII)
493509
return piiResponse, nil
494510
}
@@ -595,6 +611,7 @@ func (r *OpenAIRouter) handleToolSelection(openAIRequest *openai.ChatCompletionN
595611
openAIRequest.Tools = nil
596612
return r.updateRequestWithTools(openAIRequest, response, ctx)
597613
}
614+
metrics.RecordRequestError(getModelFromCtx(ctx), "classification_failed")
598615
return err
599616
}
600617

@@ -613,6 +630,7 @@ func (r *OpenAIRouter) handleToolSelection(openAIRequest *openai.ChatCompletionN
613630
// Convert the tool to OpenAI SDK format
614631
toolBytes, err := json.Marshal(tool)
615632
if err != nil {
633+
metrics.RecordRequestError(getModelFromCtx(ctx), "serialization_error")
616634
return err
617635
}
618636
var sdkTool openai.ChatCompletionToolParam

src/semantic-router/pkg/extproc/response_handler.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package extproc
33
import (
44
"encoding/json"
55
"log"
6+
"strconv"
67
"time"
78

9+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
810
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
911

1012
"github.com/openai/openai-go"
@@ -13,7 +15,18 @@ import (
1315
)
1416

1517
// handleResponseHeaders processes the response headers
16-
func (r *OpenAIRouter) handleResponseHeaders(_ *ext_proc.ProcessingRequest_ResponseHeaders, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
18+
func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
19+
// Detect upstream HTTP status and record non-2xx as errors
20+
if v != nil && v.ResponseHeaders != nil && v.ResponseHeaders.Headers != nil {
21+
if statusCode := getStatusFromHeaders(v.ResponseHeaders.Headers); statusCode != 0 {
22+
if statusCode >= 500 {
23+
metrics.RecordRequestError(getModelFromCtx(ctx), "upstream_5xx")
24+
} else if statusCode >= 400 {
25+
metrics.RecordRequestError(getModelFromCtx(ctx), "upstream_4xx")
26+
}
27+
}
28+
}
29+
1730
// Best-effort TTFT measurement: record on first response headers if we have a start time and model
1831
if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
1932
ttft := time.Since(ctx.ProcessingStartTime).Seconds()
@@ -38,6 +51,35 @@ func (r *OpenAIRouter) handleResponseHeaders(_ *ext_proc.ProcessingRequest_Respo
3851
return response, nil
3952
}
4053

54+
// getStatusFromHeaders extracts :status pseudo-header value as integer
55+
func getStatusFromHeaders(headerMap *core.HeaderMap) int {
56+
if headerMap == nil {
57+
return 0
58+
}
59+
for _, hv := range headerMap.Headers {
60+
if hv.Key == ":status" {
61+
if hv.Value != "" {
62+
if code, err := strconv.Atoi(hv.Value); err == nil {
63+
return code
64+
}
65+
}
66+
if len(hv.RawValue) > 0 {
67+
if code, err := strconv.Atoi(string(hv.RawValue)); err == nil {
68+
return code
69+
}
70+
}
71+
}
72+
}
73+
return 0
74+
}
75+
76+
func getModelFromCtx(ctx *RequestContext) string {
77+
if ctx == nil || ctx.RequestModel == "" {
78+
return "unknown"
79+
}
80+
return ctx.RequestModel
81+
}
82+
4183
// handleResponseBody processes the response body
4284
func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
4385
completionLatency := time.Since(ctx.StartTime)
@@ -49,6 +91,7 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response
4991
var parsed openai.ChatCompletion
5092
if err := json.Unmarshal(responseBody, &parsed); err != nil {
5193
log.Printf("Error parsing tokens from response: %v", err)
94+
metrics.RecordRequestError(ctx.RequestModel, "parse_error")
5295
}
5396
promptTokens := int(parsed.Usage.PromptTokens)
5497
completionTokens := int(parsed.Usage.CompletionTokens)

src/semantic-router/pkg/metrics/metrics.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ var (
102102
[]string{"model"},
103103
)
104104

105+
// RequestErrorsTotal tracks request errors categorized by reason
106+
RequestErrorsTotal = promauto.NewCounterVec(
107+
prometheus.CounterOpts{
108+
Name: "llm_request_errors_total",
109+
Help: "The total number of request errors categorized by reason (e.g., timeout, upstream_5xx, pii_policy_denied, jailbreak_block, parse_error, serialization_error, cancellation)",
110+
},
111+
[]string{"model", "reason"},
112+
)
113+
105114
// ModelCost tracks the total cost attributed to each model by currency
106115
ModelCost = promauto.NewCounterVec(
107116
prometheus.CounterOpts{
@@ -353,9 +362,32 @@ var (
353362

354363
// RecordModelRequest increments the counter for requests to a specific model
355364
func RecordModelRequest(model string) {
365+
if model == "" {
366+
model = "unknown"
367+
}
356368
ModelRequests.WithLabelValues(model).Inc()
357369
}
358370

371+
// RecordRequestError increments request error counters labeled by model and normalized reason
372+
func RecordRequestError(model, reason string) {
373+
if model == "" {
374+
model = "unknown"
375+
}
376+
if reason == "" {
377+
reason = "unknown"
378+
}
379+
// Normalize a few common variants to canonical reasons
380+
switch reason {
381+
case "deadline_exceeded":
382+
reason = "timeout"
383+
case "upstream_500", "upstream_502", "upstream_503", "upstream_504":
384+
reason = "upstream_5xx"
385+
case "upstream_400", "upstream_401", "upstream_403", "upstream_404", "upstream_429":
386+
reason = "upstream_4xx"
387+
}
388+
RequestErrorsTotal.WithLabelValues(model, reason).Inc()
389+
}
390+
359391
// RecordModelRouting records that a request was routed from one model to another
360392
func RecordModelRouting(sourceModel, targetModel string) {
361393
if sourceModel != targetModel {

src/semantic-router/pkg/utils/http/response.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ import (
66
"log"
77
"time"
88

9+
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/metrics"
10+
911
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
1012
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1113
typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
12-
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/metrics"
1314
)
1415

1516
// CreatePIIViolationResponse creates an HTTP response for PII policy violations

0 commit comments

Comments
 (0)