Commit a938851

fix duplicate SSE content to the client

Author: Eric Oliver
1 parent 74a574d, commit a938851

5 files changed: +469 -13 lines changed
Lines changed: 200 additions & 0 deletions
@@ -0,0 +1,200 @@
# SSE Streaming Duplication Fix

## Problem Statement

The API streaming endpoint (`/execute/stream`) emits each piece of LLM-generated content twice in the SSE stream. The result is a poor user experience: garbled, repeated text output.

## Root Cause Analysis

The issue is caused by **double event emission**: the same content reaches the SSE stream through two paths at once:

1. **Direct SSE Integration**: The Task uses `SSEOutputAdapter` as its `userInterface`, so LLM streaming content flows directly through `showInformation()`, `showProgress()`, etc.

2. **Message Forwarding**: The `ApiTaskExecutionHandler.onTaskMessage()` method **also** forwards the same messages to the SSE adapter.

This creates a duplicate emission pipeline:

- **First time**: Task → SSEOutputAdapter.showInformation() → SSE stream
- **Second time**: Task → ApiTaskExecutionHandler.onTaskMessage() → SSEOutputAdapter.showInformation() → SSE stream

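The double path can be reproduced in isolation. The sketch below is a minimal illustration, not the project's code: `RecordingAdapter`, `ForwardingHandler`, and `FakeTask` are hypothetical stand-ins, and the methods are synchronous for brevity, whereas the real adapter methods are async.

```typescript
// Hypothetical stand-ins; only the call pattern mirrors the description above.
interface UserInterface {
  showInformation(text: string): void
}

// Records what would be written to the SSE stream.
class RecordingAdapter implements UserInterface {
  readonly sent: string[] = []

  showInformation(text: string): void {
    this.sent.push(text)
  }
}

type TaskEvent = { action: "say" | "ask"; message?: { text?: string } }

// Pre-fix behaviour: every "say" message is pushed to the adapter again.
class ForwardingHandler {
  private readonly adapter: UserInterface

  constructor(adapter: UserInterface) {
    this.adapter = adapter
  }

  onTaskMessage(event: TaskEvent): void {
    if (event.action === "say" && event.message?.text) {
      this.adapter.showInformation(event.message.text)
    }
  }
}

class FakeTask {
  private readonly ui: UserInterface
  private readonly handler: ForwardingHandler

  constructor(ui: UserInterface, handler: ForwardingHandler) {
    this.ui = ui
    this.handler = handler
  }

  say(text: string): void {
    this.ui.showInformation(text) // path 1: direct userInterface call
    this.handler.onTaskMessage({ action: "say", message: { text } }) // path 2: forwarded copy
  }
}

function reproduceDuplicate(): string[] {
  const adapter = new RecordingAdapter()
  const task = new FakeTask(adapter, new ForwardingHandler(adapter))
  task.say("Hello")
  return adapter.sent
}
```

Because both paths end at the same adapter instance, one `say` call leaves two entries in `sent`. Removing the body of `ForwardingHandler.onTaskMessage` for `say` events leaves one.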
## Architecture Diagram

```mermaid
graph TD
    A[LLM Response] --> B[Task Engine]
    B --> C[SSEOutputAdapter as userInterface]
    C --> D[SSE Stream]

    B -.-> E[ApiTaskExecutionHandler.onTaskMessage]
    E -.-> F[SSEOutputAdapter.showInformation]
    F -.-> D

    style E fill:#ffcccc
    style F fill:#ffcccc

    classDef duplicate stroke:#ff0000,stroke-width:3px,stroke-dasharray: 5 5
    class E,F duplicate
```

## Stories

### Story 1: Remove Duplicate Message Forwarding (HIGH PRIORITY)

**Goal**: Eliminate the duplicate event emission by removing redundant message forwarding

**Acceptance Criteria**:

- [ ] LLM streaming content appears only once in SSE output
- [ ] All essential task messages are still forwarded appropriately
- [ ] No functionality regression in task execution
- [ ] Logging shows single emission per content chunk

**Tasks**:

1. **Analyze current event flow** to confirm which path is the primary one
2. **Modify ApiTaskExecutionHandler.onTaskMessage()** to remove duplicate forwarding:
    - Remove forwarding for `say` actions that are already handled by direct SSE integration
    - Keep forwarding only for events that aren't automatically handled by userInterface
3. **Add source identification** in logs to track event origins
4. **Test with various query types** to ensure no regression

**Files to modify**:

- `src/core/task/execution/ApiTaskExecutionHandler.ts` (lines 45-61)

**Implementation Details**:

```typescript
async onTaskMessage(taskId: string, event: any): Promise<void> {
    if (this.verbose) {
        console.log(`[ApiTaskExecutionHandler] Task ${taskId} message:`, event.action)
    }

    // Remove duplicate forwarding - the Task already uses SSEOutputAdapter as userInterface
    // Only forward events that require special handling beyond the standard userInterface methods

    // Keep specialized forwarding for events that need custom handling
    if (event.action === "ask" && event.message?.text) {
        // Questions might need special SSE handling beyond standard askQuestion
        await this.sseAdapter.showInformation(`Question: ${event.message.text}`)
    }

    // Remove the duplicate "say" forwarding - this is handled by userInterface directly
    // if (event.action === "say" && event.message?.text) {
    //     await this.sseAdapter.showInformation(event.message.text) // ← REMOVED
    // }
}
```

### Story 2: Add SSE Event Deduplication Safety Net (MEDIUM PRIORITY)

**Goal**: Add deduplication logic as a safety measure to prevent future duplicate issues

**Acceptance Criteria**:

- [ ] SSEOutputAdapter can detect and prevent duplicate events
- [ ] Deduplication window is configurable (default: 100ms)
- [ ] Metrics track deduplication hits for monitoring
- [ ] Performance impact is minimal (< 1ms overhead per event)

**Tasks**:

1. **Add event deduplication** in SSEOutputAdapter
2. **Implement content hashing** for text-based events using fast hash
3. **Add configurable deduplication window** with environment variable override
4. **Add metrics collection** for duplicate detection rates
5. **Add logging** for debugging duplicate detection

**Files to modify**:

- `src/api/streaming/SSEOutputAdapter.ts`

**Implementation Details**:

```typescript
export class SSEOutputAdapter implements IUserInterface {
    private recentEvents = new Map<string, number>() // hash -> timestamp
    private deduplicationWindowMs = 100

    // simpleHash() and cleanupOldEvents() are helpers not shown in this excerpt.
    private isDuplicateEvent(content: string): boolean {
        const hash = this.simpleHash(content)
        const now = Date.now()
        const lastEmitted = this.recentEvents.get(hash)

        // Compare against undefined explicitly rather than relying on truthiness
        if (lastEmitted !== undefined && now - lastEmitted < this.deduplicationWindowMs) {
            return true
        }

        this.recentEvents.set(hash, now)
        // Cleanup old entries periodically
        if (this.recentEvents.size > 100) {
            this.cleanupOldEvents(now)
        }

        return false
    }
}
```

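The excerpt above leaves the hash, the cleanup step, the hit metric, and the environment override unspecified. A standalone sketch of how they could fit together follows. Everything here is an assumption made for illustration: the `EventDeduplicator` class, the FNV-1a hash standing in for `simpleHash`, the variable name `SSE_DEDUP_WINDOW_MS`, and the injectable clock, which exists only to make the window testable.

```typescript
// FNV-1a (32-bit): an assumed stand-in for the plan's `simpleHash` helper.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)
  }
  return (hash >>> 0).toString(16)
}

// Reads the window from an env-like object; SSE_DEDUP_WINDOW_MS is a hypothetical name.
function resolveWindowMs(env: Record<string, string | undefined>, fallback = 100): number {
  const raw = env["SSE_DEDUP_WINDOW_MS"]
  if (raw === undefined || raw.trim() === "") return fallback
  const parsed = Number(raw)
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback
}

class EventDeduplicator {
  hits = 0 // count of suppressed duplicates, usable as a metric
  private readonly recent = new Map<string, number>() // hash -> last emission time
  private readonly windowMs: number
  private readonly now: () => number

  constructor(windowMs: number, now: () => number = () => Date.now()) {
    this.windowMs = windowMs
    this.now = now
  }

  isDuplicate(content: string): boolean {
    const key = fnv1a(content)
    const t = this.now()
    const last = this.recent.get(key)
    if (last !== undefined && t - last < this.windowMs) {
      this.hits++
      return true
    }
    this.recent.set(key, t)
    if (this.recent.size > 100) this.cleanup(t)
    return false
  }

  // Drops entries whose window has already expired.
  private cleanup(t: number): void {
    this.recent.forEach((seen, key) => {
      if (t - seen >= this.windowMs) this.recent.delete(key)
    })
  }
}
```

One design caveat: content-based deduplication cannot tell a forwarded duplicate from a legitimate repeat. Two identical chunks that the LLM really produced within the window would be collapsed into one, and a 32-bit hash can collide, so this is best treated as a safety net behind the Story 1 fix rather than the fix itself.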
### Story 3: Improve Logging and Debugging (LOW PRIORITY)

**Goal**: Add better observability to prevent and debug similar issues

**Acceptance Criteria**:

- [ ] Detailed SSE event logging with source identification
- [ ] Flow tracing capability for debugging event paths
- [ ] API debugging endpoints for SSE stream inspection
- [ ] Integration tests verify single emission behavior

**Tasks**:

1. **Add detailed SSE event logging** with source metadata
2. **Add flow tracing** to track event paths through the system
3. **Create debugging endpoints** for SSE stream inspection (`/debug/streams`)
4. **Add integration tests** to verify single emission per content chunk
5. **Add performance monitoring** for SSE throughput

**Files to modify**:

- `src/api/streaming/SSEOutputAdapter.ts`
- `src/api/streaming/__tests__/SSEOutputAdapter.test.ts`
- `src/api/server/FastifyServer.ts` (for debug endpoints)

## Testing Strategy

### Unit Tests

- [ ] Test ApiTaskExecutionHandler without duplicate forwarding
- [ ] Test SSEOutputAdapter deduplication logic
- [ ] Test event source identification

### Integration Tests

- [ ] Test `/execute/stream` endpoint with various query types
- [ ] Verify single emission per LLM content chunk
- [ ] Test error handling doesn't cause duplication

### Manual Testing

- [ ] Test with short queries (like "list MCP servers")
- [ ] Test with long reasoning queries
- [ ] Test with coding tasks that involve multiple tool uses

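For the "single emission per content chunk" checks, one option is to capture the raw SSE response body and count back-to-back repeats. The sketch below assumes each `data:` payload is a JSON object with `type` and `message` fields. Those field names appear on the event object in the adapter code, but the wire serialization is an assumption here, and both helper names are hypothetical.

```typescript
// Splits a raw SSE body into the data payload of each event.
// Events are separated by a blank line; multiple data: lines in one event are joined with "\n".
function parseSseData(raw: string): string[] {
  const payloads: string[] = []
  for (const block of raw.split(/\r?\n\r?\n/)) {
    const dataLines = block
      .split(/\r?\n/)
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, ""))
    if (dataLines.length > 0) payloads.push(dataLines.join("\n"))
  }
  return payloads
}

// Counts events whose type and message match the event immediately before them.
function countConsecutiveDuplicates(raw: string): number {
  let duplicates = 0
  let previous: string | undefined
  for (const payload of parseSseData(raw)) {
    const event = JSON.parse(payload) as { type?: string; message?: string }
    const key = `${event.type}:${event.message}`
    if (event.message !== undefined && key === previous) duplicates++
    previous = key
  }
  return duplicates
}
```

An integration test could assert that the count is zero for a captured `/execute/stream` response. Timestamps are deliberately left out of the comparison, since two emissions of the same chunk would carry different ones.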
## Implementation Priority

**Phase 1 (Immediate Fix - 15 minutes)**:

- Story 1: Remove duplicate message forwarding

**Phase 2 (Safety & Monitoring - 75 minutes)**:

- Story 2: Add deduplication safety net
- Story 3: Improve logging and add tests

## Success Metrics

- [ ] Zero duplicate content in SSE streams
- [ ] No regression in task execution functionality
- [ ] Improved user experience with clean, single-emission content
- [ ] Debugging capabilities for future SSE issues

src/api/streaming/SSEOutputAdapter.ts

Lines changed: 20 additions & 4 deletions
@@ -294,16 +294,32 @@ export class SSEOutputAdapter implements IUserInterface {
      * Core event emission method
      */
     private emitEvent(event: SSEEvent): void {
-        console.log(`[SSE] Emitting event for job ${this.jobId}:`, JSON.stringify(event, null, 2))
-        console.log(`[SSE] StreamManager active streams:`, this.streamManager.hasActiveStream(this.jobId))
+        // Enhanced logging with source tracking and content preview
+        const contentPreview = event.message
+            ? event.message.length > 100
+                ? event.message.substring(0, 100) + "..."
+                : event.message
+            : "no message"
+
+        console.log(`[SSE] Emitting ${event.type} for job ${this.jobId}:`, {
+            eventType: event.type,
+            contentPreview,
+            contentLength: event.message?.length || 0,
+            hasProgress: event.progress !== undefined,
+            timestamp: event.timestamp,
+            source: "userInterface",
+        })
 
         const success = this.streamManager.sendEvent(this.jobId, event)
         if (!success) {
             this.logger.warn(`Failed to emit event ${event.type} for job ${this.jobId}`)
-            console.log(`[SSE] Failed to send event ${event.type} for job ${this.jobId}`)
+            console.log(`[SSE] Failed to send ${event.type} for job ${this.jobId}`)
             console.log(`[SSE] Available streams:`, this.streamManager.getActiveStreamIds())
+            console.log(`[SSE] Stream manager active status:`, this.streamManager.hasActiveStream(this.jobId))
         } else {
-            console.log(`[SSE] Successfully sent event ${event.type} for job ${this.jobId}`)
+            console.log(
+                `[SSE] ✅ Successfully sent ${event.type} for job ${this.jobId} (${event.message?.length || 0} chars)`,
+            )
         }
     }
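The nested ternary that builds `contentPreview` is easier to reason about, and to unit test, as a standalone function. This is a sketch of an equivalent helper and not part of the commit. The name `previewContent` and the `maxLength` parameter are hypothetical, with the default of 100 taken from the diff.

```typescript
// Returns a log-friendly preview: the message itself when short,
// the first maxLength characters plus "..." when long,
// and "no message" when the message is missing or empty.
function previewContent(message: string | undefined, maxLength = 100): string {
  if (!message) return "no message"
  return message.length > maxLength ? message.substring(0, maxLength) + "..." : message
}
```

As in the diff, an empty string falls into the "no message" branch because it is falsy.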

src/api/streaming/__tests__/SSEOutputAdapter.test.ts

Lines changed: 53 additions & 0 deletions
@@ -323,4 +323,57 @@ describe("SSEOutputAdapter", () => {
             consoleSpy.mockRestore()
         })
     })
+
+    describe("enhanced logging", () => {
+        it("should log detailed event information", async () => {
+            const consoleSpy = jest.spyOn(console, "log").mockImplementation(() => {})
+            const message = "Test message with detailed logging"
+
+            await adapter.showInformation(message)
+
+            // Check that enhanced logging includes source and content preview
+            expect(consoleSpy).toHaveBeenCalledWith(
+                expect.stringContaining("[SSE] Emitting information for job test-job-123:"),
+                expect.objectContaining({
+                    eventType: "information",
+                    contentPreview: message,
+                    contentLength: message.length,
+                    source: "userInterface",
+                }),
+            )
+
+            consoleSpy.mockRestore()
+        })
+
+        it("should truncate long content in preview", async () => {
+            const consoleSpy = jest.spyOn(console, "log").mockImplementation(() => {})
+            const longMessage = "a".repeat(150) // 150 characters
+
+            await adapter.showInformation(longMessage)
+
+            // Check that content is truncated to 100 chars + "..."
+            expect(consoleSpy).toHaveBeenCalledWith(
+                expect.any(String),
+                expect.objectContaining({
+                    contentPreview: "a".repeat(100) + "...",
+                    contentLength: 150,
+                }),
+            )
+
+            consoleSpy.mockRestore()
+        })
+
+        it("should show success indicator in logs", async () => {
+            const consoleSpy = jest.spyOn(console, "log").mockImplementation(() => {})
+
+            await adapter.showInformation("Test")
+
+            // Check for success indicator
+            expect(consoleSpy).toHaveBeenCalledWith(
+                expect.stringContaining("[SSE] ✅ Successfully sent information for job test-job-123 (4 chars)"),
+            )
+
+            consoleSpy.mockRestore()
+        })
+    })
 })

src/core/task/execution/ApiTaskExecutionHandler.ts

Lines changed: 32 additions & 9 deletions
@@ -44,20 +44,43 @@ export class ApiTaskExecutionHandler implements ITaskExecutionHandler {
 
     async onTaskMessage(taskId: string, event: any): Promise<void> {
         if (this.verbose) {
-            console.log(`[ApiTaskExecutionHandler] Task ${taskId} message:`, event.action)
+            console.log(`[ApiTaskExecutionHandler] Task ${taskId} message:`, event.action, {
+                hasText: !!event.message?.text,
+                textLength: event.message?.text?.length || 0,
+                source: "message_handler",
+            })
         }
 
-        // Forward task messages to the SSE stream
-        if (event.action === "say" && event.message?.text) {
-            // Stream the actual task response content to the client
-            await this.sseAdapter.showInformation(event.message.text)
-        } else if (event.action === "ask" && event.message?.text) {
-            // Handle questions from the task
+        // NOTE: Do NOT forward "say" actions - they are already handled by the Task using SSEOutputAdapter as userInterface
+        // This was causing duplicate content emission in the SSE stream
+
+        // Only forward specialized events that need custom handling beyond standard userInterface methods
+        if (event.action === "ask" && event.message?.text) {
+            // Questions might need special SSE handling beyond standard askQuestion
+            if (this.verbose) {
+                console.log(
+                    `[ApiTaskExecutionHandler] Forwarding question to SSE:`,
+                    event.message.text.substring(0, 100),
+                )
+            }
             await this.sseAdapter.showInformation(`Question: ${event.message.text}`)
-        } else if (event.message?.text) {
-            // Handle other message types
+        } else if (event.action !== "say" && event.message?.text) {
+            // Handle other message types (but NOT "say" to avoid duplication)
+            if (this.verbose) {
+                console.log(
+                    `[ApiTaskExecutionHandler] Forwarding ${event.action} to SSE log:`,
+                    event.message.text.substring(0, 100),
+                )
+            }
             await this.sseAdapter.log(event.message.text)
         }
+
+        // Log skipped "say" actions for debugging
+        if (event.action === "say" && this.verbose) {
+            console.log(
+                `[ApiTaskExecutionHandler] SKIPPED duplicate "say" forwarding - handled by userInterface directly`,
+            )
+        }
     }
 
     async onTaskActivity(taskId: string, eventType: string, data?: any): Promise<void> {
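The branching in the new `onTaskMessage` reduces to a three-way routing decision. Written as a pure function, it can be checked without an adapter or a running stream. This is an illustrative sketch and not code from the commit: `routeTaskMessage`, `Route`, and `TaskMessageEvent` are hypothetical names.

```typescript
type Route = "information" | "log" | "skip"
type TaskMessageEvent = { action: string; message?: { text?: string } }

// Mirrors the post-fix branches:
//   "ask" with text               -> forwarded via showInformation
//   any other non-"say" with text -> forwarded via log
//   "say", or anything without text -> not forwarded
function routeTaskMessage(event: TaskMessageEvent): Route {
  const hasText = Boolean(event.message?.text)
  if (event.action === "ask" && hasText) return "information"
  if (event.action !== "say" && hasText) return "log"
  return "skip"
}
```

A `say` event always routes to `"skip"`, which is the property the fix depends on: that content reaches the stream only through the Task's direct `userInterface` path.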
