Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/agui/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ replace (
)

require (
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd
github.com/google/uuid v1.6.0
github.com/sirupsen/logrus v1.9.3
go.opentelemetry.io/otel v1.29.0
Expand Down
4 changes: 2 additions & 2 deletions examples/agui/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5 h1:Nvj2lAHdXqA4CzI6yZbEpLDoWLyN2uJ/LlBfM0VsSrc=
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI=
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd h1:qeat3GmeCc5a3xlpi+Z0EcuM3CqTl0O6ekJ+dAqCqqo=
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI=
github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE=
github.com/bmatcuk/doublestar/v4 v4.9.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
Expand Down
2 changes: 1 addition & 1 deletion server/agui/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.24.4
replace trpc.group/trpc-go/trpc-agent-go => ../../

require (
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd
github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.10.0
trpc.group/trpc-go/trpc-agent-go v0.0.0-20251126064502-c8c2594d2519
Expand Down
4 changes: 2 additions & 2 deletions server/agui/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5 h1:Nvj2lAHdXqA4CzI6yZbEpLDoWLyN2uJ/LlBfM0VsSrc=
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251023014556-5eaa5fba9ad5/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI=
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd h1:qeat3GmeCc5a3xlpi+Z0EcuM3CqTl0O6ekJ+dAqCqqo=
github.com/ag-ui-protocol/ag-ui/sdks/community/go v0.0.0-20251202144511-d0e095fddefd/go.mod h1:ERAMOexUee4AIuoxksuuGoEcHl3aqLwaazjGwlR9ZCI=
github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE=
github.com/bmatcuk/doublestar/v4 v4.9.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
Expand Down
78 changes: 77 additions & 1 deletion server/agui/internal/reduce/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (r *reducer) reduceEvent(evt aguievents.Event) error {
case *aguievents.ToolCallResultEvent:
return r.handleToolResult(e)
default:
return nil
return r.handleActivity(e)
}
}

Expand Down Expand Up @@ -339,6 +339,82 @@ func (r *reducer) handleToolResult(e *aguievents.ToolCallResultEvent) error {
return nil
}

// handleActivity handles the activity event.
func (r *reducer) handleActivity(e aguievents.Event) error {
activity := &aguievents.Message{Role: "activity"}
switch e := e.(type) {
case *aguievents.StepStartedEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"stepName": e.StepName,
}
case *aguievents.StepFinishedEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"stepName": e.StepName,
}
case *aguievents.StateSnapshotEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"snapshot": e.Snapshot,
}
case *aguievents.StateDeltaEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"delta": e.Delta,
}
case *aguievents.MessagesSnapshotEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"messages": e.Messages,
}
case *aguievents.ActivitySnapshotEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"messageId": e.MessageID,
"activityType": e.ActivityType,
"content": e.Content,
"replace": e.Replace,
}
case *aguievents.ActivityDeltaEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"messageId": e.MessageID,
"activityType": e.ActivityType,
"patch": e.Patch,
}
case *aguievents.CustomEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"name": e.Name,
"value": e.Value,
}
case *aguievents.RawEvent:
activity.ID = e.ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"source": e.Source,
"event": e.Event,
}
default:
activity.ID = e.GetBaseEvent().ID()
activity.ActivityType = string(e.Type())
activity.ActivityContent = map[string]any{
"content": e,
}
}
r.messages = append(r.messages, activity)
return nil
}

// finalize finalizes the message snapshots.
func (r *reducer) finalize() error {
for id, state := range r.texts {
Expand Down
167 changes: 154 additions & 13 deletions server/agui/internal/reduce/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package reduce

import (
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -155,17 +156,14 @@ func TestHandleTextChunkSuccess(t *testing.T) {
}{
{
name: "assistant default role empty delta",
chunk: aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-1"),
chunk: aguievents.NewTextMessageChunkEvent(stringPtr("msg-1"), stringPtr("assistant"), stringPtr("")),
wantRole: "assistant",
wantName: testAppName,
wantContent: "",
},
{
name: "user role with delta",
chunk: aguievents.NewTextMessageChunkEvent().
WithChunkMessageID("msg-2").
WithChunkRole("user").
WithChunkDelta("hi"),
name: "user role with delta",
chunk: aguievents.NewTextMessageChunkEvent(stringPtr("msg-2"), stringPtr("user"), stringPtr("hi")),
wantRole: "user",
wantName: testUserID,
wantContent: "hi",
Expand Down Expand Up @@ -210,16 +208,20 @@ func TestHandleTextChunkSuccess(t *testing.T) {
}
}

func stringPtr(s string) *string {
return &s
}

func TestHandleTextChunkErrors(t *testing.T) {
t.Run("missing id", func(t *testing.T) {
chunk := aguievents.NewTextMessageChunkEvent()
chunk := aguievents.NewTextMessageChunkEvent(stringPtr(""), stringPtr("assistant"), stringPtr(""))
r := new(testAppName, testUserID)
if err := r.handleTextChunk(chunk); err == nil || !strings.Contains(err.Error(), "text message chunk missing id") {
t.Fatalf("unexpected error %v", err)
}
})
t.Run("duplicate id", func(t *testing.T) {
chunk := aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-1")
chunk := aguievents.NewTextMessageChunkEvent(stringPtr("msg-1"), stringPtr("assistant"), stringPtr(""))
r := new(testAppName, testUserID)
if err := r.handleTextChunk(chunk); err != nil {
t.Fatalf("handleTextChunk err: %v", err)
Expand All @@ -229,14 +231,14 @@ func TestHandleTextChunkErrors(t *testing.T) {
}
})
t.Run("unsupported role", func(t *testing.T) {
chunk := aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-3").WithChunkRole("tool")
chunk := aguievents.NewTextMessageChunkEvent(stringPtr("msg-3"), stringPtr("tool"), stringPtr(""))
r := new(testAppName, testUserID)
if err := r.handleTextChunk(chunk); err == nil || !strings.Contains(err.Error(), "unsupported role: tool") {
t.Fatalf("unexpected error %v", err)
}
})
t.Run("empty string id pointer", func(t *testing.T) {
chunk := aguievents.NewTextMessageChunkEvent()
chunk := aguievents.NewTextMessageChunkEvent(stringPtr(""), stringPtr("assistant"), stringPtr(""))
empty := ""
chunk.MessageID = &empty
r := new(testAppName, testUserID)
Expand All @@ -248,7 +250,7 @@ func TestHandleTextChunkErrors(t *testing.T) {

func TestReduceEventDispatchesChunk(t *testing.T) {
r := new(testAppName, testUserID)
chunk := aguievents.NewTextMessageChunkEvent().WithChunkMessageID("msg-1").WithChunkDelta("hi")
chunk := aguievents.NewTextMessageChunkEvent(stringPtr("msg-1"), stringPtr("assistant"), stringPtr("hi"))
if err := r.reduceEvent(chunk); err != nil {
t.Fatalf("reduceEvent err: %v", err)
}
Expand Down Expand Up @@ -701,8 +703,147 @@ func TestReduceIgnoresUnknownEvents(t *testing.T) {
if err != nil {
t.Fatalf("Reduce err: %v", err)
}
if len(msgs) != 0 {
t.Fatalf("expected no messages, got %d", len(msgs))
if len(msgs) != 1 {
t.Fatalf("expected 1 message, got %d", len(msgs))
}
}

func TestHandleActivityAllCases(t *testing.T) {
stepStarted := aguievents.NewStepStartedEvent("prep")
stepFinished := aguievents.NewStepFinishedEvent("cleanup")
stateSnapshot := aguievents.NewStateSnapshotEvent(map[string]any{"status": "ok"})
stateDeltaOps := []aguievents.JSONPatchOperation{{Op: "add", Path: "/count", Value: 1}}
stateDelta := aguievents.NewStateDeltaEvent(stateDeltaOps)
messageSnapshotEvent := aguievents.NewMessagesSnapshotEvent([]aguievents.Message{
{ID: "msg-1", Role: "assistant"},
})
activitySnapshot := aguievents.NewActivitySnapshotEvent("activity-1", "PLAN", map[string]any{"status": "draft"}).WithReplace(false)
activityDeltaOps := []aguievents.JSONPatchOperation{{Op: "replace", Path: "/status", Value: "done"}}
activityDelta := aguievents.NewActivityDeltaEvent("activity-2", "PLAN", activityDeltaOps)
customEvent := aguievents.NewCustomEvent("custom-event", aguievents.WithValue(map[string]any{"k": "v"}))
rawEvent := aguievents.NewRawEvent(map[string]any{"raw": true}, aguievents.WithSource("unit-test"))
runStarted := aguievents.NewRunStartedEvent("thread-1", "run-1")

tests := []struct {
name string
event aguievents.Event
wantID string
wantType string
wantContent map[string]any
}{
{
name: "step started",
event: stepStarted,
wantID: stepStarted.ID(),
wantType: string(stepStarted.Type()),
wantContent: map[string]any{"stepName": stepStarted.StepName},
},
{
name: "step finished",
event: stepFinished,
wantID: stepFinished.ID(),
wantType: string(stepFinished.Type()),
wantContent: map[string]any{"stepName": stepFinished.StepName},
},
{
name: "state snapshot",
event: stateSnapshot,
wantID: stateSnapshot.ID(),
wantType: string(stateSnapshot.Type()),
wantContent: map[string]any{"snapshot": stateSnapshot.Snapshot},
},
{
name: "state delta",
event: stateDelta,
wantID: stateDelta.ID(),
wantType: string(stateDelta.Type()),
wantContent: map[string]any{"delta": stateDelta.Delta},
},
{
name: "messages snapshot",
event: messageSnapshotEvent,
wantID: messageSnapshotEvent.ID(),
wantType: string(messageSnapshotEvent.Type()),
wantContent: map[string]any{"messages": messageSnapshotEvent.Messages},
},
{
name: "activity snapshot",
event: activitySnapshot,
wantID: activitySnapshot.ID(),
wantType: string(activitySnapshot.Type()),
wantContent: map[string]any{
"messageId": activitySnapshot.MessageID,
"activityType": activitySnapshot.ActivityType,
"content": activitySnapshot.Content,
"replace": activitySnapshot.Replace,
},
},
{
name: "activity delta",
event: activityDelta,
wantID: activityDelta.ID(),
wantType: string(activityDelta.Type()),
wantContent: map[string]any{
"messageId": activityDelta.MessageID,
"activityType": activityDelta.ActivityType,
"patch": activityDelta.Patch,
},
},
{
name: "custom event",
event: customEvent,
wantID: customEvent.ID(),
wantType: string(customEvent.Type()),
wantContent: map[string]any{
"name": customEvent.Name,
"value": customEvent.Value,
},
},
{
name: "raw event",
event: rawEvent,
wantID: rawEvent.ID(),
wantType: string(rawEvent.Type()),
wantContent: map[string]any{
"source": rawEvent.Source,
"event": rawEvent.Event,
},
},
{
name: "default passthrough",
event: runStarted,
wantID: runStarted.GetBaseEvent().ID(),
wantType: string(runStarted.Type()),
wantContent: map[string]any{"content": runStarted},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := new(testAppName, testUserID)
if err := r.handleActivity(tt.event); err != nil {
t.Fatalf("handleActivity err: %v", err)
}
if len(r.messages) != 1 {
t.Fatalf("expected 1 activity message, got %d", len(r.messages))
}
msg := r.messages[0]
if msg.Role != "activity" {
t.Fatalf("unexpected role %q", msg.Role)
}
if msg.ID != tt.wantID {
t.Fatalf("unexpected id %q", msg.ID)
}
if msg.ActivityType != tt.wantType {
t.Fatalf("unexpected activity type %q", msg.ActivityType)
}
if msg.Content != nil {
t.Fatalf("expected nil text content, got %v", msg.Content)
}
if !reflect.DeepEqual(msg.ActivityContent, tt.wantContent) {
t.Fatalf("unexpected activity content %+v", msg.ActivityContent)
}
})
}
}

Expand Down
Loading