Skip to content

Commit c8ce038

Browse files
authored
server/agui: unique user messages based on requestID in AGUI MessagesSnapshot (#616)
In the agenttool scenario, user messages may appear multiple times in a session to provide context to the agenttool, causing messagesnapshot to send multiple user messages to the frontend. Therefore, it is necessary to deduplicate user messages based on requestID.
1 parent 19afda7 commit c8ce038

File tree

2 files changed

+220
-20
lines changed

2 files changed

+220
-20
lines changed

server/agui/runner/messagessnapshot.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,20 +118,26 @@ func (r *runner) convertToMessagesSnapshotEvent(ctx context.Context, userID stri
118118
if len(events) == 0 {
119119
return aguievents.NewMessagesSnapshotEvent(messages), nil
120120
}
121+
lastRequestID := ""
121122
for _, event := range events {
122123
event, err := r.handleBeforeTranslate(ctx, &event)
123124
if err != nil {
124125
return nil, fmt.Errorf("handle before translate: %w", err)
125126
}
126-
if event == nil || event.Response == nil || len(event.Response.Choices) == 0 {
127+
if r.ignoreEvent(event) {
127128
continue
128129
}
129130
for _, choice := range event.Response.Choices {
130131
switch choice.Message.Role {
131132
case model.RoleSystem:
132133
messages = append(messages, *r.convertToSystemMessage(event.ID, choice))
133134
case model.RoleUser:
134-
messages = append(messages, *r.convertToUserMessage(event.ID, userID, choice))
135+
if lastRequestID != event.RequestID {
136+
// User message may be repeated multiple times in multiagent scenario.
137+
// Only the first message should be included in the snapshot.
138+
lastRequestID = event.RequestID
139+
messages = append(messages, *r.convertToUserMessage(event.ID, userID, choice))
140+
}
135141
case model.RoleAssistant:
136142
messages = append(messages, *r.convertToAssistantMessage(event.ID, choice))
137143
case model.RoleTool:
@@ -144,6 +150,25 @@ func (r *runner) convertToMessagesSnapshotEvent(ctx context.Context, userID stri
144150
return aguievents.NewMessagesSnapshotEvent(messages), nil
145151
}
146152

153+
func (r *runner) ignoreEvent(event *event.Event) bool {
154+
if event == nil || event.Response == nil || len(event.Response.Choices) == 0 {
155+
return true
156+
}
157+
switch event.Response.Object {
158+
// Model response event.
159+
case model.ObjectTypeChatCompletion:
160+
return false
161+
// Tool response event.
162+
case model.ObjectTypeToolResponse:
163+
return false
164+
// User message event.
165+
case "":
166+
return false
167+
default:
168+
return true
169+
}
170+
}
171+
147172
// convertToSystemMessage converts system events to AG-UI Message.
148173
func (r *runner) convertToSystemMessage(id string, choice model.Choice) *aguievents.Message {
149174
return &aguievents.Message{

server/agui/runner/messagessnapshot_test.go

Lines changed: 193 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ import (
1515
"testing"
1616

1717
aguievents "github.com/ag-ui-protocol/ag-ui/sdks/community/go/pkg/core/events"
18+
"github.com/google/uuid"
1819
"github.com/stretchr/testify/assert"
1920
"trpc.group/trpc-go/trpc-agent-go/agent"
20-
agentevent "trpc.group/trpc-go/trpc-agent-go/event"
21-
eventpkg "trpc.group/trpc-go/trpc-agent-go/event"
21+
"trpc.group/trpc-go/trpc-agent-go/event"
2222
"trpc.group/trpc-go/trpc-agent-go/model"
2323
"trpc.group/trpc-go/trpc-agent-go/server/agui/adapter"
2424
"trpc.group/trpc-go/trpc-agent-go/server/agui/translator"
@@ -58,7 +58,7 @@ func TestMessagesSnapshotRequiresSessionService(t *testing.T) {
5858
}
5959

6060
func TestMessagesSnapshotHappyPath(t *testing.T) {
61-
events := []eventpkg.Event{
61+
events := []event.Event{
6262
newResponse(model.RoleUser, "hello", nil),
6363
newResponse(model.RoleSystem, "system", nil),
6464
newResponse(model.RoleAssistant, "reply", func(m *model.Message) {
@@ -111,7 +111,7 @@ func TestMessagesSnapshotHappyPath(t *testing.T) {
111111
}
112112

113113
func TestMessagesSnapshotUnknownRole(t *testing.T) {
114-
events := []eventpkg.Event{
114+
events := []event.Event{
115115
newResponse(model.RoleUser, "hello", nil),
116116
newResponse(model.Role("unknown"), "?", nil),
117117
}
@@ -173,7 +173,7 @@ func TestMessagesSnapshotUserIDResolverError(t *testing.T) {
173173
userIDResolver: userIDResolver,
174174
runAgentInputHook: NewOptions().RunAgentInputHook,
175175
appName: "demo",
176-
sessionService: &testSessionService{events: []eventpkg.Event{newResponse(model.RoleUser, "hello", nil)}},
176+
sessionService: &testSessionService{events: []event.Event{newResponse(model.RoleUser, "hello", nil)}},
177177
}
178178

179179
stream, err := r.MessagesSnapshot(
@@ -252,8 +252,8 @@ func TestMessagesSnapshotRunAgentInputHookError(t *testing.T) {
252252
type noopBaseRunner struct{}
253253

254254
func (noopBaseRunner) Run(ctx context.Context, userID string, sessionID string, message model.Message,
255-
_ ...agent.RunOption) (<-chan *agentevent.Event, error) {
256-
ch := make(chan *agentevent.Event)
255+
_ ...agent.RunOption) (<-chan *event.Event, error) {
256+
ch := make(chan *event.Event)
257257
close(ch)
258258
return ch, nil
259259
}
@@ -271,7 +271,7 @@ func TestGetSessionEventsNilSession(t *testing.T) {
271271
// TestConvertToMessagesSnapshotEventSkipsNilResponse ensures nil response events are ignored.
272272
func TestConvertToMessagesSnapshotEventSkipsNilResponse(t *testing.T) {
273273
r := &runner{}
274-
snapshot, err := r.convertToMessagesSnapshotEvent(context.Background(), "user-id", []eventpkg.Event{{}})
274+
snapshot, err := r.convertToMessagesSnapshotEvent(context.Background(), "user-id", []event.Event{{}})
275275
assert.NoError(t, err)
276276
assert.NotNil(t, snapshot)
277277
assert.Len(t, snapshot.Messages, 0)
@@ -280,7 +280,7 @@ func TestConvertToMessagesSnapshotEventSkipsNilResponse(t *testing.T) {
280280
func TestConvertToMessagesSnapshotEventIncludesUserIDName(t *testing.T) {
281281
r := &runner{}
282282
snapshot, err := r.convertToMessagesSnapshotEvent(context.Background(),
283-
"user-id", []eventpkg.Event{newResponse(model.RoleUser, "hello", nil)})
283+
"user-id", []event.Event{newResponse(model.RoleUser, "hello", nil)})
284284
assert.NoError(t, err)
285285
assert.NotNil(t, snapshot)
286286
assert.Len(t, snapshot.Messages, 1)
@@ -291,7 +291,7 @@ func TestConvertToMessagesSnapshotEventIncludesUserIDName(t *testing.T) {
291291
// TestConvertToMessagesSnapshotEventBeforeCallbackMutates ensures before callbacks update messages.
292292
func TestConvertToMessagesSnapshotEventBeforeCallbackMutates(t *testing.T) {
293293
callbacks := translator.NewCallbacks().
294-
RegisterBeforeTranslate(func(ctx context.Context, evt *eventpkg.Event) (*eventpkg.Event, error) {
294+
RegisterBeforeTranslate(func(ctx context.Context, evt *event.Event) (*event.Event, error) {
295295
if evt.Response != nil && len(evt.Response.Choices) > 0 {
296296
evt.Response.Choices[0].Message.Content = "patched"
297297
}
@@ -301,29 +301,196 @@ func TestConvertToMessagesSnapshotEventBeforeCallbackMutates(t *testing.T) {
301301
translateCallbacks: callbacks,
302302
}
303303
snapshot, err := r.convertToMessagesSnapshotEvent(context.Background(),
304-
"user-id", []eventpkg.Event{newResponse(model.RoleUser, "hello", nil)})
304+
"user-id", []event.Event{newResponse(model.RoleUser, "hello", nil)})
305305
assert.NoError(t, err)
306306
assert.NotNil(t, snapshot)
307307
assert.Len(t, snapshot.Messages, 1)
308308
assert.Equal(t, "patched", *snapshot.Messages[0].Content)
309309
}
310310

311+
func TestConvertToMessagesSnapshotEventDeduplicatesUserMessages(t *testing.T) {
312+
r := &runner{}
313+
sharedRequestID := "req-shared"
314+
events := []event.Event{
315+
newResponseWithRequestID(model.RoleUser, "hello", sharedRequestID, nil),
316+
newResponseWithRequestID(model.RoleUser, "hello again", sharedRequestID, nil),
317+
newResponseWithRequestID(model.RoleUser, "next", "req-next", nil),
318+
}
319+
snapshot, err := r.convertToMessagesSnapshotEvent(context.Background(), "user-id", events)
320+
assert.NoError(t, err)
321+
assert.NotNil(t, snapshot)
322+
assert.Len(t, snapshot.Messages, 2)
323+
assert.Equal(t, "hello", *snapshot.Messages[0].Content)
324+
assert.Equal(t, "next", *snapshot.Messages[1].Content)
325+
}
326+
327+
func TestIgnoreEvent(t *testing.T) {
328+
tests := []struct {
329+
name string
330+
evt *event.Event
331+
want bool
332+
}{
333+
{
334+
name: "nil event",
335+
evt: nil,
336+
want: true,
337+
},
338+
{
339+
name: "nil response",
340+
evt: &event.Event{Response: nil},
341+
want: true,
342+
},
343+
{
344+
name: "nil choices",
345+
evt: &event.Event{Response: &model.Response{Choices: nil}},
346+
want: true,
347+
},
348+
{
349+
name: model.ObjectTypeChatCompletion,
350+
evt: &event.Event{Response: &model.Response{
351+
Object: model.ObjectTypeChatCompletion,
352+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleUser, Content: "hello"}}},
353+
}},
354+
want: false,
355+
},
356+
{
357+
name: model.ObjectTypeToolResponse,
358+
evt: &event.Event{Response: &model.Response{
359+
Object: model.ObjectTypeToolResponse,
360+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleTool, Content: "hello"}}},
361+
}},
362+
want: false,
363+
},
364+
{
365+
name: "",
366+
evt: &event.Event{Response: &model.Response{
367+
Object: "",
368+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleUser, Content: "hello"}}},
369+
}},
370+
want: false,
371+
},
372+
{
373+
name: model.ObjectTypeError,
374+
evt: &event.Event{Response: &model.Response{
375+
Object: model.ObjectTypeError,
376+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
377+
}},
378+
want: true,
379+
},
380+
{
381+
name: model.ObjectTypePreprocessingBasic,
382+
evt: &event.Event{Response: &model.Response{
383+
Object: model.ObjectTypePreprocessingBasic,
384+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
385+
}},
386+
want: true,
387+
},
388+
{
389+
name: model.ObjectTypePreprocessingContent,
390+
evt: &event.Event{Response: &model.Response{
391+
Object: model.ObjectTypePreprocessingContent,
392+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
393+
}},
394+
want: true,
395+
},
396+
{
397+
name: model.ObjectTypePreprocessingIdentity,
398+
evt: &event.Event{Response: &model.Response{
399+
Object: model.ObjectTypePreprocessingIdentity,
400+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
401+
}},
402+
want: true,
403+
},
404+
{
405+
name: model.ObjectTypePreprocessingInstruction,
406+
evt: &event.Event{Response: &model.Response{
407+
Object: model.ObjectTypePreprocessingInstruction,
408+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
409+
}},
410+
want: true,
411+
},
412+
{
413+
name: model.ObjectTypePreprocessingPlanning,
414+
evt: &event.Event{Response: &model.Response{
415+
Object: model.ObjectTypePreprocessingPlanning,
416+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
417+
}},
418+
want: true,
419+
},
420+
{
421+
name: model.ObjectTypePostprocessingPlanning,
422+
evt: &event.Event{Response: &model.Response{
423+
Object: model.ObjectTypePostprocessingPlanning,
424+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
425+
}},
426+
want: true,
427+
},
428+
{
429+
name: model.ObjectTypePostprocessingCodeExecution,
430+
evt: &event.Event{Response: &model.Response{
431+
Object: model.ObjectTypePostprocessingCodeExecution,
432+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
433+
}},
434+
want: true,
435+
},
436+
{
437+
name: model.ObjectTypeTransfer,
438+
evt: &event.Event{Response: &model.Response{
439+
Object: model.ObjectTypeTransfer,
440+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
441+
}},
442+
want: true,
443+
},
444+
{
445+
name: model.ObjectTypeRunnerCompletion,
446+
evt: &event.Event{Response: &model.Response{
447+
Object: model.ObjectTypeRunnerCompletion,
448+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
449+
}},
450+
want: true,
451+
},
452+
{
453+
name: model.ObjectTypeStateUpdate,
454+
evt: &event.Event{Response: &model.Response{
455+
Object: model.ObjectTypeStateUpdate,
456+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
457+
}},
458+
want: true,
459+
},
460+
{
461+
name: model.ObjectTypeChatCompletionChunk,
462+
evt: &event.Event{Response: &model.Response{
463+
Object: model.ObjectTypeChatCompletionChunk,
464+
Choices: []model.Choice{{Message: model.Message{Role: model.RoleAssistant, Content: "hello"}}},
465+
}},
466+
want: true,
467+
},
468+
}
469+
for _, test := range tests {
470+
t.Run(test.name, func(t *testing.T) {
471+
r := &runner{}
472+
got := r.ignoreEvent(test.evt)
473+
assert.Equal(t, test.want, got)
474+
})
475+
}
476+
}
477+
311478
// TestConvertToMessagesSnapshotEventBeforeCallbackError ensures errors bubble up.
312479
func TestConvertToMessagesSnapshotEventBeforeCallbackError(t *testing.T) {
313480
callbacks := translator.NewCallbacks().
314-
RegisterBeforeTranslate(func(ctx context.Context, evt *eventpkg.Event) (*eventpkg.Event, error) {
481+
RegisterBeforeTranslate(func(ctx context.Context, evt *event.Event) (*event.Event, error) {
315482
return nil, errors.New("fail")
316483
})
317484
r := &runner{
318485
translateCallbacks: callbacks,
319486
}
320487
snapshot, err := r.convertToMessagesSnapshotEvent(context.Background(),
321-
"user-id", []eventpkg.Event{newResponse(model.RoleUser, "hello", nil)})
488+
"user-id", []event.Event{newResponse(model.RoleUser, "hello", nil)})
322489
assert.Nil(t, snapshot)
323490
assert.Error(t, err)
324491
}
325492

326-
func newResponse(role model.Role, content string, mutate func(*model.Message)) eventpkg.Event {
493+
func newResponse(role model.Role, content string, mutate func(*model.Message)) event.Event {
327494
msg := model.Message{Role: role, Content: content}
328495
if mutate != nil {
329496
mutate(&msg)
@@ -332,12 +499,20 @@ func newResponse(role model.Role, content string, mutate func(*model.Message)) e
332499
ID: "id-" + string(role) + content,
333500
Choices: []model.Choice{{Message: msg}},
334501
}
335-
evt := agentevent.NewResponseEvent("invocation", string(role), resp)
502+
evt := event.NewResponseEvent("invocation", string(role), resp)
503+
evt.RequestID = uuid.NewString()
336504
return *evt
337505
}
338506

507+
func newResponseWithRequestID(role model.Role, content, requestID string,
508+
mutate func(*model.Message)) event.Event {
509+
evt := newResponse(role, content, mutate)
510+
evt.RequestID = requestID
511+
return evt
512+
}
513+
339514
type testSessionService struct {
340-
events []eventpkg.Event
515+
events []event.Event
341516
getErr error
342517
returnNil bool
343518
}
@@ -357,7 +532,7 @@ func (s *testSessionService) GetSession(ctx context.Context, key session.Key,
357532
}
358533
sess := &session.Session{AppName: key.AppName, UserID: key.UserID, ID: key.SessionID}
359534
if len(s.events) > 0 {
360-
sess.Events = append([]eventpkg.Event(nil), s.events...)
535+
sess.Events = append([]event.Event(nil), s.events...)
361536
}
362537
return sess, nil
363538
}
@@ -395,7 +570,7 @@ func (s *testSessionService) DeleteUserState(ctx context.Context, key session.Us
395570
return nil
396571
}
397572

398-
func (s *testSessionService) AppendEvent(ctx context.Context, sess *session.Session, evt *eventpkg.Event,
573+
func (s *testSessionService) AppendEvent(ctx context.Context, sess *session.Session, evt *event.Event,
399574
opts ...session.Option) error {
400575
return nil
401576
}

0 commit comments

Comments
 (0)