Skip to content

Commit e011e5c

Browse files
authored
fix: input token overwritten by message delta (#1631)
**Description** According to Anthropic's API, message_start events contain the input tokens (tokens consumed for the request), and message_delta events contain incremental output tokens (tokens generated in the response). The message_start event correctly sets tokenUsage with the input tokens (e.g., input_tokens=15). However, when a message_delta event is processed, it completely overwrites the tokenUsage variable. Because the message_delta event typically has input_tokens=0 (since it reports only output deltas), this overwrite discards the correct input token count. --------- Signed-off-by: Dan Sun <[email protected]>
1 parent d33eec2 commit e011e5c

File tree

3 files changed

+116
-17
lines changed

3 files changed

+116
-17
lines changed

internal/translator/anthropic_anthropic.go

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type anthropicToAnthropicTranslator struct {
3636
stream bool
3737
buffered []byte
3838
streamingResponseModel internalapi.ResponseModel
39+
streamingTokenUsage metrics.TokenUsage
3940
}
4041

4142
// RequestBody implements [AnthropicMessagesTranslator.RequestBody].
@@ -83,11 +84,17 @@ func (a *anthropicToAnthropicTranslator) ResponseBody(_ map[string]string, body
8384
if err != nil {
8485
return nil, nil, tokenUsage, a.requestModel, fmt.Errorf("failed to read body: %w", err)
8586
}
87+
88+
// If this is a fresh start (no buffered data), reset the streaming token usage
89+
if len(a.buffered) == 0 {
90+
a.streamingTokenUsage = metrics.TokenUsage{}
91+
}
92+
8693
a.buffered = append(a.buffered, buf...)
87-
tokenUsage = a.extractUsageFromBufferEvent(span)
94+
a.extractUsageFromBufferEvent(span)
8895
// Use stored streaming response model, fallback to request model for non-compliant backends
8996
responseModel = cmp.Or(a.streamingResponseModel, a.requestModel)
90-
return
97+
return nil, nil, a.streamingTokenUsage, responseModel, nil
9198
}
9299

93100
// Parse the Anthropic response to extract token usage.
@@ -110,11 +117,13 @@ func (a *anthropicToAnthropicTranslator) ResponseBody(_ map[string]string, body
110117
}
111118

112119
// extractUsageFromBufferEvent extracts the token usage from the buffered event.
113-
// It scans complete lines and returns the latest usage found in this batch.
114-
func (a *anthropicToAnthropicTranslator) extractUsageFromBufferEvent(s tracing.MessageSpan) (tokenUsage metrics.TokenUsage) {
120+
// It scans complete lines and accumulates usage from all events in this batch.
121+
func (a *anthropicToAnthropicTranslator) extractUsageFromBufferEvent(s tracing.MessageSpan) {
115122
for {
116123
i := bytes.IndexByte(a.buffered, '\n')
117124
if i == -1 {
125+
// Recalculate total tokens before returning
126+
a.updateTotalTokens()
118127
return
119128
}
120129
line := a.buffered[:i]
@@ -133,32 +142,56 @@ func (a *anthropicToAnthropicTranslator) extractUsageFromBufferEvent(s tracing.M
133142
switch {
134143
case eventUnion.MessageStart != nil:
135144
message := eventUnion.MessageStart
136-
// Message only valid in message_start events.
145+
// Store the response model for future batches
137146
if message.Model != "" {
138-
// Store the response model for future batches
139147
a.streamingResponseModel = message.Model
140148
}
141-
// Extract usage from message_start event
149+
// Extract usage from message_start event - this sets the baseline input tokens
142150
if u := message.Usage; u != nil {
143-
tokenUsage = metrics.ExtractTokenUsageFromAnthropic(
151+
messageStartUsage := metrics.ExtractTokenUsageFromAnthropic(
144152
int64(u.InputTokens),
145153
int64(u.OutputTokens),
146154
int64(u.CacheReadInputTokens),
147155
int64(u.CacheCreationInputTokens),
148156
)
157+
// Override with message_start usage (contains input tokens and initial state)
158+
a.streamingTokenUsage.Override(messageStartUsage)
149159
}
150160
case eventUnion.MessageDelta != nil:
151161
u := eventUnion.MessageDelta.Usage
152-
tokenUsage = metrics.ExtractTokenUsageFromAnthropic(
153-
int64(u.InputTokens),
154-
int64(u.OutputTokens),
155-
int64(u.CacheReadInputTokens),
156-
int64(u.CacheCreationInputTokens),
157-
)
162+
// message_delta events provide final counts for specific token types
163+
// Update output tokens from message_delta (final count)
164+
if u.OutputTokens >= 0 {
165+
a.streamingTokenUsage.SetOutputTokens(uint32(u.OutputTokens)) //nolint:gosec
166+
}
158167
}
159168
}
160169
}
161170

171+
// updateTotalTokens recalculates and sets the total token count
172+
func (a *anthropicToAnthropicTranslator) updateTotalTokens() {
173+
inputTokens, inputSet := a.streamingTokenUsage.InputTokens()
174+
outputTokens, outputSet := a.streamingTokenUsage.OutputTokens()
175+
176+
// Initialize missing values to 0 if we have any token data
177+
if outputSet && !inputSet {
178+
a.streamingTokenUsage.SetInputTokens(0)
179+
inputTokens = 0
180+
inputSet = true
181+
}
182+
183+
// Set cached tokens to 0 if not set but we have other token data
184+
if outputSet {
185+
if _, cachedSet := a.streamingTokenUsage.CachedInputTokens(); !cachedSet {
186+
a.streamingTokenUsage.SetCachedInputTokens(0)
187+
}
188+
}
189+
190+
if inputSet && outputSet {
191+
a.streamingTokenUsage.SetTotalTokens(inputTokens + outputTokens)
192+
}
193+
}
194+
162195
// ResponseError implements [AnthropicMessagesTranslator] for Anthropic to AWS Bedrock Anthropic translation.
163196
func (a *anthropicToAnthropicTranslator) ResponseError(map[string]string, io.Reader) (
164197
newHeaders []internalapi.Header,

internal/translator/anthropic_anthropic_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func TestAnthropicToAnthropic_ResponseBody_streaming(t *testing.T) {
107107
// We split the response into two parts to simulate streaming where each part can end in the
108108
// middle of an event.
109109
const responseHead = `event: message_start
110-
data: {"type":"message_start","message":{"model":"claude-sonnet-4-5-20250929","id":"msg_01BfvfMsg2gBzwsk6PZRLtDg","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":9,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard"}} }
110+
data: {"type":"message_start","message":{"model":"claude-sonnet-4-5-20250929","id":"msg_01BfvfMsg2gBzwsk6PZRLtDg","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":9,"cache_creation_input_tokens":0,"cache_read_input_tokens":1,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":0,"service_tier":"standard"}} }
111111
112112
event: content_block_start
113113
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} }
@@ -130,7 +130,7 @@ event: content_block_stop
130130
data: {"type":"content_block_stop","index":0 }
131131
132132
event: message_delta
133-
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":9,"cache_creation_input_tokens":0,"cache_read_input_tokens":1,"output_tokens":16} }
133+
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":16} }
134134
135135
event: message_stop
136136
data: {"type":"message_stop" }`
@@ -139,7 +139,7 @@ data: {"type":"message_stop" }`
139139
require.NoError(t, err)
140140
require.Nil(t, headerMutation)
141141
require.Nil(t, bodyMutation)
142-
expected := tokenUsageFrom(9, 0, 1, 10)
142+
expected := tokenUsageFrom(10, 1, 0, 10)
143143
require.Equal(t, expected, tokenUsage)
144144
require.Equal(t, "claude-sonnet-4-5-20250929", responseModel)
145145

internal/translator/anthropic_gcpanthropic_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package translator
88
import (
99
"bytes"
1010
"encoding/json"
11+
"strings"
1112
"testing"
1213

1314
"github.com/stretchr/testify/assert"
@@ -589,3 +590,68 @@ func tokenUsageFrom(in, cachedInput, out, total int32) metrics.TokenUsage {
589590
}
590591
return usage
591592
}
593+
594+
func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingFullScenario(t *testing.T) {
595+
// Test to reproduce and verify fix for the input_token=0 issue in Anthropic streaming
596+
// This test verifies that input_tokens from message_start are preserved when
597+
// message_delta doesn't provide input_tokens (real-world scenario)
598+
599+
translator := NewAnthropicToGCPAnthropicTranslator("v1", "")
600+
601+
// Simulate request body to set stream=true
602+
reqBody := anthropic.MessagesRequest{
603+
Stream: true,
604+
Model: "claude-3-sonnet-20240229",
605+
}
606+
_, _, err := translator.RequestBody([]byte(`{"stream":true}`), &reqBody, false)
607+
require.NoError(t, err)
608+
609+
// Sample streaming response from Anthropic with realistic flow:
610+
// 1. message_start provides input_tokens=15
611+
// 2. content_block events provide the actual text content
612+
// 3. message_delta at the end provides output_tokens=5 but no input_tokens
613+
// 4. message_stop ends the stream
614+
sseStream := `event: message_start
615+
data: {"type": "message_start", "message": {"id": "msg_123", "type": "message", "role": "assistant", "content": [], "model": "claude-3-sonnet-20240229", "usage": {"input_tokens": 15, "output_tokens": 0}}}
616+
617+
event: content_block_start
618+
data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
619+
620+
event: content_block_delta
621+
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
622+
623+
event: content_block_stop
624+
data: {"type": "content_block_stop", "index": 0}
625+
626+
event: message_delta
627+
data: {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 5}}
628+
629+
event: message_stop
630+
data: {"type": "message_stop"}
631+
632+
`
633+
634+
// Process the streaming response
635+
reader := strings.NewReader(sseStream)
636+
_, _, tokenUsage, _, err := translator.ResponseBody(nil, reader, false, nil)
637+
require.NoError(t, err)
638+
639+
// Verify token usage - this should preserve input_tokens from message_start
640+
inputTokens, inputSet := tokenUsage.InputTokens()
641+
outputTokens, outputSet := tokenUsage.OutputTokens()
642+
totalTokens, totalSet := tokenUsage.TotalTokens()
643+
cachedTokens, cachedSet := tokenUsage.CachedInputTokens()
644+
645+
// Assertions
646+
assert.True(t, inputSet, "Input tokens should be set")
647+
assert.Equal(t, uint32(15), inputTokens, "Input tokens should be preserved from message_start")
648+
649+
assert.True(t, outputSet, "Output tokens should be set")
650+
assert.Equal(t, uint32(5), outputTokens, "Output tokens should come from message_delta")
651+
652+
assert.True(t, totalSet, "Total tokens should be calculated")
653+
assert.Equal(t, uint32(20), totalTokens, "Total tokens should be input + output")
654+
655+
assert.True(t, cachedSet, "Cached tokens should be set")
656+
assert.Equal(t, uint32(0), cachedTokens, "No cached tokens in this scenario")
657+
}

0 commit comments

Comments
 (0)