Skip to content

Commit 9506ade

Browse files
kevinelliottclaude
andcommitted
Add bridge connection event and improve event reliability (v0.4.0)
Bridge Connection Event: - Automatically emit bridge.connected event when emitter is initialized - Include system info (OS, OS version, Go version, architecture, AgentPipe version) - Use synchronous sending to ensure connection is announced before proceeding - Add BridgeConnectedData struct with SystemInfo and ConnectedAt fields Cancellation Detection: - Detect context cancellation in orchestrator (Ctrl+C, timeout) - Emit conversation.completed with status="interrupted" when canceled - Track return errors to distinguish normal completion from interruption - Handle both context.Canceled and context.DeadlineExceeded Event Reliability Improvements: - Use synchronous SendEvent for completion and error events - Prevents truncated JSON payloads when program exits - Ensures critical end-of-lifecycle events are fully sent Test Updates: - Update all emitter tests to handle automatic bridge.connected event - Add TestBridgeConnectedEvent and TestBridgeConnectedEventJSON - Add TestBridgeEventOnCancellation for cancellation detection - All tests pass with race detector Code Quality: - Fix gofmt formatting issues - Fix misspelling: "cancelled" → "canceled" - All linting checks pass 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent dc5115d commit 9506ade

File tree

7 files changed

+409
-83
lines changed

7 files changed

+409
-83
lines changed

cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
"github.com/spf13/cobra"
1515
"github.com/spf13/viper"
1616

17-
_ "github.com/kevinelliott/agentpipe/pkg/adapters"
1817
"github.com/kevinelliott/agentpipe/internal/bridge"
1918
"github.com/kevinelliott/agentpipe/internal/version"
19+
_ "github.com/kevinelliott/agentpipe/pkg/adapters"
2020
"github.com/kevinelliott/agentpipe/pkg/agent"
2121
"github.com/kevinelliott/agentpipe/pkg/config"
2222
"github.com/kevinelliott/agentpipe/pkg/conversation"

internal/bridge/emitter.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@ type Emitter struct {
1515
}
1616

1717
// NewEmitter creates a new event emitter for a conversation
18+
// Automatically sends a bridge.connected event to announce the connection
1819
func NewEmitter(config *Config, agentpipeVersion string) *Emitter {
19-
return &Emitter{
20+
emitter := &Emitter{
2021
client: NewClient(config),
2122
conversationID: uuid.New().String(),
2223
sequenceNumber: 0,
2324
systemInfo: CollectSystemInfo(agentpipeVersion),
2425
}
26+
27+
// Emit bridge.connected event to announce the connection
28+
emitter.emitBridgeConnected()
29+
30+
return emitter
2531
}
2632

2733
// GetConversationID returns the conversation ID for this emitter
@@ -88,6 +94,7 @@ func (e *Emitter) EmitMessageCreated(
8894
}
8995

9096
// EmitConversationCompleted emits a conversation.completed event
97+
// Uses synchronous send to ensure the event is fully sent before program exit
9198
func (e *Emitter) EmitConversationCompleted(
9299
status string,
93100
totalMessages int,
@@ -109,10 +116,12 @@ func (e *Emitter) EmitConversationCompleted(
109116
DurationSeconds: duration.Seconds(),
110117
},
111118
}
112-
e.client.SendEventAsync(event)
119+
// Use synchronous send for completion event to ensure it's sent before program exit
120+
_ = e.client.SendEvent(event)
113121
}
114122

115123
// EmitConversationError emits a conversation.error event
124+
// Uses synchronous send to ensure the event is fully sent before program exit
116125
func (e *Emitter) EmitConversationError(
117126
errorMessage string,
118127
errorType string,
@@ -128,5 +137,21 @@ func (e *Emitter) EmitConversationError(
128137
AgentType: agentType,
129138
},
130139
}
131-
e.client.SendEventAsync(event)
140+
// Use synchronous send for error event to ensure it's sent before program exit
141+
_ = e.client.SendEvent(event)
142+
}
143+
144+
// emitBridgeConnected emits a bridge.connected event to announce the connection
145+
// This is called automatically when the emitter is created
146+
func (e *Emitter) emitBridgeConnected() {
147+
event := &Event{
148+
Type: EventBridgeConnected,
149+
Timestamp: UTCTime{time.Now()},
150+
Data: BridgeConnectedData{
151+
SystemInfo: e.systemInfo,
152+
ConnectedAt: time.Now().UTC().Format(time.RFC3339),
153+
},
154+
}
155+
// Use synchronous send to ensure connection is announced before proceeding
156+
_ = e.client.SendEvent(event)
132157
}

internal/bridge/emitter_test.go

Lines changed: 157 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestGetConversationID(t *testing.T) {
7070
}
7171

7272
func TestEmitConversationStarted(t *testing.T) {
73-
receivedEvent := make(chan *Event, 1)
73+
receivedEvents := make(chan *Event, 10)
7474

7575
// Create mock server
7676
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -81,7 +81,7 @@ func TestEmitConversationStarted(t *testing.T) {
8181
return
8282
}
8383

84-
receivedEvent <- &event
84+
receivedEvents <- &event
8585
w.WriteHeader(http.StatusCreated)
8686
}))
8787
defer server.Close()
@@ -108,33 +108,36 @@ func TestEmitConversationStarted(t *testing.T) {
108108

109109
emitter.EmitConversationStarted("round-robin", "Hello agents", 10, agents)
110110

111-
// Wait for async event to be received
112-
select {
113-
case event := <-receivedEvent:
114-
if event.Type != EventConversationStarted {
115-
t.Errorf("Expected type=%s, got %s", EventConversationStarted, event.Type)
116-
}
111+
// Collect both events (bridge.connected and conversation.started)
112+
events := collectEvents(t, receivedEvents, 2)
117113

118-
data, ok := event.Data.(map[string]interface{})
119-
if !ok {
120-
t.Fatal("Expected data to be a map")
121-
}
114+
// First event should be bridge.connected
115+
if events[0].Type != EventBridgeConnected {
116+
t.Errorf("Expected first event type=%s, got %s", EventBridgeConnected, events[0].Type)
117+
}
122118

123-
if data["mode"] != "round-robin" {
124-
t.Errorf("Expected mode=round-robin, got %v", data["mode"])
125-
}
119+
// Second event should be conversation.started
120+
event := events[1]
121+
if event.Type != EventConversationStarted {
122+
t.Errorf("Expected second event type=%s, got %s", EventConversationStarted, event.Type)
123+
}
126124

127-
if data["initial_prompt"] != "Hello agents" {
128-
t.Errorf("Expected initial_prompt='Hello agents', got %v", data["initial_prompt"])
129-
}
125+
data, ok := event.Data.(map[string]interface{})
126+
if !ok {
127+
t.Fatal("Expected data to be a map")
128+
}
130129

131-
// Verify system_info is present
132-
if _, ok := data["system_info"]; !ok {
133-
t.Error("Expected system_info to be present in conversation.started event")
134-
}
130+
if data["mode"] != "round-robin" {
131+
t.Errorf("Expected mode=round-robin, got %v", data["mode"])
132+
}
135133

136-
case <-time.After(1 * time.Second):
137-
t.Fatal("Timeout waiting for event")
134+
if data["initial_prompt"] != "Hello agents" {
135+
t.Errorf("Expected initial_prompt='Hello agents', got %v", data["initial_prompt"])
136+
}
137+
138+
// Verify system_info is present
139+
if _, ok := data["system_info"]; !ok {
140+
t.Error("Expected system_info to be present in conversation.started event")
138141
}
139142
}
140143

@@ -170,11 +173,17 @@ func TestEmitMessageCreated(t *testing.T) {
170173
emitter.EmitMessageCreated("claude", "Claude", "Hello", "claude-sonnet-4", 1, 100, 50, 50, 0.001, 1234*time.Millisecond)
171174
emitter.EmitMessageCreated("gemini", "Gemini", "Hi", "gemini-pro", 1, 80, 40, 40, 0.0008, 987*time.Millisecond)
172175

173-
// Collect both events (they may arrive in any order due to async sending)
174-
events := collectEvents(t, receivedEvents, 2)
176+
// Collect all three events (bridge.connected + two messages)
177+
events := collectEvents(t, receivedEvents, 3)
175178

176-
// Verify both events by sequence number
177-
for _, event := range events {
179+
// First event should be bridge.connected
180+
if events[0].Type != EventBridgeConnected {
181+
t.Errorf("Expected first event type=%s, got %s", EventBridgeConnected, events[0].Type)
182+
}
183+
184+
// Verify the two message events by sequence number
185+
messageEvents := events[1:]
186+
for _, event := range messageEvents {
178187
verifyMessageEvent(t, event)
179188
}
180189
}
@@ -234,7 +243,7 @@ func verifyMessageEvent(t *testing.T, event *Event) {
234243
}
235244

236245
func TestEmitConversationCompleted(t *testing.T) {
237-
receivedEvent := make(chan *Event, 1)
246+
receivedEvents := make(chan *Event, 10)
238247

239248
// Create mock server
240249
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -245,7 +254,7 @@ func TestEmitConversationCompleted(t *testing.T) {
245254
return
246255
}
247256

248-
receivedEvent <- &event
257+
receivedEvents <- &event
249258
w.WriteHeader(http.StatusCreated)
250259
}))
251260
defer server.Close()
@@ -263,37 +272,40 @@ func TestEmitConversationCompleted(t *testing.T) {
263272

264273
emitter.EmitConversationCompleted("completed", 20, 10, 3000, 0.03, 300*time.Second)
265274

266-
// Wait for async event to be received
267-
select {
268-
case event := <-receivedEvent:
269-
if event.Type != EventConversationCompleted {
270-
t.Errorf("Expected type=%s, got %s", EventConversationCompleted, event.Type)
271-
}
275+
// Collect both events (bridge.connected and conversation.completed)
276+
events := collectEvents(t, receivedEvents, 2)
272277

273-
data, ok := event.Data.(map[string]interface{})
274-
if !ok {
275-
t.Fatal("Expected data to be a map")
276-
}
278+
// First event should be bridge.connected
279+
if events[0].Type != EventBridgeConnected {
280+
t.Errorf("Expected first event type=%s, got %s", EventBridgeConnected, events[0].Type)
281+
}
277282

278-
if data["status"] != "completed" {
279-
t.Errorf("Expected status=completed, got %v", data["status"])
280-
}
283+
// Second event should be conversation.completed
284+
event := events[1]
285+
if event.Type != EventConversationCompleted {
286+
t.Errorf("Expected second event type=%s, got %s", EventConversationCompleted, event.Type)
287+
}
281288

282-
if data["total_messages"].(float64) != 20 {
283-
t.Errorf("Expected total_messages=20, got %v", data["total_messages"])
284-
}
289+
data, ok := event.Data.(map[string]interface{})
290+
if !ok {
291+
t.Fatal("Expected data to be a map")
292+
}
285293

286-
if data["duration_seconds"].(float64) != 300.0 {
287-
t.Errorf("Expected duration_seconds=300.0, got %v", data["duration_seconds"])
288-
}
294+
if data["status"] != "completed" {
295+
t.Errorf("Expected status=completed, got %v", data["status"])
296+
}
297+
298+
if data["total_messages"].(float64) != 20 {
299+
t.Errorf("Expected total_messages=20, got %v", data["total_messages"])
300+
}
289301

290-
case <-time.After(1 * time.Second):
291-
t.Fatal("Timeout waiting for event")
302+
if data["duration_seconds"].(float64) != 300.0 {
303+
t.Errorf("Expected duration_seconds=300.0, got %v", data["duration_seconds"])
292304
}
293305
}
294306

295307
func TestEmitConversationError(t *testing.T) {
296-
receivedEvent := make(chan *Event, 1)
308+
receivedEvents := make(chan *Event, 10)
297309

298310
// Create mock server
299311
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -304,7 +316,7 @@ func TestEmitConversationError(t *testing.T) {
304316
return
305317
}
306318

307-
receivedEvent <- &event
319+
receivedEvents <- &event
308320
w.WriteHeader(http.StatusCreated)
309321
}))
310322
defer server.Close()
@@ -322,32 +334,35 @@ func TestEmitConversationError(t *testing.T) {
322334

323335
emitter.EmitConversationError("API rate limit exceeded", "rate_limit", "claude")
324336

325-
// Wait for async event to be received
326-
select {
327-
case event := <-receivedEvent:
328-
if event.Type != EventConversationError {
329-
t.Errorf("Expected type=%s, got %s", EventConversationError, event.Type)
330-
}
337+
// Collect both events (bridge.connected and conversation.error)
338+
events := collectEvents(t, receivedEvents, 2)
331339

332-
data, ok := event.Data.(map[string]interface{})
333-
if !ok {
334-
t.Fatal("Expected data to be a map")
335-
}
340+
// First event should be bridge.connected
341+
if events[0].Type != EventBridgeConnected {
342+
t.Errorf("Expected first event type=%s, got %s", EventBridgeConnected, events[0].Type)
343+
}
336344

337-
if data["error_message"] != "API rate limit exceeded" {
338-
t.Errorf("Expected error_message='API rate limit exceeded', got %v", data["error_message"])
339-
}
345+
// Second event should be conversation.error
346+
event := events[1]
347+
if event.Type != EventConversationError {
348+
t.Errorf("Expected second event type=%s, got %s", EventConversationError, event.Type)
349+
}
340350

341-
if data["error_type"] != "rate_limit" {
342-
t.Errorf("Expected error_type=rate_limit, got %v", data["error_type"])
343-
}
351+
data, ok := event.Data.(map[string]interface{})
352+
if !ok {
353+
t.Fatal("Expected data to be a map")
354+
}
344355

345-
if data["agent_type"] != "claude" {
346-
t.Errorf("Expected agent_type=claude, got %v", data["agent_type"])
347-
}
356+
if data["error_message"] != "API rate limit exceeded" {
357+
t.Errorf("Expected error_message='API rate limit exceeded', got %v", data["error_message"])
358+
}
359+
360+
if data["error_type"] != "rate_limit" {
361+
t.Errorf("Expected error_type=rate_limit, got %v", data["error_type"])
362+
}
348363

349-
case <-time.After(1 * time.Second):
350-
t.Fatal("Timeout waiting for event")
364+
if data["agent_type"] != "claude" {
365+
t.Errorf("Expected agent_type=claude, got %v", data["agent_type"])
351366
}
352367
}
353368

@@ -409,3 +424,71 @@ func TestUniqueConversationIDs(t *testing.T) {
409424
t.Error("Expected unique conversation IDs for different emitters")
410425
}
411426
}
427+
428+
func TestBridgeConnectedEvent(t *testing.T) {
429+
// Track received events
430+
receivedEvents := []Event{}
431+
432+
// Create mock server
433+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
434+
var event Event
435+
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
436+
t.Errorf("Failed to decode event: %v", err)
437+
w.WriteHeader(http.StatusBadRequest)
438+
return
439+
}
440+
441+
receivedEvents = append(receivedEvents, event)
442+
w.WriteHeader(http.StatusCreated)
443+
}))
444+
defer server.Close()
445+
446+
config := &Config{
447+
Enabled: true,
448+
URL: server.URL,
449+
APIKey: "sk_test",
450+
TimeoutMs: 5000,
451+
RetryAttempts: 0,
452+
LogLevel: "debug",
453+
}
454+
455+
// Creating the emitter should automatically send bridge.connected event
456+
version := "0.3.8-test"
457+
_ = NewEmitter(config, version)
458+
459+
// Verify we received the bridge.connected event
460+
if len(receivedEvents) != 1 {
461+
t.Fatalf("Expected 1 event, got %d", len(receivedEvents))
462+
}
463+
464+
event := receivedEvents[0]
465+
if event.Type != EventBridgeConnected {
466+
t.Errorf("Expected event type=%s, got %s", EventBridgeConnected, event.Type)
467+
}
468+
469+
// Verify data structure
470+
dataMap, ok := event.Data.(map[string]interface{})
471+
if !ok {
472+
t.Fatal("Expected data to be a map")
473+
}
474+
475+
// Verify system_info is present
476+
systemInfoMap, ok := dataMap["system_info"].(map[string]interface{})
477+
if !ok {
478+
t.Fatal("Expected system_info to be present")
479+
}
480+
481+
if systemInfoMap["agentpipe_version"] != version {
482+
t.Errorf("Expected agentpipe_version=%s, got %v", version, systemInfoMap["agentpipe_version"])
483+
}
484+
485+
// Verify connected_at is present
486+
connectedAt, ok := dataMap["connected_at"].(string)
487+
if !ok {
488+
t.Fatal("Expected connected_at to be a string")
489+
}
490+
491+
if connectedAt == "" {
492+
t.Error("Expected connected_at to be non-empty")
493+
}
494+
}

0 commit comments

Comments
 (0)