From ef37a57ca37261628e646f538be9aefbb7d8e64b Mon Sep 17 00:00:00 2001 From: hackerli Date: Fri, 28 Nov 2025 09:48:38 +0800 Subject: [PATCH 1/4] feat --- server/agui/go.mod | 2 + server/agui/go.sum | 2 - server/agui/internal/reduce/reduce.go | 78 ++++++++++++++++++++++++++- 3 files changed, 79 insertions(+), 3 deletions(-) diff --git a/server/agui/go.mod b/server/agui/go.mod index 61dab02d9..7822bae9b 100644 --- a/server/agui/go.mod +++ b/server/agui/go.mod @@ -4,6 +4,8 @@ go 1.24.4 replace trpc.group/trpc-go/trpc-agent-go => ../../ +replace github.com/ag-ui-protocol/ag-ui/sdks/community/go => ../../../ag-ui/sdks/community/go + require ( github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5 github.com/google/uuid v1.6.0 diff --git a/server/agui/go.sum b/server/agui/go.sum index 40afce3a7..81040ec48 100644 --- a/server/agui/go.sum +++ b/server/agui/go.sum @@ -1,5 +1,3 @@ -github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5 h1:Nvj2lAHdXqA4CzI6yZbEpLDoWLyN2uJ/LlBfM0VsSrc= -github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI= github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE= github.com/bmatcuk/doublestar/v4 v4.9.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= diff --git a/server/agui/internal/reduce/reduce.go b/server/agui/internal/reduce/reduce.go index ec64575cd..098cb9dc3 100644 --- a/server/agui/internal/reduce/reduce.go +++ b/server/agui/internal/reduce/reduce.go @@ -117,7 +117,7 @@ func (r *reducer) reduce(trackEvent session.TrackEvent) error { case *aguievents.ToolCallResultEvent: return r.handleToolResult(e) default: - return nil + return r.handleActivity(e) } } @@ -284,6 +284,82 @@ func (r *reducer) handleToolResult(e *aguievents.ToolCallResultEvent) error { return nil } +// handleActivity handles the activity event. +func (r *reducer) handleActivity(e aguievents.Event) error { + activity := &aguievents.Message{Role: "activity"} + switch e := e.(type) { + case *aguievents.StepStartedEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "stepName": e.StepName, + } + case *aguievents.StepFinishedEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "stepName": e.StepName, + } + case *aguievents.StateSnapshotEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "snapshot": e.Snapshot, + } + case *aguievents.StateDeltaEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "delta": e.Delta, + } + case *aguievents.MessagesSnapshotEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "messages": e.Messages, + } + case *aguievents.ActivitySnapshotEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "messageId": e.MessageID, + "activityType": e.ActivityType, + "content": e.Content, + "replace": e.Replace, + } + case *aguievents.ActivityDeltaEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "messageId": e.MessageID, + "activityType": e.ActivityType, + "patch": e.Patch, + } + case *aguievents.CustomEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "name": e.Name, + "value": e.Value, + } + case *aguievents.RawEvent: + activity.ID = e.ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "source": e.Source, + "event": e.Event, + } + default: + activity.ID = e.GetBaseEvent().ID() + activity.ActivityType = string(e.Type()) + activity.ActivityContent = map[string]any{ + "content": e, + } + } + r.messages = append(r.messages, activity) + return nil +} + // finalize finalizes the message snapshots. func (r *reducer) finalize() error { for id, state := range r.texts { From 8088264ffd99862e705bbe62a0fbefdcbf6c4190 Mon Sep 17 00:00:00 2001 From: Flash-LHR <2479082762@qq.com> Date: Tue, 2 Dec 2025 22:55:26 +0800 Subject: [PATCH 2/4] chore --- examples/agui/go.mod | 2 +- examples/agui/go.sum | 4 ++-- server/agui/go.mod | 4 +--- server/agui/go.sum | 2 ++ 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/agui/go.mod b/examples/agui/go.mod index e9e4cde23..050a474b7 100644 --- a/examples/agui/go.mod +++ b/examples/agui/go.mod @@ -8,7 +8,7 @@ replace ( ) require ( - github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5 + github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd github.com/google/uuid v1.6.0 github.com/sirupsen/logrus v1.9.3 go.opentelemetry.io/otel v1.29.0 diff --git a/examples/agui/go.sum b/examples/agui/go.sum index c207ce798..8d4d559c2 100644 --- a/examples/agui/go.sum +++ b/examples/agui/go.sum @@ -1,5 +1,5 @@ -github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5 h1:Nvj2lAHdXqA4CzI6yZbEpLDoWLyN2uJ/LlBfM0VsSrc= -github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI= +github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd h1:qeat3GmeCc5a3xlpi+Z0EcuM3CqTl0O6ekJ+dAqCqqo= +github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI= github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE= github.com/bmatcuk/doublestar/v4 v4.9.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= diff --git a/server/agui/go.mod b/server/agui/go.mod index 7822bae9b..8462b3945 100644 --- a/server/agui/go.mod +++ b/server/agui/go.mod @@ -4,10 +4,8 @@ go 1.24.4 replace trpc.group/trpc-go/trpc-agent-go => ../../ -replace github.com/ag-ui-protocol/ag-ui/sdks/community/go => ../../../ag-ui/sdks/community/go - require ( - github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5 + github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.10.0 trpc.group/trpc-go/trpc-agent-go v0.0.0-20251126064502-c8c2594d2519 diff --git a/server/agui/go.sum b/server/agui/go.sum index 81040ec48..4eeff14d2 100644 --- a/server/agui/go.sum +++ b/server/agui/go.sum @@ -1,3 +1,5 @@ +github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd h1:qeat3GmeCc5a3xlpi+Z0EcuM3CqTl0O6ekJ+dAqCqqo= +github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI= github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE= github.com/bmatcuk/doublestar/v4 v4.9.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= From 137d39762528a73bb77c69597cffc392234d8460 Mon Sep 17 00:00:00 2001 From: Flash-LHR <2479082762@qq.com> Date: Tue, 2 Dec 2025 23:03:05 +0800 Subject: [PATCH 3/4] test --- server/agui/internal/reduce/reduce_test.go | 27 +++++++++++----------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/server/agui/internal/reduce/reduce_test.go b/server/agui/internal/reduce/reduce_test.go index 7eb1d58d7..f6f4343a5 100644 --- a/server/agui/internal/reduce/reduce_test.go +++ b/server/agui/internal/reduce/reduce_test.go @@ -155,17 +155,14 @@ func TestHandleTextChunkSuccess(t *testing.T) { }{ { name: "assistant default role empty delta", - chunk: aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-1"), + chunk: aguievents.NewTextMessageChunkEvent(stringPtr("msg-1"), stringPtr("assistant"), stringPtr("")), wantRole: "assistant", wantName: testAppName, wantContent: "", }, { - name: "user role with delta", - chunk: aguievents.NewTextMessageChunkEvent(). - WithChunkMessageID("msg-2"). - WithChunkRole("user"). - WithChunkDelta("hi"), + name: "user role with delta", + chunk: aguievents.NewTextMessageChunkEvent(stringPtr("msg-2"), stringPtr("user"), stringPtr("hi")), wantRole: "user", wantName: testUserID, wantContent: "hi", @@ -210,16 +207,20 @@ func TestHandleTextChunkSuccess(t *testing.T) { } } +func stringPtr(s string) *string { + return &s +} + func TestHandleTextChunkErrors(t *testing.T) { t.Run("missing id", func(t *testing.T) { - chunk := aguievents.NewTextMessageChunkEvent() + chunk := aguievents.NewTextMessageChunkEvent(stringPtr(""), stringPtr("assistant"), stringPtr("")) r := new(testAppName, testUserID) if err := r.handleTextChunk(chunk); err == nil || !strings.Contains(err.Error(), "text message chunk missing id") { t.Fatalf("unexpected error %v", err) } }) t.Run("duplicate id", func(t *testing.T) { - chunk := aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-1") + chunk := aguievents.NewTextMessageChunkEvent(stringPtr("msg-1"), stringPtr("assistant"), stringPtr("")) r := new(testAppName, testUserID) if err := r.handleTextChunk(chunk); err != nil { t.Fatalf("handleTextChunk err: %v", err) @@ -229,14 +230,14 @@ func TestHandleTextChunkErrors(t *testing.T) { } }) t.Run("unsupported role", func(t *testing.T) { - chunk := aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-3").WithChunkRole("tool") + chunk := aguievents.NewTextMessageChunkEvent(stringPtr("msg-3"), stringPtr("tool"), stringPtr("")) r := new(testAppName, testUserID) if err := r.handleTextChunk(chunk); err == nil || !strings.Contains(err.Error(), "unsupported role: tool") { t.Fatalf("unexpected error %v", err) } }) t.Run("empty string id pointer", func(t *testing.T) { - chunk := aguievents.NewTextMessageChunkEvent() + chunk := aguievents.NewTextMessageChunkEvent(stringPtr(""), stringPtr("assistant"), stringPtr("")) empty := "" chunk.MessageID = &empty r := new(testAppName, testUserID) @@ -248,7 +249,7 @@ func TestHandleTextChunkErrors(t *testing.T) { func TestReduceEventDispatchesChunk(t *testing.T) { r := new(testAppName, testUserID) - chunk := aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-1").WithChunkDelta("hi") + chunk := aguievents.NewTextMessageChunkEvent(stringPtr("msg-1"), stringPtr("assistant"), stringPtr("hi")) if err := r.reduceEvent(chunk); err != nil { t.Fatalf("reduceEvent err: %v", err) } @@ -701,8 +702,8 @@ func TestReduceIgnoresUnknownEvents(t *testing.T) { if err != nil { t.Fatalf("Reduce err: %v", err) } - if len(msgs) != 0 { - t.Fatalf("expected no messages, got %d", len(msgs)) + if len(msgs) != 1 { + t.Fatalf("expected 1 message, got %d", len(msgs)) } } From 13e1adbed7bb762299890a74b3dbb636728365b2 Mon Sep 17 00:00:00 2001 From: Flash-LHR <2479082762@qq.com> Date: Tue, 2 Dec 2025 23:24:47 +0800 Subject: [PATCH 4/4] test --- server/agui/internal/reduce/reduce_test.go | 140 +++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/server/agui/internal/reduce/reduce_test.go b/server/agui/internal/reduce/reduce_test.go index f6f4343a5..c357aeba3 100644 --- a/server/agui/internal/reduce/reduce_test.go +++ b/server/agui/internal/reduce/reduce_test.go @@ -10,6 +10,7 @@ package reduce import ( + "reflect" "strings" "testing" "time" @@ -707,6 +708,145 @@ func TestReduceIgnoresUnknownEvents(t *testing.T) { } } +func TestHandleActivityAllCases(t *testing.T) { + stepStarted := aguievents.NewStepStartedEvent("prep") + stepFinished := aguievents.NewStepFinishedEvent("cleanup") + stateSnapshot := aguievents.NewStateSnapshotEvent(map[string]any{"status": "ok"}) + stateDeltaOps := []aguievents.JSONPatchOperation{{Op: "add", Path: "/count", Value: 1}} + stateDelta := aguievents.NewStateDeltaEvent(stateDeltaOps) + messageSnapshotEvent := aguievents.NewMessagesSnapshotEvent([]aguievents.Message{ + {ID: "msg-1", Role: "assistant"}, + }) + activitySnapshot := aguievents.NewActivitySnapshotEvent("activity-1", "PLAN", map[string]any{"status": "draft"}).WithReplace(false) + activityDeltaOps := []aguievents.JSONPatchOperation{{Op: "replace", Path: "/status", Value: "done"}} + activityDelta := aguievents.NewActivityDeltaEvent("activity-2", "PLAN", activityDeltaOps) + customEvent := aguievents.NewCustomEvent("custom-event", aguievents.WithValue(map[string]any{"k": "v"})) + rawEvent := aguievents.NewRawEvent(map[string]any{"raw": true}, aguievents.WithSource("unit-test")) + runStarted := aguievents.NewRunStartedEvent("thread-1", "run-1") + + tests := []struct { + name string + event aguievents.Event + wantID string + wantType string + wantContent map[string]any + }{ + { + name: "step started", + event: stepStarted, + wantID: stepStarted.ID(), + wantType: string(stepStarted.Type()), + wantContent: map[string]any{"stepName": stepStarted.StepName}, + }, + { + name: "step finished", + event: stepFinished, + wantID: stepFinished.ID(), + wantType: string(stepFinished.Type()), + wantContent: map[string]any{"stepName": stepFinished.StepName}, + }, + { + name: "state snapshot", + event: stateSnapshot, + wantID: stateSnapshot.ID(), + wantType: string(stateSnapshot.Type()), + wantContent: map[string]any{"snapshot": stateSnapshot.Snapshot}, + }, + { + name: "state delta", + event: stateDelta, + wantID: stateDelta.ID(), + wantType: string(stateDelta.Type()), + wantContent: map[string]any{"delta": stateDelta.Delta}, + }, + { + name: "messages snapshot", + event: messageSnapshotEvent, + wantID: messageSnapshotEvent.ID(), + wantType: string(messageSnapshotEvent.Type()), + wantContent: map[string]any{"messages": messageSnapshotEvent.Messages}, + }, + { + name: "activity snapshot", + event: activitySnapshot, + wantID: activitySnapshot.ID(), + wantType: string(activitySnapshot.Type()), + wantContent: map[string]any{ + "messageId": activitySnapshot.MessageID, + "activityType": activitySnapshot.ActivityType, + "content": activitySnapshot.Content, + "replace": activitySnapshot.Replace, + }, + }, + { + name: "activity delta", + event: activityDelta, + wantID: activityDelta.ID(), + wantType: string(activityDelta.Type()), + wantContent: map[string]any{ + "messageId": activityDelta.MessageID, + "activityType": activityDelta.ActivityType, + "patch": activityDelta.Patch, + }, + }, + { + name: "custom event", + event: customEvent, + wantID: customEvent.ID(), + wantType: string(customEvent.Type()), + wantContent: map[string]any{ + "name": customEvent.Name, + "value": customEvent.Value, + }, + }, + { + name: "raw event", + event: rawEvent, + wantID: rawEvent.ID(), + wantType: string(rawEvent.Type()), + wantContent: map[string]any{ + "source": rawEvent.Source, + "event": rawEvent.Event, + }, + }, + { + name: "default passthrough", + event: runStarted, + wantID: runStarted.GetBaseEvent().ID(), + wantType: string(runStarted.Type()), + wantContent: map[string]any{"content": runStarted}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := new(testAppName, testUserID) + if err := r.handleActivity(tt.event); err != nil { + t.Fatalf("handleActivity err: %v", err) + } + if len(r.messages) != 1 { + t.Fatalf("expected 1 activity message, got %d", len(r.messages)) + } + msg := r.messages[0] + if msg.Role != "activity" { + t.Fatalf("unexpected role %q", msg.Role) + } + if msg.ID != tt.wantID { + t.Fatalf("unexpected id %q", msg.ID) + } + if msg.ActivityType != tt.wantType { + t.Fatalf("unexpected activity type %q", msg.ActivityType) + } + if msg.Content != nil { + t.Fatalf("expected nil text content, got %v", msg.Content) + } + if !reflect.DeepEqual(msg.ActivityContent, tt.wantContent) { + t.Fatalf("unexpected activity content %+v", msg.ActivityContent) + } + }) + } +} + func TestHandleToolEndMissingParent(t *testing.T) { r := new(testAppName, testUserID) r.toolCalls["tool-call-1"] = &toolCallState{