
Commit f274dac

refactor: move Token Usage definition to metrics package from translator (#1583)
**Description**

This moves the translator.LLMTokenUsage struct from the translator package into the metrics package so that every call site of the metrics interface uses the same signature, simply passing the cost information reported by the translator. This will help unify the per-endpoint implementation details.

**Related Issues/PRs (if applicable)**

Preparation for #90

---------

Signed-off-by: Takeshi Yoneda <[email protected]>
1 parent 9866258 commit f274dac
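
The new metrics.TokenUsage type itself is not shown in the hunks below, but its shape can be inferred from the call sites: per-field setters, getters that also report whether a field was explicitly set, and an Override that keeps existing values unless the incoming usage supplies new ones. The following is a minimal sketch of that inferred shape, assuming a set-flag-per-field representation; it is not the actual implementation in internal/metrics.

```go
package metrics

// TokenUsage is a sketch inferred from the call sites in this commit; the
// real field layout in internal/metrics may differ.
type TokenUsage struct {
	inputTokens, cachedInputTokens, outputTokens, totalTokens uint32
	inputSet, cachedInputSet, outputSet, totalSet             bool
}

// Setters record the value and mark the field as explicitly set.
func (u *TokenUsage) SetInputTokens(v uint32)       { u.inputTokens, u.inputSet = v, true }
func (u *TokenUsage) SetCachedInputTokens(v uint32) { u.cachedInputTokens, u.cachedInputSet = v, true }
func (u *TokenUsage) SetOutputTokens(v uint32)      { u.outputTokens, u.outputSet = v, true }
func (u *TokenUsage) SetTotalTokens(v uint32)       { u.totalTokens, u.totalSet = v, true }

// Getters return the value and whether it was explicitly set.
func (u TokenUsage) InputTokens() (uint32, bool)       { return u.inputTokens, u.inputSet }
func (u TokenUsage) CachedInputTokens() (uint32, bool) { return u.cachedInputTokens, u.cachedInputSet }
func (u TokenUsage) OutputTokens() (uint32, bool)      { return u.outputTokens, u.outputSet }
func (u TokenUsage) TotalTokens() (uint32, bool)       { return u.totalTokens, u.totalSet }

// Override replaces only the fields that other has explicitly set, which is
// how the processors keep the latest cumulative totals reported by a translator.
func (u *TokenUsage) Override(other TokenUsage) {
	if v, ok := other.InputTokens(); ok {
		u.SetInputTokens(v)
	}
	if v, ok := other.CachedInputTokens(); ok {
		u.SetCachedInputTokens(v)
	}
	if v, ok := other.OutputTokens(); ok {
		u.SetOutputTokens(v)
	}
	if v, ok := other.TotalTokens(); ok {
		u.SetTotalTokens(v)
	}
}
```

Under this assumption, Override makes the "keep the latest cumulative totals" behavior explicit: an empty TokenUsage from an early streaming chunk leaves previously recorded fields untouched, matching the old zero-value check it replaces.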


43 files changed (+618, -556 lines)

internal/extproc/chatcompletion_processor.go

Lines changed: 20 additions & 24 deletions
@@ -189,7 +189,7 @@ type chatCompletionProcessorUpstreamFilter struct {
 	// onRetry is true if this is a retry request at the upstream filter.
 	onRetry bool
 	// cost is the cost of the request that is accumulated during the processing of the response.
-	costs translator.LLMTokenUsage
+	costs metrics.TokenUsage
 	// metrics tracking.
 	metrics metrics.Metrics
 	// stream is set to true if the request is a streaming request.
@@ -413,17 +413,8 @@ func (c *chatCompletionProcessorUpstreamFilter) ProcessResponseBody(ctx context.
 		},
 	}
 
-	// Update accumulated token usage.
-	// TODO: we need to investigate if we need to accumulate the token usage for streaming responses.
-	if c.stream {
-		// For streaming, translators report cumulative usage; keep the latest totals.
-		if tokenUsage != (translator.LLMTokenUsage{}) {
-			c.costs = tokenUsage
-		}
-	} else {
-		// Non-streaming: single-shot totals.
-		c.costs = tokenUsage
-	}
+	// Translator reports the latest cumulative token usage which we use to override existing costs.
+	c.costs.Override(tokenUsage)
 
 	// Set the response model for metrics
 	c.metrics.SetResponseModel(responseModel)
@@ -432,16 +423,17 @@ func (c *chatCompletionProcessorUpstreamFilter) ProcessResponseBody(ctx context.
 	if c.stream {
 		// Token latency is only recorded for streaming responses, otherwise it doesn't make sense since
 		// these metrics are defined as a difference between the two output events.
-		c.metrics.RecordTokenLatency(ctx, tokenUsage.OutputTokens, body.EndOfStream, c.requestHeaders)
+		out, _ := c.costs.OutputTokens()
+		c.metrics.RecordTokenLatency(ctx, out, body.EndOfStream, c.requestHeaders)
 		// Emit usage once at end-of-stream using final totals.
 		if body.EndOfStream {
-			c.metrics.RecordTokenUsage(ctx, metrics.OptUint32(c.costs.InputTokens), metrics.OptUint32(c.costs.CachedInputTokens), metrics.OptUint32(c.costs.OutputTokens), c.requestHeaders)
+			c.metrics.RecordTokenUsage(ctx, c.costs, c.requestHeaders)
		}
 		// TODO: if c.forcedStreamOptionIncludeUsage is true, we should not include usage in the response body since
 		// that's what the clients would expect. However, it is a little bit tricky as we simply just reading the streaming
 		// chunk by chunk, we only want to drop a specific line before the last chunk.
 	} else {
-		c.metrics.RecordTokenUsage(ctx, metrics.OptUint32(tokenUsage.InputTokens), metrics.OptUint32(tokenUsage.CachedInputTokens), metrics.OptUint32(tokenUsage.OutputTokens), c.requestHeaders)
+		c.metrics.RecordTokenUsage(ctx, c.costs, c.requestHeaders)
 	}
 
 	if body.EndOfStream && len(c.config.RequestCosts) > 0 {
@@ -554,29 +546,33 @@ func buildContentLengthDynamicMetadataOnRequest(contentLength int) *structpb.Str
 // This function is called by the upstream filter only at the end of the stream (body.EndOfStream=true)
 // when the response is successfully completed. It is not called for failed requests or partial responses.
 // The metadata includes token usage costs and model information for downstream processing.
-func buildDynamicMetadata(config *filterapi.RuntimeConfig, costs *translator.LLMTokenUsage, requestHeaders map[string]string, backendName string) (*structpb.Struct, error) {
+func buildDynamicMetadata(config *filterapi.RuntimeConfig, costs *metrics.TokenUsage, requestHeaders map[string]string, backendName string) (*structpb.Struct, error) {
 	metadata := make(map[string]*structpb.Value, len(config.RequestCosts)+2)
 	for i := range config.RequestCosts {
 		rc := &config.RequestCosts[i]
 		var cost uint32
 		switch rc.Type {
 		case filterapi.LLMRequestCostTypeInputToken:
-			cost = costs.InputTokens
+			cost, _ = costs.InputTokens()
 		case filterapi.LLMRequestCostTypeCachedInputToken:
-			cost = costs.CachedInputTokens
+			cost, _ = costs.CachedInputTokens()
 		case filterapi.LLMRequestCostTypeOutputToken:
-			cost = costs.OutputTokens
+			cost, _ = costs.OutputTokens()
 		case filterapi.LLMRequestCostTypeTotalToken:
-			cost = costs.TotalTokens
+			cost, _ = costs.TotalTokens()
 		case filterapi.LLMRequestCostTypeCEL:
+			in, _ := costs.InputTokens()
+			cachedIn, _ := costs.CachedInputTokens()
+			out, _ := costs.OutputTokens()
+			total, _ := costs.TotalTokens()
 			costU64, err := llmcostcel.EvaluateProgram(
 				rc.CELProg,
 				requestHeaders[internalapi.ModelNameHeaderKeyDefault],
 				backendName,
-				costs.InputTokens,
-				costs.CachedInputTokens,
-				costs.OutputTokens,
-				costs.TotalTokens,
+				in,
+				cachedIn,
+				out,
+				total,
 			)
 			if err != nil {
 				return nil, fmt.Errorf("failed to evaluate CEL expression: %w", err)
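
The diff above also shows that the processor no longer unpacks counters into metrics.OptUint32 arguments; it hands its accumulated TokenUsage to the metrics layer directly. Judging only from these call sites, the relevant part of the Metrics interface presumably now looks roughly like this (a hedged fragment, not the real interface definition):

```go
package metrics

import "context"

// Fragment of the Metrics interface as inferred from the call sites in this
// commit; the real interface in internal/metrics has additional methods and
// may differ in detail.
type Metrics interface {
	// RecordTokenUsage now receives the accumulated TokenUsage directly
	// instead of separate optional per-field arguments.
	RecordTokenUsage(ctx context.Context, usage TokenUsage, requestHeaders map[string]string)
}
```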

internal/extproc/chatcompletion_processor_test.go

Lines changed: 14 additions & 11 deletions
@@ -25,9 +25,9 @@ import (
 	"github.com/envoyproxy/ai-gateway/internal/headermutator"
 	"github.com/envoyproxy/ai-gateway/internal/internalapi"
 	"github.com/envoyproxy/ai-gateway/internal/llmcostcel"
+	"github.com/envoyproxy/ai-gateway/internal/metrics"
 	"github.com/envoyproxy/ai-gateway/internal/testing/testotel"
 	tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
-	"github.com/envoyproxy/ai-gateway/internal/translator"
 )
 
 func TestChatCompletion_Schema(t *testing.T) {
@@ -253,8 +253,10 @@ func Test_chatCompletionProcessorUpstreamFilter_ProcessResponseBody(t *testing.T
 	mt := &mockTranslator{
 		t: t, expResponseBody: inBody,
 		retHeaderMutation: []internalapi.Header{{"foo", "bar"}},
-		retUsedToken: translator.LLMTokenUsage{OutputTokens: 123, InputTokens: 1, CachedInputTokens: 1},
 	}
+	mt.retUsedToken.SetOutputTokens(123)
+	mt.retUsedToken.SetInputTokens(1)
+	mt.retUsedToken.SetCachedInputTokens(1)
 
 	celProgInt, err := llmcostcel.NewProgram("54321")
 	require.NoError(t, err)
@@ -351,7 +353,7 @@ func Test_chatCompletionProcessorUpstreamFilter_ProcessResponseBody(t *testing.T
 	// First chunk (not end of stream) should not complete the request.
 	chunk := &extprocv3.HttpBody{Body: []byte("chunk-1"), EndOfStream: false}
 	mt.expResponseBody = chunk
-	mt.retUsedToken = translator.LLMTokenUsage{} // no usage yet in early chunks.
+	mt.retUsedToken = metrics.TokenUsage{} // no usage yet in early chunks.
 	_, err := p.ProcessResponseBody(t.Context(), chunk)
 	require.NoError(t, err)
 	mm.RequireRequestNotCompleted(t)
@@ -361,7 +363,10 @@ func Test_chatCompletionProcessorUpstreamFilter_ProcessResponseBody(t *testing.T
 	// Final chunk should mark success and record usage once.
 	final := &extprocv3.HttpBody{Body: []byte("chunk-final"), EndOfStream: true}
 	mt.expResponseBody = final
-	mt.retUsedToken = translator.LLMTokenUsage{InputTokens: 5, CachedInputTokens: 3, OutputTokens: 138, TotalTokens: 143}
+	mt.retUsedToken.SetInputTokens(5)
+	mt.retUsedToken.SetCachedInputTokens(3)
+	mt.retUsedToken.SetOutputTokens(138)
+	mt.retUsedToken.SetTotalTokens(143)
 	_, err = p.ProcessResponseBody(t.Context(), final)
 	require.NoError(t, err)
 	mm.RequireRequestSuccess(t)
@@ -811,15 +816,13 @@ func Test_ProcessResponseBody_UsesActualResponseModel(t *testing.T) {
 	// Create a mock translator that returns token usage with response model
 	// Simulating OpenAI's automatic routing where gpt-5-nano routes to gpt-5-nano-2025-08-07
 	mt := &mockTranslator{
-		t: t,
-		expRequestBody: &body,
-		expHeaders: map[string]string{":status": "200"},
-		retUsedToken: translator.LLMTokenUsage{
-			InputTokens: 10,
-			OutputTokens: 20,
-		},
+		t: t,
+		expRequestBody: &body,
+		expHeaders: map[string]string{":status": "200"},
 		retResponseModel: "gpt-5-nano-2025-08-07",
 	}
+	mt.retUsedToken.SetInputTokens(10)
+	mt.retUsedToken.SetOutputTokens(20)
 
 	p := &chatCompletionProcessorUpstreamFilter{
 		config: &filterapi.RuntimeConfig{},

internal/extproc/completions_processor.go

Lines changed: 6 additions & 10 deletions
@@ -187,7 +187,7 @@ type completionsProcessorUpstreamFilter struct {
 	// See the comment on the `forcedStreamOptionIncludeUsage` field in the router filter.
 	forcedStreamOptionIncludeUsage bool
 	// cost is the cost of the request that is accumulated during the processing of the response.
-	costs translator.LLMTokenUsage
+	costs metrics.TokenUsage
 	// span is the tracing span for this request, inherited from the router filter.
 	span tracing.CompletionSpan
 	// metrics tracking.
@@ -395,23 +395,19 @@ func (c *completionsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Con
 		},
 	}
 
-	// Accumulate token usage for completions.
-	c.costs.InputTokens += tokenUsage.InputTokens
-	c.costs.OutputTokens += tokenUsage.OutputTokens
-	c.costs.TotalTokens += tokenUsage.TotalTokens
+	c.costs.Override(tokenUsage)
 
 	// Record metrics.
 	if c.stream {
 		// Token latency is only recorded for streaming responses
-		c.metrics.RecordTokenLatency(ctx, tokenUsage.OutputTokens, body.EndOfStream, c.requestHeaders)
+		out, _ := c.costs.OutputTokens()
+		c.metrics.RecordTokenLatency(ctx, out, body.EndOfStream, c.requestHeaders)
 		// Emit usage once at end-of-stream using final totals.
 		if body.EndOfStream {
-			c.metrics.RecordTokenUsage(ctx,
-				metrics.OptUint32(c.costs.InputTokens), metrics.OptUint32None, metrics.OptUint32(c.costs.OutputTokens), c.requestHeaders)
+			c.metrics.RecordTokenUsage(ctx, c.costs, c.requestHeaders)
 		}
 	} else {
-		c.metrics.RecordTokenUsage(ctx,
-			metrics.OptUint32(tokenUsage.InputTokens), metrics.OptUint32None, metrics.OptUint32(tokenUsage.OutputTokens), c.requestHeaders)
+		c.metrics.RecordTokenUsage(ctx, c.costs, c.requestHeaders)
 	}
 
 	if body.EndOfStream && len(c.config.RequestCosts) > 0 {

internal/extproc/completions_processor_test.go

Lines changed: 28 additions & 25 deletions
@@ -24,8 +24,8 @@ import (
 	"github.com/envoyproxy/ai-gateway/internal/headermutator"
 	"github.com/envoyproxy/ai-gateway/internal/internalapi"
 	"github.com/envoyproxy/ai-gateway/internal/llmcostcel"
+	"github.com/envoyproxy/ai-gateway/internal/metrics"
 	tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
-	"github.com/envoyproxy/ai-gateway/internal/translator"
 )
 
 func TestCompletions_Schema(t *testing.T) {
@@ -189,11 +189,9 @@ func Test_completionsProcessorUpstreamFilter_ProcessResponseBody(t *testing.T) {
 
 	mt.resHeaderMutation = []internalapi.Header{{"test", "success"}}
 	mt.resBodyMutation = []byte("response body")
-	mt.resTokenUsage = translator.LLMTokenUsage{
-		InputTokens: 10,
-		OutputTokens: 20,
-		TotalTokens: 30,
-	}
+	mt.resTokenUsage.SetInputTokens(10)
+	mt.resTokenUsage.SetOutputTokens(20)
+	mt.resTokenUsage.SetTotalTokens(30)
 	mt.resModel = "gpt-4"
 
 	resp, err := p.ProcessResponseBody(t.Context(), &extprocv3.HttpBody{Body: []byte("test"), EndOfStream: true})
@@ -209,10 +207,15 @@ func Test_completionsProcessorUpstreamFilter_ProcessResponseBody(t *testing.T) {
 	require.Equal(t, "success", string(re.ResponseBody.GetResponse().GetHeaderMutation().SetHeaders[0].Header.RawValue))
 	require.Equal(t, "response body", string(re.ResponseBody.GetResponse().GetBodyMutation().GetBody()))
 
-	// Check that costs were accumulated
-	require.Equal(t, uint32(10), p.costs.InputTokens)
-	require.Equal(t, uint32(20), p.costs.OutputTokens)
-	require.Equal(t, uint32(30), p.costs.TotalTokens)
+	in, ok := p.costs.InputTokens()
+	require.True(t, ok)
+	require.Equal(t, uint32(10), in)
+	out, ok := p.costs.OutputTokens()
+	require.True(t, ok)
+	require.Equal(t, uint32(20), out)
+	total, ok := p.costs.TotalTokens()
+	require.True(t, ok)
+	require.Equal(t, uint32(30), total)
 	})
 }
 
@@ -394,7 +397,7 @@ type mockCompletionTranslator struct {
 	resBodyMutation []byte
 	resErrorHeaderMutation []internalapi.Header
 	resErrorBodyMutation []byte
-	resTokenUsage translator.LLMTokenUsage
+	resTokenUsage metrics.TokenUsage
 	resModel internalapi.ResponseModel
 	err error
 }
@@ -410,7 +413,7 @@ func (m *mockCompletionTranslator) ResponseHeaders(headers map[string]string) ([
 	return m.resHeaderMutation, m.err
 }
 
-func (m *mockCompletionTranslator) ResponseBody(map[string]string, io.Reader, bool, tracing.CompletionSpan) ([]internalapi.Header, []byte, translator.LLMTokenUsage, internalapi.ResponseModel, error) {
+func (m *mockCompletionTranslator) ResponseBody(map[string]string, io.Reader, bool, tracing.CompletionSpan) ([]internalapi.Header, []byte, metrics.TokenUsage, internalapi.ResponseModel, error) {
 	return m.resHeaderMutation, m.resBodyMutation, m.resTokenUsage, m.resModel, m.err
 }
 
@@ -608,15 +611,17 @@ func Test_completionsProcessorUpstreamFilter_ProcessResponseBody_Streaming(t *te
 	}
 	// First chunk (not end of stream) should not complete the request.
 	chunk := &extprocv3.HttpBody{Body: []byte("chunk-1"), EndOfStream: false}
-	mt.resTokenUsage = translator.LLMTokenUsage{} // no usage yet in early chunks.
+	mt.resTokenUsage = metrics.TokenUsage{} // no usage yet in early chunks.
 	_, err := p.ProcessResponseBody(t.Context(), chunk)
 	require.NoError(t, err)
 	mm.RequireRequestNotCompleted(t)
 	require.Zero(t, mm.streamingOutputTokens) // first chunk has 0 output tokens
 
 	// Final chunk should mark success and record usage once.
 	final := &extprocv3.HttpBody{Body: []byte("chunk-final"), EndOfStream: true}
-	mt.resTokenUsage = translator.LLMTokenUsage{InputTokens: 5, OutputTokens: 138, TotalTokens: 143}
+	mt.resTokenUsage.SetInputTokens(5)
+	mt.resTokenUsage.SetOutputTokens(138)
+	mt.resTokenUsage.SetTotalTokens(143)
 	_, err = p.ProcessResponseBody(t.Context(), final)
 	require.NoError(t, err)
 	mm.RequireRequestSuccess(t)
@@ -756,11 +761,9 @@ func Test_completionsProcessorUpstreamFilter_CELCostEvaluation(t *testing.T) {
 		t: t,
 		resBodyMutation: expBody,
 		resHeaderMutation: []internalapi.Header{{"foo", "bar"}},
-		resTokenUsage: translator.LLMTokenUsage{
-			OutputTokens: 123,
-			InputTokens: 1,
-		},
 	}
+	mt.resTokenUsage.SetOutputTokens(123)
+	mt.resTokenUsage.SetInputTokens(1)
 
 	celProgInt, err := llmcostcel.NewProgram("54321")
 	require.NoError(t, err)
@@ -938,13 +941,12 @@ func Test_completionsProcessorUpstreamFilter_ModelTracking(t *testing.T) {
 	// Create a mock translator that returns token usage with response model
 	// Simulating OpenAI's automatic routing where gpt-3.5-turbo-instruct routes to gpt-3.5-turbo-instruct-0914
 	mt := &mockCompletionTranslator{
-		t: t,
-		resTokenUsage: translator.LLMTokenUsage{
-			InputTokens: 10,
-			OutputTokens: 20,
-		},
+		t: t,
 		resModel: "gpt-3.5-turbo-instruct-0914",
 	}
+	mt.resTokenUsage.SetOutputTokens(20)
+	mt.resTokenUsage.SetInputTokens(10)
+
 	p := &completionsProcessorUpstreamFilter{
 		config: &filterapi.RuntimeConfig{},
 		requestHeaders: headers,
@@ -1056,9 +1058,10 @@ func Test_completionsProcessorUpstreamFilter_StreamingTokenLatencyTracking(t *te
 		interTokenLatencyMs: 250.0,
 	}
 	mt := &mockCompletionTranslator{
-		t: t,
-		resTokenUsage: translator.LLMTokenUsage{InputTokens: 5, OutputTokens: 20},
+		t: t,
 	}
+	mt.resTokenUsage.SetOutputTokens(20)
+	mt.resTokenUsage.SetInputTokens(5)
 
 	// Build config with token metadata
 	requestCosts := []filterapi.RuntimeRequestCost{

internal/extproc/embeddings_processor.go

Lines changed: 3 additions & 5 deletions
@@ -161,7 +161,7 @@ type embeddingsProcessorUpstreamFilter struct {
 	// onRetry is true if this is a retry request at the upstream filter.
 	onRetry bool
 	// cost is the cost of the request that is accumulated during the processing of the response.
-	costs translator.LLMTokenUsage
+	costs metrics.TokenUsage
 	// metrics tracking.
 	metrics metrics.Metrics
 	// span is the tracing span for this request, inherited from the router filter.
@@ -362,14 +362,12 @@ func (e *embeddingsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Cont
 		},
 	}
 
-	// Accumulate token usage for embeddings (only input and total tokens are relevant).
-	e.costs.InputTokens += tokenUsage.InputTokens
-	e.costs.TotalTokens += tokenUsage.TotalTokens
+	e.costs.Override(tokenUsage)
 
 	e.metrics.SetResponseModel(responseModel)
 
 	// Update metrics with token usage.
-	e.metrics.RecordTokenUsage(ctx, metrics.OptUint32(tokenUsage.InputTokens), metrics.OptUint32None, metrics.OptUint32None, e.requestHeaders)
+	e.metrics.RecordTokenUsage(ctx, e.costs, e.requestHeaders)
 
 	if body.EndOfStream && len(e.config.RequestCosts) > 0 {
 		resp.DynamicMetadata, err = buildDynamicMetadata(e.config, &e.costs, e.requestHeaders, e.backendName)

internal/extproc/embeddings_processor_test.go

Lines changed: 6 additions & 8 deletions
@@ -24,7 +24,6 @@ import (
 	"github.com/envoyproxy/ai-gateway/internal/internalapi"
 	"github.com/envoyproxy/ai-gateway/internal/llmcostcel"
 	tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
-	"github.com/envoyproxy/ai-gateway/internal/translator"
 )
 
 func TestEmbeddings_Schema(t *testing.T) {
@@ -150,8 +149,9 @@ func Test_embeddingsProcessorUpstreamFilter_ProcessResponseBody(t *testing.T) {
 	mt := &mockEmbeddingTranslator{
 		t: t, expResponseBody: inBody,
 		retBodyMutation: expBodyMut, retHeaderMutation: expHeadMut,
-		retUsedToken: translator.LLMTokenUsage{InputTokens: 123, TotalTokens: 123},
 	}
+	mt.retUsedToken.SetTotalTokens(123)
+	mt.retUsedToken.SetInputTokens(123)
 
 	celProgInt, err := llmcostcel.NewProgram("54321")
 	require.NoError(t, err)
@@ -403,14 +403,12 @@ func TestEmbeddings_ProcessResponseBody_OverridesHeaderModelWithResponseModel(t
 
 	// Create a mock translator that returns token usage with response model
 	mt := &mockEmbeddingTranslator{
-		t: t,
-		expRequestBody: &body,
-		expHeaders: map[string]string{":status": "200"},
-		retUsedToken: translator.LLMTokenUsage{
-			InputTokens: 15,
-		},
+		t: t,
+		expRequestBody: &body,
+		expHeaders: map[string]string{":status": "200"},
 		retResponseModel: "actual-embedding-model",
 	}
+	mt.retUsedToken.SetInputTokens(15)
 
 	p := &embeddingsProcessorUpstreamFilter{
 		config: &filterapi.RuntimeConfig{},
