Skip to content

Commit db3f659

Browse files
authored
internal/flow: fix duplicate tool messages emitted after parallel tool execution (#420)
When tools run in parallel their results are merged into one event; the history reorder step appended that same event once per tool ID, so the next LLM call saw duplicate tool messages and rejected the request. Deduplicate response event indices while preserving order so each merged tool response is emitted exactly once; tightened comments and added a regression test to lock the behaviour in.
1 parent 81c6b2d commit db3f659

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

internal/flow/processor/content.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,21 @@ func (p *ContentRequestProcessor) rearrangeAsyncFuncRespHist(
358358
responseEventIndices = append(responseEventIndices, idx)
359359
}
360360
}
361+
// When tools run in parallel they commonly return all results inside one response event.
362+
// If we pushed the same event once per tool ID, the LLM would see duplicated tool
363+
// messages and reject the request. Keep only the first occurrence of each event index
364+
// while preserving their original order.
365+
seenIdx := make(map[int]struct{}, len(functionCallIDs))
366+
uniqueIndices := responseEventIndices[:0]
367+
// Reuse the existing slice to deduplicate in place and maintain the original order.
368+
for _, idx := range responseEventIndices {
369+
if _, seen := seenIdx[idx]; seen {
370+
continue
371+
}
372+
seenIdx[idx] = struct{}{}
373+
uniqueIndices = append(uniqueIndices, idx)
374+
}
375+
responseEventIndices = uniqueIndices
361376

362377
resultEvents = append(resultEvents, evt)
363378

internal/flow/processor/content_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,68 @@ func TestContentRequestProcessor_ToolCalls(t *testing.T) {
154154
}
155155
}
156156

157+
func TestContentRequestProcessor_RearrangeAsyncFuncRespHist_DeduplicatesMergedResponses(t *testing.T) {
158+
processor := NewContentRequestProcessor()
159+
160+
toolCallEvent := event.Event{
161+
Author: "assistant",
162+
Response: &model.Response{
163+
Choices: []model.Choice{
164+
{
165+
Message: model.Message{
166+
Role: model.RoleAssistant,
167+
ToolCalls: []model.ToolCall{
168+
{
169+
ID: "call_0",
170+
Function: model.FunctionDefinitionParam{Name: "calculator"},
171+
},
172+
{
173+
ID: "call_1",
174+
Function: model.FunctionDefinitionParam{Name: "calculator"},
175+
},
176+
},
177+
},
178+
},
179+
},
180+
},
181+
}
182+
183+
mergedToolResponse := event.Event{
184+
Author: "assistant",
185+
Response: &model.Response{
186+
Choices: []model.Choice{
187+
{
188+
Message: model.Message{
189+
Role: model.RoleTool,
190+
ToolID: "call_0",
191+
Content: "result 0",
192+
},
193+
},
194+
{
195+
Message: model.Message{
196+
Role: model.RoleTool,
197+
ToolID: "call_1",
198+
Content: "result 1",
199+
},
200+
},
201+
},
202+
},
203+
}
204+
205+
result := processor.rearrangeAsyncFuncRespHist([]event.Event{toolCallEvent, mergedToolResponse})
206+
207+
if len(result) != 2 {
208+
t.Fatalf("expected 2 events (tool call + single response), got %d", len(result))
209+
}
210+
211+
toolResultEvent := result[1]
212+
resultIDs := toolResultEvent.GetToolResultIDs()
213+
assert.ElementsMatch(t, []string{"call_0", "call_1"}, resultIDs,
214+
"tool result IDs should match the original tool calls once each")
215+
assert.Len(t, toolResultEvent.Response.Choices, 2,
216+
"tool response event should contain one choice per tool ID without duplication")
217+
}
218+
157219
func TestContentRequestProcessor_ToolResponses(t *testing.T) {
158220
tests := []struct {
159221
name string

0 commit comments

Comments
 (0)