Skip to content

Commit 5f375fb

Browse files
committed
feat: activity message
1 parent c27826b commit 5f375fb

File tree

10 files changed

+1107
-13
lines changed

10 files changed

+1107
-13
lines changed

sdks/community/go/pkg/core/events/a.log

Lines changed: 374 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package events
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
)
7+
8+
// ActivitySnapshotEvent contains a snapshot of an activity message.
9+
type ActivitySnapshotEvent struct {
10+
*BaseEvent
11+
MessageID string `json:"messageId"`
12+
ActivityType string `json:"activityType"`
13+
Content any `json:"content"`
14+
Replace *bool `json:"replace,omitempty"`
15+
}
16+
17+
// NewActivitySnapshotEvent creates a new activity snapshot event.
18+
func NewActivitySnapshotEvent(messageID, activityType string, content any) *ActivitySnapshotEvent {
19+
replace := true
20+
return &ActivitySnapshotEvent{
21+
BaseEvent: NewBaseEvent(EventTypeActivitySnapshot),
22+
MessageID: messageID,
23+
ActivityType: activityType,
24+
Content: content,
25+
Replace: &replace,
26+
}
27+
}
28+
29+
// WithReplace sets the replace flag for the snapshot event.
30+
func (e *ActivitySnapshotEvent) WithReplace(replace bool) *ActivitySnapshotEvent {
31+
e.Replace = &replace
32+
return e
33+
}
34+
35+
// Validate validates the activity snapshot event.
36+
func (e *ActivitySnapshotEvent) Validate() error {
37+
if err := e.BaseEvent.Validate(); err != nil {
38+
return err
39+
}
40+
41+
if e.MessageID == "" {
42+
return fmt.Errorf("ActivitySnapshotEvent validation failed: messageId field is required")
43+
}
44+
45+
if e.ActivityType == "" {
46+
return fmt.Errorf("ActivitySnapshotEvent validation failed: activityType field is required")
47+
}
48+
49+
if e.Content == nil {
50+
return fmt.Errorf("ActivitySnapshotEvent validation failed: content field is required")
51+
}
52+
53+
return nil
54+
}
55+
56+
// ToJSON serializes the event to JSON.
57+
func (e *ActivitySnapshotEvent) ToJSON() ([]byte, error) {
58+
return json.Marshal(e)
59+
}
60+
61+
// ActivityDeltaEvent contains incremental updates for an activity message.
62+
type ActivityDeltaEvent struct {
63+
*BaseEvent
64+
MessageID string `json:"messageId"`
65+
ActivityType string `json:"activityType"`
66+
Patch []JSONPatchOperation `json:"patch"`
67+
}
68+
69+
// NewActivityDeltaEvent creates a new activity delta event.
70+
func NewActivityDeltaEvent(messageID, activityType string, patch []JSONPatchOperation) *ActivityDeltaEvent {
71+
return &ActivityDeltaEvent{
72+
BaseEvent: NewBaseEvent(EventTypeActivityDelta),
73+
MessageID: messageID,
74+
ActivityType: activityType,
75+
Patch: patch,
76+
}
77+
}
78+
79+
// Validate validates the activity delta event.
80+
func (e *ActivityDeltaEvent) Validate() error {
81+
if err := e.BaseEvent.Validate(); err != nil {
82+
return err
83+
}
84+
85+
if e.MessageID == "" {
86+
return fmt.Errorf("ActivityDeltaEvent validation failed: messageId field is required")
87+
}
88+
89+
if e.ActivityType == "" {
90+
return fmt.Errorf("ActivityDeltaEvent validation failed: activityType field is required")
91+
}
92+
93+
if len(e.Patch) == 0 {
94+
return fmt.Errorf("ActivityDeltaEvent validation failed: patch field must contain at least one operation")
95+
}
96+
97+
for i, op := range e.Patch {
98+
if err := validateJSONPatchOperation(op); err != nil {
99+
return fmt.Errorf("ActivityDeltaEvent validation failed: invalid patch operation at index %d: %w", i, err)
100+
}
101+
}
102+
103+
return nil
104+
}
105+
106+
// ToJSON serializes the event to JSON.
107+
func (e *ActivityDeltaEvent) ToJSON() ([]byte, error) {
108+
return json.Marshal(e)
109+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package events
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestActivitySnapshotEventBasics(t *testing.T) {
12+
content := map[string]any{"status": "draft"}
13+
14+
event := NewActivitySnapshotEvent("activity-1", "PLAN", content)
15+
16+
assert.Equal(t, EventTypeActivitySnapshot, event.Type())
17+
assert.Equal(t, "activity-1", event.MessageID)
18+
assert.Equal(t, "PLAN", event.ActivityType)
19+
require.NotNil(t, event.Replace)
20+
assert.True(t, *event.Replace)
21+
assert.NoError(t, event.Validate())
22+
23+
event = event.WithReplace(false)
24+
require.NotNil(t, event.Replace)
25+
assert.False(t, *event.Replace)
26+
}
27+
28+
func TestActivitySnapshotEventValidationAndJSON(t *testing.T) {
29+
event := NewActivitySnapshotEvent("activity-1", "PLAN", map[string]any{"status": "draft"})
30+
31+
data, err := event.ToJSON()
32+
require.NoError(t, err)
33+
34+
var decoded map[string]any
35+
require.NoError(t, json.Unmarshal(data, &decoded))
36+
37+
assert.Equal(t, string(EventTypeActivitySnapshot), decoded["type"])
38+
assert.Equal(t, "activity-1", decoded["messageId"])
39+
assert.Equal(t, "PLAN", decoded["activityType"])
40+
content, ok := decoded["content"].(map[string]any)
41+
require.True(t, ok)
42+
assert.Equal(t, "draft", content["status"])
43+
44+
event.MessageID = ""
45+
assert.Error(t, event.Validate())
46+
47+
event.MessageID = "activity-1"
48+
event.ActivityType = ""
49+
assert.Error(t, event.Validate())
50+
51+
event.ActivityType = "PLAN"
52+
event.Content = nil
53+
assert.Error(t, event.Validate())
54+
55+
event.Content = map[string]any{"status": "draft"}
56+
event.BaseEvent.EventType = ""
57+
assert.Error(t, event.Validate())
58+
}
59+
60+
func TestActivitySnapshotEvent_MissingActivityType(t *testing.T) {
61+
event := NewActivitySnapshotEvent("activity-1", "", map[string]any{"status": "draft"})
62+
err := event.Validate()
63+
assert.Error(t, err)
64+
}
65+
66+
func TestActivityDeltaEventValidationAndJSON(t *testing.T) {
67+
patch := []JSONPatchOperation{{Op: "replace", Path: "/status", Value: "done"}}
68+
event := NewActivityDeltaEvent("activity-1", "PLAN", patch)
69+
70+
assert.Equal(t, EventTypeActivityDelta, event.Type())
71+
assert.NoError(t, event.Validate())
72+
73+
data, err := event.ToJSON()
74+
require.NoError(t, err)
75+
76+
var decoded map[string]any
77+
require.NoError(t, json.Unmarshal(data, &decoded))
78+
79+
assert.Equal(t, string(EventTypeActivityDelta), decoded["type"])
80+
assert.Equal(t, "activity-1", decoded["messageId"])
81+
assert.Equal(t, "PLAN", decoded["activityType"])
82+
items, ok := decoded["patch"].([]any)
83+
require.True(t, ok)
84+
assert.Len(t, items, 1)
85+
86+
event.MessageID = ""
87+
assert.Error(t, event.Validate())
88+
89+
event.MessageID = "activity-1"
90+
event.Patch = []JSONPatchOperation{}
91+
assert.Error(t, event.Validate())
92+
93+
event.Patch = []JSONPatchOperation{{Op: "invalid", Path: "/status"}}
94+
assert.Error(t, event.Validate())
95+
96+
event.Patch = []JSONPatchOperation{{Op: "replace", Path: "/status", Value: "ok"}}
97+
event.ActivityType = ""
98+
assert.Error(t, event.Validate())
99+
100+
event.ActivityType = "PLAN"
101+
event.BaseEvent.EventType = ""
102+
assert.Error(t, event.Validate())
103+
}
104+
105+
func TestActivityDeltaEvent_MissingActivityType(t *testing.T) {
106+
event := NewActivityDeltaEvent("activity-1", "", []JSONPatchOperation{{Op: "replace", Path: "/status", Value: "done"}})
107+
err := event.Validate()
108+
assert.Error(t, err)
109+
}

sdks/community/go/pkg/core/events/decoder.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,20 @@ func (ed *EventDecoder) DecodeEvent(eventName string, data []byte) (Event, error
130130
}
131131
return &evt, nil
132132

133+
case EventTypeActivitySnapshot:
134+
var evt ActivitySnapshotEvent
135+
if err := json.Unmarshal(data, &evt); err != nil {
136+
return nil, fmt.Errorf("failed to decode ACTIVITY_SNAPSHOT: %w", err)
137+
}
138+
return &evt, nil
139+
140+
case EventTypeActivityDelta:
141+
var evt ActivityDeltaEvent
142+
if err := json.Unmarshal(data, &evt); err != nil {
143+
return nil, fmt.Errorf("failed to decode ACTIVITY_DELTA: %w", err)
144+
}
145+
return &evt, nil
146+
133147
case EventTypeStepStarted:
134148
var evt StepStartedEvent
135149
if err := json.Unmarshal(data, &evt); err != nil {

sdks/community/go/pkg/core/events/decoder_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,41 @@ func TestEventDecoder(t *testing.T) {
204204
assert.Equal(t, "msg-1", msgEvent.Messages[0].ID)
205205
})
206206

207+
t.Run("DecodeEvent_ActivitySnapshot", func(t *testing.T) {
208+
decoder := NewEventDecoder(nil)
209+
data := []byte(`{"messageId": "activity-1", "activityType": "PLAN", "content": {"status": "draft"}, "replace": false}`)
210+
211+
event, err := decoder.DecodeEvent("ACTIVITY_SNAPSHOT", data)
212+
require.NoError(t, err)
213+
require.NotNil(t, event)
214+
215+
activityEvent, ok := event.(*ActivitySnapshotEvent)
216+
require.True(t, ok)
217+
assert.Equal(t, "activity-1", activityEvent.MessageID)
218+
assert.Equal(t, "PLAN", activityEvent.ActivityType)
219+
require.NotNil(t, activityEvent.Replace)
220+
assert.False(t, *activityEvent.Replace)
221+
content, ok := activityEvent.Content.(map[string]any)
222+
require.True(t, ok)
223+
assert.Equal(t, "draft", content["status"])
224+
})
225+
226+
t.Run("DecodeEvent_ActivityDelta", func(t *testing.T) {
227+
decoder := NewEventDecoder(nil)
228+
data := []byte(`{"messageId": "activity-1", "activityType": "PLAN", "patch": [{"op": "replace", "path": "/status", "value": "streaming"}]}`)
229+
230+
event, err := decoder.DecodeEvent("ACTIVITY_DELTA", data)
231+
require.NoError(t, err)
232+
require.NotNil(t, event)
233+
234+
activityEvent, ok := event.(*ActivityDeltaEvent)
235+
require.True(t, ok)
236+
assert.Equal(t, "activity-1", activityEvent.MessageID)
237+
assert.Equal(t, "PLAN", activityEvent.ActivityType)
238+
assert.Len(t, activityEvent.Patch, 1)
239+
assert.Equal(t, "replace", activityEvent.Patch[0].Op)
240+
})
241+
207242
t.Run("DecodeEvent_StepStarted", func(t *testing.T) {
208243
decoder := NewEventDecoder(nil)
209244
data := []byte(`{"stepName": "step-1"}`)

sdks/community/go/pkg/core/events/events.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const (
2424
EventTypeStateSnapshot EventType = "STATE_SNAPSHOT"
2525
EventTypeStateDelta EventType = "STATE_DELTA"
2626
EventTypeMessagesSnapshot EventType = "MESSAGES_SNAPSHOT"
27+
EventTypeActivitySnapshot EventType = "ACTIVITY_SNAPSHOT"
28+
EventTypeActivityDelta EventType = "ACTIVITY_DELTA"
2729
EventTypeRaw EventType = "RAW"
2830
EventTypeCustom EventType = "CUSTOM"
2931
EventTypeRunStarted EventType = "RUN_STARTED"
@@ -57,6 +59,8 @@ var validEventTypes = map[EventType]bool{
5759
EventTypeStateSnapshot: true,
5860
EventTypeStateDelta: true,
5961
EventTypeMessagesSnapshot: true,
62+
EventTypeActivitySnapshot: true,
63+
EventTypeActivityDelta: true,
6064
EventTypeRaw: true,
6165
EventTypeCustom: true,
6266
EventTypeRunStarted: true,
@@ -318,6 +322,14 @@ func ValidateSequence(events []Event) error {
318322
// They represent complete message state at any point in time
319323
// Additional validation could be added if needed (e.g., consistency checks)
320324

325+
case EventTypeActivitySnapshot:
326+
// Activity snapshot events are always valid in sequence context
327+
// They represent complete activity state at any point in time
328+
329+
case EventTypeActivityDelta:
330+
// Activity delta events are always valid in sequence context
331+
// They represent incremental activity changes at any point in time
332+
321333
case EventTypeRaw:
322334
// Raw events are always valid in sequence context
323335
// They contain external data that should be passed through
@@ -381,6 +393,10 @@ func EventFromJSON(data []byte) (Event, error) {
381393
event = &StateDeltaEvent{}
382394
case EventTypeMessagesSnapshot:
383395
event = &MessagesSnapshotEvent{}
396+
case EventTypeActivitySnapshot:
397+
event = &ActivitySnapshotEvent{}
398+
case EventTypeActivityDelta:
399+
event = &ActivityDeltaEvent{}
384400
case EventTypeRaw:
385401
event = &RawEvent{}
386402
case EventTypeCustom:

0 commit comments

Comments
 (0)