Skip to content

Commit 29f243f

Browse files
authored
fix(metrics): correctly instantiate per-request scope metrics (#1382)
**Description** Previously, each metrics struct instance (for example, the implementation of the ChatCompletionMetrics interface) was shared across all requests and instantiated only once, at extproc startup time. However, these implementations hold request-scoped fields such as the model name, the request start time, etc. In other words, sharing a single instance across requests caused a serious race condition on those fields. This commit fixes it by instantiating the metrics interface implementations per request. --------- Signed-off-by: Takeshi Yoneda <[email protected]>
1 parent df1795a commit 29f243f

18 files changed

+122
-97
lines changed

cmd/extproc/mainlib/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,10 @@ func Main(ctx context.Context, args []string, stderr io.Writer) (err error) {
230230
if err != nil {
231231
return fmt.Errorf("failed to create metrics: %w", err)
232232
}
233-
chatCompletionMetrics := metrics.NewChatCompletion(meter, metricsRequestHeaderAttributes)
234-
messagesMetrics := metrics.NewMessages(meter, metricsRequestHeaderAttributes)
235-
completionMetrics := metrics.NewCompletion(meter, metricsRequestHeaderAttributes)
236-
embeddingsMetrics := metrics.NewEmbeddings(meter, metricsRequestHeaderAttributes)
233+
chatCompletionMetrics := metrics.NewChatCompletionFactory(meter, metricsRequestHeaderAttributes)
234+
messagesMetrics := metrics.NewMessagesFactory(meter, metricsRequestHeaderAttributes)
235+
completionMetrics := metrics.NewCompletionFactory(meter, metricsRequestHeaderAttributes)
236+
embeddingsMetrics := metrics.NewEmbeddingsFactory(meter, metricsRequestHeaderAttributes)
237237
mcpMetrics := metrics.NewMCP(meter, metricsRequestHeaderAttributes)
238238

239239
tracing, err := tracing.NewTracingFromEnv(ctx, os.Stdout, spanRequestHeaderAttributes)

internal/extproc/chatcompletion_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
)
3232

3333
// ChatCompletionProcessorFactory returns a factory method to instantiate the chat completion processor.
34-
func ChatCompletionProcessorFactory(ccm metrics.ChatCompletionMetrics) ProcessorFactory {
34+
func ChatCompletionProcessorFactory(f metrics.ChatCompletionMetricsFactory) ProcessorFactory {
3535
return func(config *processorConfig, requestHeaders map[string]string, logger *slog.Logger, tracing tracing.Tracing, isUpstreamFilter bool) (Processor, error) {
3636
logger = logger.With("processor", "chat-completion", "isUpstreamFilter", fmt.Sprintf("%v", isUpstreamFilter))
3737
if !isUpstreamFilter {
@@ -46,7 +46,7 @@ func ChatCompletionProcessorFactory(ccm metrics.ChatCompletionMetrics) Processor
4646
config: config,
4747
requestHeaders: requestHeaders,
4848
logger: logger,
49-
metrics: ccm,
49+
metrics: f(),
5050
}, nil
5151
}
5252
}

internal/extproc/chatcompletion_processor_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/envoyproxy/ai-gateway/internal/filterapi"
2626
"github.com/envoyproxy/ai-gateway/internal/internalapi"
2727
"github.com/envoyproxy/ai-gateway/internal/llmcostcel"
28+
"github.com/envoyproxy/ai-gateway/internal/metrics"
2829
"github.com/envoyproxy/ai-gateway/internal/testing/testotel"
2930
tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
3031
)
@@ -39,7 +40,9 @@ func TestChatCompletion_Schema(t *testing.T) {
3940
})
4041
t.Run("supported openai / on upstream", func(t *testing.T) {
4142
cfg := &processorConfig{}
42-
routeFilter, err := ChatCompletionProcessorFactory(nil)(cfg, nil, slog.Default(), tracing.NoopTracing{}, true)
43+
routeFilter, err := ChatCompletionProcessorFactory(func() metrics.ChatCompletionMetrics {
44+
return &mockChatCompletionMetrics{}
45+
})(cfg, nil, slog.Default(), tracing.NoopTracing{}, true)
4346
require.NoError(t, err)
4447
require.NotNil(t, routeFilter)
4548
require.IsType(t, &chatCompletionProcessorUpstreamFilter{}, routeFilter)

internal/extproc/completions_processor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
)
3030

3131
// CompletionsProcessorFactory returns a factory method to instantiate the completions processor.
32-
func CompletionsProcessorFactory(cm metrics.CompletionMetrics) ProcessorFactory {
32+
func CompletionsProcessorFactory(f metrics.CompletionMetricsFactory) ProcessorFactory {
3333
return func(config *processorConfig, requestHeaders map[string]string, logger *slog.Logger, tracing tracing.Tracing, isUpstreamFilter bool) (Processor, error) {
3434
logger = logger.With("processor", "completions", "isUpstreamFilter", fmt.Sprintf("%v", isUpstreamFilter))
3535
if !isUpstreamFilter {
@@ -38,14 +38,14 @@ func CompletionsProcessorFactory(cm metrics.CompletionMetrics) ProcessorFactory
3838
tracer: tracing.CompletionTracer(),
3939
requestHeaders: requestHeaders,
4040
logger: logger,
41-
metrics: cm,
41+
metrics: f(),
4242
}, nil
4343
}
4444
return &completionsProcessorUpstreamFilter{
4545
config: config,
4646
requestHeaders: requestHeaders,
4747
logger: logger,
48-
metrics: cm,
48+
metrics: f(),
4949
}, nil
5050
}
5151
}

internal/extproc/completions_processor_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/envoyproxy/ai-gateway/internal/filterapi"
2525
"github.com/envoyproxy/ai-gateway/internal/internalapi"
2626
"github.com/envoyproxy/ai-gateway/internal/llmcostcel"
27+
"github.com/envoyproxy/ai-gateway/internal/metrics"
2728
tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
2829
)
2930

@@ -48,7 +49,9 @@ func TestCompletions_Schema(t *testing.T) {
4849
for _, tt := range tests {
4950
t.Run(tt.name, func(t *testing.T) {
5051
cfg := &processorConfig{}
51-
filter, err := CompletionsProcessorFactory(nil)(cfg, nil, slog.Default(), tracing.NoopTracing{}, tt.onUpstream)
52+
filter, err := CompletionsProcessorFactory(func() metrics.CompletionMetrics {
53+
return &mockCompletionMetrics{}
54+
})(cfg, nil, slog.Default(), tracing.NoopTracing{}, tt.onUpstream)
5255
require.NoError(t, err)
5356
require.NotNil(t, filter)
5457
require.IsType(t, tt.expectedType, filter)

internal/extproc/embeddings_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
)
2929

3030
// EmbeddingsProcessorFactory returns a factory method to instantiate the embeddings processor.
31-
func EmbeddingsProcessorFactory(em metrics.EmbeddingsMetrics) ProcessorFactory {
31+
func EmbeddingsProcessorFactory(f metrics.EmbeddingsMetricsFactory) ProcessorFactory {
3232
return func(config *processorConfig, requestHeaders map[string]string, logger *slog.Logger, tracing tracing.Tracing, isUpstreamFilter bool) (Processor, error) {
3333
logger = logger.With("processor", "embeddings", "isUpstreamFilter", fmt.Sprintf("%v", isUpstreamFilter))
3434
if !isUpstreamFilter {
@@ -43,7 +43,7 @@ func EmbeddingsProcessorFactory(em metrics.EmbeddingsMetrics) ProcessorFactory {
4343
config: config,
4444
requestHeaders: requestHeaders,
4545
logger: logger,
46-
metrics: em,
46+
metrics: f(),
4747
}, nil
4848
}
4949
}

internal/extproc/embeddings_processor_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/envoyproxy/ai-gateway/internal/filterapi"
2424
"github.com/envoyproxy/ai-gateway/internal/internalapi"
2525
"github.com/envoyproxy/ai-gateway/internal/llmcostcel"
26+
"github.com/envoyproxy/ai-gateway/internal/metrics"
2627
tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
2728
)
2829

@@ -36,7 +37,9 @@ func TestEmbeddings_Schema(t *testing.T) {
3637
})
3738
t.Run("supported openai / on upstream", func(t *testing.T) {
3839
cfg := &processorConfig{}
39-
routeFilter, err := EmbeddingsProcessorFactory(nil)(cfg, nil, slog.Default(), tracing.NoopTracing{}, true)
40+
routeFilter, err := EmbeddingsProcessorFactory(func() metrics.EmbeddingsMetrics {
41+
return &mockEmbeddingsMetrics{}
42+
})(cfg, nil, slog.Default(), tracing.NoopTracing{}, true)
4043
require.NoError(t, err)
4144
require.NotNil(t, routeFilter)
4245
require.IsType(t, &embeddingsProcessorUpstreamFilter{}, routeFilter)

internal/extproc/messages_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
//
3232
// Requests: Only accepts Anthropic format requests.
3333
// Responses: Returns Anthropic format responses.
34-
func MessagesProcessorFactory(ccm metrics.MessagesMetrics) ProcessorFactory {
34+
func MessagesProcessorFactory(f metrics.MessagesMetricsFactory) ProcessorFactory {
3535
return func(config *processorConfig, requestHeaders map[string]string, logger *slog.Logger, _ tracing.Tracing, isUpstreamFilter bool) (Processor, error) {
3636
logger = logger.With("processor", "anthropic-messages", "isUpstreamFilter", fmt.Sprintf("%v", isUpstreamFilter))
3737
if !isUpstreamFilter {
@@ -45,7 +45,7 @@ func MessagesProcessorFactory(ccm metrics.MessagesMetrics) ProcessorFactory {
4545
config: config,
4646
requestHeaders: requestHeaders,
4747
logger: logger,
48-
metrics: ccm,
48+
metrics: f(),
4949
}, nil
5050
}
5151
}

internal/extproc/messages_processor_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import (
2828
)
2929

3030
func TestMessagesProcessorFactory(t *testing.T) {
31-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
32-
factory := MessagesProcessorFactory(chatMetrics)
31+
m := metrics.NewMessagesFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})
32+
factory := MessagesProcessorFactory(m)
3333
require.NotNil(t, factory, "MessagesProcessorFactory should return a non-nil factory")
3434

3535
// Test creating a router filter.
@@ -401,7 +401,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessRequestHeaders_WithMocks(t *test
401401
}
402402

403403
// Create mock metrics.
404-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
404+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
405405

406406
// Create processor.
407407
processor := &messagesProcessorUpstreamFilter{
@@ -436,7 +436,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessResponseHeaders_WithMocks(t *tes
436436
retErr: nil,
437437
}
438438

439-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
439+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
440440
processor := &messagesProcessorUpstreamFilter{
441441
config: &processorConfig{},
442442
requestHeaders: make(map[string]string),
@@ -461,7 +461,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessResponseBody_WithMocks(t *testin
461461
retErr: nil,
462462
}
463463

464-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
464+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
465465
processor := &messagesProcessorUpstreamFilter{
466466
config: &processorConfig{},
467467
requestHeaders: make(map[string]string),
@@ -532,7 +532,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessResponseBody_CompletionOnlyAtEnd
532532
}
533533

534534
func TestMessagesProcessorUpstreamFilter_MergeWithTokenLatencyMetadata(t *testing.T) {
535-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
535+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
536536
processor := &messagesProcessorUpstreamFilter{
537537
config: &processorConfig{},
538538
logger: slog.Default(),
@@ -561,7 +561,7 @@ func TestMessagesProcessorUpstreamFilter_MergeWithTokenLatencyMetadata(t *testin
561561

562562
func TestMessagesProcessorUpstreamFilter_SetBackend(t *testing.T) {
563563
headers := map[string]string{":path": "/anthropic/v1/messages"}
564-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
564+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
565565
processor := &messagesProcessorUpstreamFilter{
566566
config: &processorConfig{
567567
requestCosts: []processorConfigRequestCost{
@@ -587,7 +587,7 @@ func TestMessagesProcessorUpstreamFilter_SetBackend(t *testing.T) {
587587

588588
func Test_messagesProcessorUpstreamFilter_SetBackend_Success(t *testing.T) {
589589
headers := map[string]string{":path": "/anthropic/v1/messages", internalapi.ModelNameHeaderKeyDefault: "claude"}
590-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
590+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
591591
p := &messagesProcessorUpstreamFilter{
592592
config: &processorConfig{},
593593
requestHeaders: headers,
@@ -724,7 +724,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessRequestHeaders_WithHeaderMutatio
724724
}
725725

726726
// Create mock metrics.
727-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
727+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
728728

729729
// Create processor.
730730
processor := &messagesProcessorUpstreamFilter{
@@ -801,7 +801,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessRequestHeaders_WithHeaderMutatio
801801
}
802802

803803
// Create mock metrics.
804-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
804+
chatMetrics := metrics.NewMessagesFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
805805

806806
// Create processor.
807807
processor := &messagesProcessorUpstreamFilter{
@@ -888,7 +888,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessRequestHeaders_WithHeaderMutatio
888888
}
889889

890890
// Create mock metrics.
891-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
891+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
892892

893893
// Create processor.
894894
processor := &messagesProcessorUpstreamFilter{
@@ -924,7 +924,7 @@ func TestMessagesProcessorUpstreamFilter_ProcessRequestHeaders_WithHeaderMutatio
924924
func TestMessagesProcessorUpstreamFilter_SetBackend_WithHeaderMutations(t *testing.T) {
925925
t.Run("header mutator created correctly", func(t *testing.T) {
926926
headers := map[string]string{":path": "/anthropic/v1/messages"}
927-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
927+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
928928
p := &messagesProcessorUpstreamFilter{
929929
config: &processorConfig{},
930930
requestHeaders: headers,
@@ -980,7 +980,7 @@ func TestMessagesProcessorUpstreamFilter_SetBackend_WithHeaderMutations(t *testi
980980

981981
t.Run("header mutator with original headers", func(t *testing.T) {
982982
headers := map[string]string{":path": "/anthropic/v1/messages"}
983-
chatMetrics := metrics.NewChatCompletion(noop.NewMeterProvider().Meter("test"), map[string]string{})
983+
chatMetrics := metrics.NewChatCompletionFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()
984984
p := &messagesProcessorUpstreamFilter{
985985
config: &processorConfig{},
986986
requestHeaders: headers,

internal/metrics/base_metrics.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,23 @@ import (
1616
"github.com/envoyproxy/ai-gateway/internal/internalapi"
1717
)
1818

19+
type baseMetricsFactory struct {
20+
metrics *genAI
21+
requestHeaderAttributeMapping map[string]string // maps HTTP headers to metric attribute names.
22+
}
23+
24+
func (f *baseMetricsFactory) newBaseMetrics(operation string) baseMetrics {
25+
return baseMetrics{
26+
metrics: f.metrics,
27+
operation: operation,
28+
originalModel: "unknown",
29+
requestModel: "unknown",
30+
responseModel: "unknown",
31+
backend: "unknown",
32+
requestHeaderAttributeMapping: f.requestHeaderAttributeMapping,
33+
}
34+
}
35+
1936
// baseMetrics provides shared functionality for AI Gateway metrics implementations.
2037
type baseMetrics struct {
2138
metrics *genAI
@@ -31,19 +48,6 @@ type baseMetrics struct {
3148
requestHeaderAttributeMapping map[string]string // maps HTTP headers to metric attribute names.
3249
}
3350

34-
// newBaseMetrics creates a new baseMetrics instance with the specified operation.
35-
func newBaseMetrics(meter metric.Meter, operation string, requestHeaderAttributeMapping map[string]string) baseMetrics {
36-
return baseMetrics{
37-
metrics: newGenAI(meter),
38-
operation: operation,
39-
originalModel: "unknown",
40-
requestModel: "unknown",
41-
responseModel: "unknown",
42-
backend: "unknown",
43-
requestHeaderAttributeMapping: requestHeaderAttributeMapping,
44-
}
45-
}
46-
4751
// StartRequest initializes timing for a new request.
4852
func (b *baseMetrics) StartRequest(_ map[string]string) {
4953
b.requestStart = time.Now()

0 commit comments

Comments
 (0)