Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 0 additions & 116 deletions pkg/model/provider/anthropic/beta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ func (c *Client) createBetaStream(
slog.Error("Failed to convert messages for Anthropic Beta request", "error", err)
return nil, err
}
if err := validateAnthropicSequencingBeta(converted); err != nil {
slog.Warn("Invalid message sequencing for Anthropic Beta API detected, attempting self-repair", "error", err)
converted = repairAnthropicSequencingBeta(converted)
if err2 := validateAnthropicSequencingBeta(converted); err2 != nil {
slog.Error("Failed to self-repair Anthropic Beta sequencing", "error", err2)
return nil, err
}
}
if len(converted) == 0 {
return nil, errors.New("no messages to send after conversion: all messages were filtered out")
}
Expand Down Expand Up @@ -149,114 +141,6 @@ func (c *Client) createBetaStream(
return ad, nil
}

// validateAnthropicSequencingBeta checks the tool_use / tool_result pairing of a
// Beta API payload.
//
// For every assistant message that carries tool_use blocks, the message
// immediately after it must be a user message whose tool_result blocks cover
// every one of those tool_use IDs. The first violation found is logged at Warn
// level and returned as an error; nil means the whole sequence is acceptable.
// Messages that cannot be marshalled to a map are ignored.
func validateAnthropicSequencingBeta(msgs []anthropic.BetaMessageParam) error {
	for idx, msg := range msgs {
		current, decoded := marshalToMapBeta(msg)
		if !decoded {
			continue
		}
		if current["role"] != "assistant" {
			continue
		}

		// Only assistant turns that actually request tools constrain what follows.
		pending := collectToolUseIDs(contentArrayBeta(current))
		if len(pending) == 0 {
			continue
		}

		followerIdx := idx + 1
		if followerIdx >= len(msgs) {
			slog.Warn("Anthropic (beta) sequencing invalid: assistant tool_use present but no next user tool_result message", "assistant_index", idx)
			return errors.New("assistant tool_use present but no subsequent user message with tool_result blocks (beta)")
		}

		follower, decoded := marshalToMapBeta(msgs[followerIdx])
		if !decoded || follower["role"] != "user" {
			slog.Warn("Anthropic (beta) sequencing invalid: next message after assistant tool_use is not user", "assistant_index", idx, "next_role", follower["role"])
			return errors.New("assistant tool_use must be followed by a user message containing corresponding tool_result blocks (beta)")
		}

		answered := collectToolResultIDs(contentArrayBeta(follower))
		if missing := differenceIDs(pending, answered); len(missing) > 0 {
			slog.Warn("Anthropic (beta) sequencing invalid: missing tool_result for tool_use id in next user message", "assistant_index", idx, "tool_use_id", missing[0], "missing_count", len(missing))
			return fmt.Errorf("missing tool_result for tool_use id %s in the next user message (beta)", missing[0])
		}
	}
	return nil
}

// repairAnthropicSequencingBeta returns a copy of msgs in which every assistant
// tool_use block is answered by a tool_result block in the user message that
// immediately follows the assistant message.
//
// For each assistant message with tool_use blocks:
//   - if the next message is a user message that already answers every
//     tool_use ID, nothing is added;
//   - if the next message is a user message that answers only some of the IDs,
//     synthetic "(tool execution failed)" tool_result blocks for the missing IDs
//     are merged into that same user message (placed before its existing
//     content). Inserting a separate user message in between would leave the
//     real results one position too late, and validateAnthropicSequencingBeta
//     would still reject the sequence;
//   - otherwise a synthetic user message holding a tool_result block for every
//     unanswered ID is inserted right after the assistant message.
//
// The input slice and its messages are not modified. Messages that cannot be
// marshalled to a map are passed through untouched.
func repairAnthropicSequencingBeta(msgs []anthropic.BetaMessageParam) []anthropic.BetaMessageParam {
	if len(msgs) == 0 {
		return msgs
	}
	repaired := make([]anthropic.BetaMessageParam, 0, len(msgs)+2)
	// Index loop (not range) because the partial-result case consumes msgs[i+1].
	for i := 0; i < len(msgs); i++ {
		repaired = append(repaired, msgs[i])

		m, ok := marshalToMapBeta(msgs[i])
		if !ok || m["role"] != "assistant" {
			continue
		}

		toolUseIDs := collectToolUseIDs(contentArrayBeta(m))
		if len(toolUseIDs) == 0 {
			continue
		}

		// Strike out every tool_use ID the following user message already answers,
		// remembering whether it answered at least one of them.
		nextAnswersSome := false
		if i+1 < len(msgs) {
			if next, ok := marshalToMapBeta(msgs[i+1]); ok && next["role"] == "user" {
				for id := range collectToolResultIDs(contentArrayBeta(next)) {
					if _, pending := toolUseIDs[id]; pending {
						delete(toolUseIDs, id)
						nextAnswersSome = true
					}
				}
			}
		}

		// Everything is answered: the sequence is already valid at this position.
		if len(toolUseIDs) == 0 {
			continue
		}

		slog.Debug("Inserting synthetic user message for missing tool_results",
			"assistant_index", i,
			"missing_count", len(toolUseIDs))

		blocks := make([]anthropic.BetaContentBlockParamUnion, 0, len(toolUseIDs))
		for id := range toolUseIDs {
			slog.Debug("Creating synthetic tool_result", "tool_use_id", id)
			blocks = append(blocks, anthropic.BetaContentBlockParamUnion{
				OfToolResult: &anthropic.BetaToolResultBlockParam{
					ToolUseID: id,
					Content: []anthropic.BetaToolResultBlockParamContentUnion{
						{OfText: &anthropic.BetaTextBlockParam{Text: "(tool execution failed)"}},
					},
				},
			})
		}

		if nextAnswersSome {
			// Partial answer: keep all results for this assistant turn in ONE user
			// message. Work on a copy so the caller's message is left untouched.
			merged := msgs[i+1]
			merged.Content = append(blocks, msgs[i+1].Content...)
			repaired = append(repaired, merged)
			i++ // msgs[i+1] has been emitted as part of the merged message
			continue
		}

		repaired = append(repaired, anthropic.BetaMessageParam{
			Role:    anthropic.BetaMessageParamRoleUser,
			Content: blocks,
		})
	}
	return repaired
}

// Beta-named aliases for helpers shared with the standard API code path.
// They exist only so Beta-specific code reads clearly; each one refers to the
// exact same implementation as its non-Beta counterpart.
var (
	// marshalToMapBeta is an alias for marshalToMap.
	marshalToMapBeta = marshalToMap

	// contentArrayBeta is an alias for contentArray.
	contentArrayBeta = contentArray
)

// countAnthropicTokensBeta calls Anthropic's Count Tokens API for the provided Beta API payload
// and returns the number of input tokens.
func countAnthropicTokensBeta(
Expand Down
73 changes: 48 additions & 25 deletions pkg/model/provider/anthropic/beta_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// blocks from the same assistant message MUST be grouped into a single user message.
func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Message) ([]anthropic.BetaMessageParam, error) {
var betaMessages []anthropic.BetaMessageParam
var pendingToolUseIDs map[string]struct{}

for i := 0; i < len(messages); i++ {
msg := &messages[i]
Expand Down Expand Up @@ -75,11 +76,15 @@ func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Messag

// Add tool calls
if len(msg.ToolCalls) > 0 {
pendingToolUseIDs = make(map[string]struct{}, len(msg.ToolCalls))
for _, toolCall := range msg.ToolCalls {
var inpts map[string]any
if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &inpts); err != nil {
inpts = map[string]any{}
}
if toolCall.ID != "" {
pendingToolUseIDs[toolCall.ID] = struct{}{}
}
contentBlocks = append(contentBlocks, anthropic.BetaContentBlockParamUnion{
OfToolUse: &anthropic.BetaToolUseBlockParam{
ID: toolCall.ID,
Expand All @@ -88,6 +93,8 @@ func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Messag
},
})
}
} else {
pendingToolUseIDs = nil
}

if len(contentBlocks) > 0 {
Expand All @@ -102,38 +109,54 @@ func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Messag
// Collect consecutive tool messages and merge them into a single user message
// This is required by Anthropic API: all tool_result blocks for tool_use blocks
// from the same assistant message must be in the same user message
toolResultBlocks := []anthropic.BetaContentBlockParamUnion{
{
OfToolResult: &anthropic.BetaToolResultBlockParam{
ToolUseID: msg.ToolCallID,
Content: []anthropic.BetaToolResultBlockParamContentUnion{
{OfText: &anthropic.BetaTextBlockParam{Text: strings.TrimSpace(msg.Content)}},
},
},
},
if pendingToolUseIDs == nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beta converter silently drops orphan tool results while regular converter returns error

When pendingToolUseIDs == nil (indicating no preceding assistant tool_use in the current window), the beta converter silently skips tool messages without validation (lines 112-120). However, the regular converter in client.go:457-459 explicitly returns an error for this exact scenario:

if pendingToolUseIDs == nil {
    return nil, fmt.Errorf("unexpected tool result without preceding tool_use (tool_use_id=%q)", messages[i].ToolCallID)
}

Issue: This inconsistency violates the PR's stated goal of "strict enforcement" and "removing self-repair". Orphan tool results are a protocol violation per Anthropic's requirements — they should be caught and reported in both converters, not silently dropped in one.

Impact: Malformed message sequences (e.g., assistant with no tool calls followed by tool results) pass through undetected in the beta API path, potentially causing downstream issues or silent data loss.

Recommendation: Change the beta converter to return an error like the regular converter does, maintaining consistent strict validation across both API paths.

// Orphan tool results (no preceding assistant tool_use in this window): drop them.
j := i
for j < len(messages) && messages[j].Role == chat.MessageRoleTool {
j++
}
i = j - 1
continue
}

// Look ahead for consecutive tool messages and merge them
j := i + 1
toolResultBlocks := make([]anthropic.BetaContentBlockParamUnion, 0)
hadToolMessages := false
j := i
for j < len(messages) && messages[j].Role == chat.MessageRoleTool {
toolResultBlocks = append(toolResultBlocks, anthropic.BetaContentBlockParamUnion{
OfToolResult: &anthropic.BetaToolResultBlockParam{
ToolUseID: messages[j].ToolCallID,
Content: []anthropic.BetaToolResultBlockParamContentUnion{
{OfText: &anthropic.BetaTextBlockParam{Text: strings.TrimSpace(messages[j].Content)}},
},
},
})
hadToolMessages = true
id := messages[j].ToolCallID
if id != "" {
if _, ok := pendingToolUseIDs[id]; ok {
toolResultBlocks = append(toolResultBlocks, anthropic.BetaContentBlockParamUnion{
OfToolResult: &anthropic.BetaToolResultBlockParam{
ToolUseID: id,
Content: []anthropic.BetaToolResultBlockParamContentUnion{
{OfText: &anthropic.BetaTextBlockParam{Text: strings.TrimSpace(messages[j].Content)}},
},
},
})
delete(pendingToolUseIDs, id)
}
}
j++
}

// Add the merged user message with all tool results
betaMessages = append(betaMessages, anthropic.BetaMessageParam{
Role: anthropic.BetaMessageParamRoleUser,
Content: toolResultBlocks,
})
if hadToolMessages && len(toolResultBlocks) == 0 {
return nil, fmt.Errorf("tool_result messages present but none match pending tool_use ids (beta converter)")
}
if len(pendingToolUseIDs) > 0 {
for id := range pendingToolUseIDs {
return nil, fmt.Errorf("missing tool_result for tool_use id %s (beta converter)", id)
}
}

// Skip the messages we've already processed
if len(toolResultBlocks) > 0 {
betaMessages = append(betaMessages, anthropic.BetaMessageParam{
Role: anthropic.BetaMessageParamRoleUser,
Content: toolResultBlocks,
})
}
pendingToolUseIDs = nil
i = j - 1
continue
}
Expand Down
61 changes: 41 additions & 20 deletions pkg/model/provider/anthropic/beta_converter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package anthropic

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -10,6 +11,39 @@ import (
"github.com/docker/cagent/pkg/tools"
)

// marshalToMapBeta round-trips v through encoding/json and returns the result
// as a generic map. It is a test helper: a marshal or unmarshal failure aborts
// the calling test immediately.
func marshalToMapBeta(t *testing.T, v any) map[string]any {
	t.Helper()

	encoded, marshalErr := json.Marshal(v)
	require.NoError(t, marshalErr)

	var decoded map[string]any
	unmarshalErr := json.Unmarshal(encoded, &decoded)
	require.NoError(t, unmarshalErr)

	return decoded
}

// contentArrayBeta returns the "content" entry of m when it is a JSON array
// ([]any). It returns nil when the key is absent or holds any other type.
func contentArrayBeta(m map[string]any) []any {
	blocks, isArray := m["content"].([]any)
	if !isArray {
		return nil
	}
	return blocks
}

// collectToolResultIDsBeta gathers the tool_use_id of every tool_result block
// in a decoded content array and returns them as a set.
//
// Entries that are not JSON objects, whose "type" is not "tool_result", or
// whose "tool_use_id" is missing, empty, or not a string are ignored. The
// returned map is never nil.
func collectToolResultIDsBeta(content []any) map[string]struct{} {
	found := map[string]struct{}{}
	for i := range content {
		block, isObject := content[i].(map[string]any)
		if !isObject || block["type"] != "tool_result" {
			continue
		}
		// A failed assertion leaves id empty, which is skipped like a blank ID.
		id, _ := block["tool_use_id"].(string)
		if id == "" {
			continue
		}
		found[id] = struct{}{}
	}
	return found
}

func TestConvertBetaMessages_MergesConsecutiveToolMessages(t *testing.T) {
// Simulates the roast battle scenario where:
// - Assistant message has 2 tool_use blocks (transfer_task calls)
Expand Down Expand Up @@ -65,27 +99,22 @@ func TestConvertBetaMessages_MergesConsecutiveToolMessages(t *testing.T) {

require.Len(t, betaMessages, 4, "Should have 4 messages after conversion")

msg0Map, _ := marshalToMapBeta(betaMessages[0])
msg1Map, _ := marshalToMapBeta(betaMessages[1])
msg2Map, _ := marshalToMapBeta(betaMessages[2])
msg3Map, _ := marshalToMapBeta(betaMessages[3])
msg0Map := marshalToMapBeta(t, betaMessages[0])
msg1Map := marshalToMapBeta(t, betaMessages[1])
msg2Map := marshalToMapBeta(t, betaMessages[2])
msg3Map := marshalToMapBeta(t, betaMessages[3])
assert.Equal(t, "user", msg0Map["role"])
assert.Equal(t, "assistant", msg1Map["role"])
assert.Equal(t, "user", msg2Map["role"])
assert.Equal(t, "assistant", msg3Map["role"])

userMsg2Map, ok := marshalToMapBeta(betaMessages[2])
require.True(t, ok)
userMsg2Map := marshalToMapBeta(t, betaMessages[2])
content := contentArrayBeta(userMsg2Map)
require.Len(t, content, 2, "User message should have 2 tool_result blocks")

toolResultIDs := collectToolResultIDs(content)
toolResultIDs := collectToolResultIDsBeta(content)
assert.Contains(t, toolResultIDs, "tool_call_1")
assert.Contains(t, toolResultIDs, "tool_call_2")

// Most importantly: validate that the sequence is valid for Anthropic API
err = validateAnthropicSequencingBeta(betaMessages)
require.NoError(t, err, "Converted messages should pass Anthropic sequencing validation")
}

func TestConvertBetaMessages_SingleToolMessage(t *testing.T) {
Expand Down Expand Up @@ -123,10 +152,6 @@ func TestConvertBetaMessages_SingleToolMessage(t *testing.T) {
betaMessages, err := testClient().convertBetaMessages(t.Context(), messages)
require.NoError(t, err)
require.Len(t, betaMessages, 4)

// Validate sequence
err = validateAnthropicSequencingBeta(betaMessages)
require.NoError(t, err)
}

func TestConvertBetaMessages_NonConsecutiveToolMessages(t *testing.T) {
Expand Down Expand Up @@ -181,10 +206,6 @@ func TestConvertBetaMessages_NonConsecutiveToolMessages(t *testing.T) {
},
}

betaMessages, err := testClient().convertBetaMessages(t.Context(), messages)
_, err := testClient().convertBetaMessages(t.Context(), messages)
require.NoError(t, err)

// Validate the entire sequence
err = validateAnthropicSequencingBeta(betaMessages)
require.NoError(t, err, "Messages with non-consecutive tool calls should still validate")
}
Loading