Skip to content

Commit 23dd567

Browse files
VarSurenSuren Vartanianyuzisunmathetake
authored
fix: gcp anthropic streaming usage (envoyproxy#1187)
**Description** Usage events were parsed as raw JSON instead of as SSE events (an `event:` type line followed by a `data:` JSON line). Example of stream chunks: ``` event: message_start data: {"type":"message_start","message":{"id":"msg_vrtx_01V4HwqjNGwvFRqBVuhQNCXW","type":"message","role":"assistant","model":"claude-sonnet-4-20250514","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":1}} } event: ping data: {"type": "ping"} event: content_block_start data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"There"}} ``` --------- Signed-off-by: Suren Vartanian <[email protected]> Signed-off-by: Dan Sun <[email protected]> Co-authored-by: Suren Vartanian <[email protected]> Co-authored-by: Dan Sun <[email protected]> Co-authored-by: Takeshi Yoneda <[email protected]>
1 parent eb66d27 commit 23dd567

File tree

2 files changed

+129
-22
lines changed

2 files changed

+129
-22
lines changed

internal/extproc/translator/anthropic_gcpanthropic.go

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package translator
77

88
import (
9+
"bytes"
910
"encoding/json"
1011
"fmt"
1112
"io"
@@ -95,18 +96,44 @@ func (a *anthropicToGCPAnthropicTranslator) ResponseBody(_ map[string]string, bo
9596
return nil, nil, LLMTokenUsage{}, fmt.Errorf("failed to read response body: %w", err)
9697
}
9798

98-
// For streaming chunks, try to extract token usage from message_delta events.
99+
// For streaming chunks, parse SSE format to extract token usage.
99100
if !endOfStream {
100-
// Try to parse as a message_delta event to extract usage.
101-
var eventData map[string]any
102-
if unmarshalErr := json.Unmarshal(bodyBytes, &eventData); unmarshalErr == nil {
103-
if eventType, ok := eventData["type"].(string); ok && eventType == "message_delta" {
104-
if usageData, ok := eventData["usage"].(map[string]any); ok {
105-
// Extract token usage from the message_delta event.
106-
if outputTokens, ok := usageData["output_tokens"].(float64); ok {
107-
tokenUsage = LLMTokenUsage{
108-
OutputTokens: uint32(outputTokens), //nolint:gosec
109-
TotalTokens: uint32(outputTokens), //nolint:gosec // Only output tokens available in streaming
101+
// Parse SSE format - split by lines and look for data: lines.
102+
for line := range bytes.Lines(bodyBytes) {
103+
line = bytes.TrimSpace(line)
104+
dataPrefix := []byte("data: ")
105+
if bytes.HasPrefix(line, dataPrefix) {
106+
jsonData := bytes.TrimPrefix(line, dataPrefix)
107+
108+
var eventData map[string]any
109+
if unmarshalErr := json.Unmarshal(jsonData, &eventData); unmarshalErr != nil {
110+
// Skip data lines whose payload is not valid JSON (malformed or truncated data); valid events of other types, such as ping, fall through the switch below untouched.
111+
continue
112+
}
113+
if eventType, ok := eventData["type"].(string); ok {
114+
switch eventType {
115+
case "message_start":
116+
// Extract input tokens from message.usage.
117+
if messageData, ok := eventData["message"].(map[string]any); ok {
118+
if usageData, ok := messageData["usage"].(map[string]any); ok {
119+
if inputTokens, ok := usageData["input_tokens"].(float64); ok {
120+
tokenUsage.InputTokens = uint32(inputTokens) //nolint:gosec
121+
}
122+
// Some message_start events may include initial output tokens.
123+
if outputTokens, ok := usageData["output_tokens"].(float64); ok && outputTokens > 0 {
124+
tokenUsage.OutputTokens = uint32(outputTokens) //nolint:gosec
125+
}
126+
tokenUsage.TotalTokens = tokenUsage.InputTokens + tokenUsage.OutputTokens
127+
}
128+
}
129+
130+
case "message_delta":
131+
if usageData, ok := eventData["usage"].(map[string]any); ok {
132+
if outputTokens, ok := usageData["output_tokens"].(float64); ok {
133+
// Add to existing output tokens (in case message_start had some initial ones).
134+
tokenUsage.OutputTokens += uint32(outputTokens) //nolint:gosec
135+
tokenUsage.TotalTokens = tokenUsage.InputTokens + tokenUsage.OutputTokens
136+
}
110137
}
111138
}
112139
}

internal/extproc/translator/anthropic_gcpanthropic_test.go

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -509,58 +509,58 @@ func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingTokenUsage(t *t
509509
}{
510510
{
511511
name: "regular streaming chunk without usage",
512-
chunk: `{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" to me."}}`,
512+
chunk: "event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\" to me.\"}}\n\n",
513513
endOfStream: false,
514514
expectedUsage: LLMTokenUsage{
515515
InputTokens: 0,
516516
OutputTokens: 0,
517517
TotalTokens: 0,
518518
},
519-
expectedBody: `{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" to me."}}`,
519+
expectedBody: "event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\" to me.\"}}\n\n",
520520
},
521521
{
522522
name: "message_delta chunk with token usage",
523-
chunk: `{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":84}}`,
523+
chunk: "event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"output_tokens\":84}}\n\n",
524524
endOfStream: false,
525525
expectedUsage: LLMTokenUsage{
526526
InputTokens: 0,
527527
OutputTokens: 84,
528528
TotalTokens: 84,
529529
},
530-
expectedBody: `{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":84}}`,
530+
expectedBody: "event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"output_tokens\":84}}\n\n",
531531
},
532532
{
533533
name: "message_stop chunk without usage",
534-
chunk: `{"type":"message_stop"}`,
534+
chunk: "event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
535535
endOfStream: false,
536536
expectedUsage: LLMTokenUsage{
537537
InputTokens: 0,
538538
OutputTokens: 0,
539539
TotalTokens: 0,
540540
},
541-
expectedBody: `{"type":"message_stop"}`,
541+
expectedBody: "event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
542542
},
543543
{
544544
name: "invalid json chunk",
545-
chunk: `{"invalid": "json"}`,
545+
chunk: "event: invalid\ndata: {\"invalid\": \"json\"}\n\n",
546546
endOfStream: false,
547547
expectedUsage: LLMTokenUsage{
548548
InputTokens: 0,
549549
OutputTokens: 0,
550550
TotalTokens: 0,
551551
},
552-
expectedBody: `{"invalid": "json"}`,
552+
expectedBody: "event: invalid\ndata: {\"invalid\": \"json\"}\n\n",
553553
},
554554
{
555555
name: "message_delta with decimal output_tokens",
556-
chunk: `{"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":42.0}}`,
556+
chunk: "event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\"},\"usage\":{\"output_tokens\":42.0}}\n\n",
557557
endOfStream: false,
558558
expectedUsage: LLMTokenUsage{
559559
InputTokens: 0,
560560
OutputTokens: 42,
561561
TotalTokens: 42,
562562
},
563-
expectedBody: `{"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":42.0}}`,
563+
expectedBody: "event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\"},\"usage\":{\"output_tokens\":42.0}}\n\n",
564564
},
565565
}
566566

@@ -574,7 +574,87 @@ func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingTokenUsage(t *t
574574
require.NoError(t, err)
575575
require.Nil(t, headerMutation)
576576
require.NotNil(t, bodyMutation)
577-
require.JSONEq(t, tt.expectedBody, string(bodyMutation.GetBody()))
577+
require.Equal(t, tt.expectedBody, string(bodyMutation.GetBody()))
578+
require.Equal(t, tt.expectedUsage, tokenUsage)
579+
})
580+
}
581+
}
582+
583+
func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingEdgeCases(t *testing.T) {
584+
translator := NewAnthropicToGCPAnthropicTranslator("2023-06-01", "")
585+
586+
tests := []struct {
587+
name string
588+
chunk string
589+
expectedUsage LLMTokenUsage
590+
}{
591+
{
592+
name: "message_start without message field",
593+
chunk: "event: message_start\ndata: {\"type\":\"message_start\"}\n\n",
594+
expectedUsage: LLMTokenUsage{
595+
InputTokens: 0,
596+
OutputTokens: 0,
597+
TotalTokens: 0,
598+
},
599+
},
600+
{
601+
name: "message_start without usage field",
602+
chunk: "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_123\"}}\n\n",
603+
expectedUsage: LLMTokenUsage{
604+
InputTokens: 0,
605+
OutputTokens: 0,
606+
TotalTokens: 0,
607+
},
608+
},
609+
{
610+
name: "message_start with output_tokens = 0",
611+
chunk: "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"usage\":{\"input_tokens\":20,\"output_tokens\":0}}}\n\n",
612+
expectedUsage: LLMTokenUsage{
613+
InputTokens: 20,
614+
OutputTokens: 0,
615+
TotalTokens: 20,
616+
},
617+
},
618+
{
619+
name: "message_start with output_tokens > 0",
620+
chunk: "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"usage\":{\"input_tokens\":15,\"output_tokens\":5}}}\n\n",
621+
expectedUsage: LLMTokenUsage{
622+
InputTokens: 15,
623+
OutputTokens: 5,
624+
TotalTokens: 20,
625+
},
626+
},
627+
{
628+
name: "message_delta without usage field",
629+
chunk: "event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\"}}\n\n",
630+
expectedUsage: LLMTokenUsage{
631+
InputTokens: 0,
632+
OutputTokens: 0,
633+
TotalTokens: 0,
634+
},
635+
},
636+
{
637+
name: "invalid json in data",
638+
chunk: "event: message_start\ndata: {invalid json}\n\n",
639+
expectedUsage: LLMTokenUsage{
640+
InputTokens: 0,
641+
OutputTokens: 0,
642+
TotalTokens: 0,
643+
},
644+
},
645+
}
646+
647+
for _, tt := range tests {
648+
t.Run(tt.name, func(t *testing.T) {
649+
bodyReader := bytes.NewReader([]byte(tt.chunk))
650+
respHeaders := map[string]string{"content-type": "application/json"}
651+
652+
headerMutation, bodyMutation, tokenUsage, err := translator.ResponseBody(respHeaders, bodyReader, false)
653+
654+
require.NoError(t, err)
655+
require.Nil(t, headerMutation)
656+
require.NotNil(t, bodyMutation)
657+
require.Equal(t, tt.chunk, string(bodyMutation.GetBody()))
578658
require.Equal(t, tt.expectedUsage, tokenUsage)
579659
})
580660
}

0 commit comments

Comments
 (0)