Skip to content

Commit 28b36e5

Browse files
committed
feat: enhance useStreamingChat with background job handling and response event processing
1 parent 3893537 commit 28b36e5

File tree

6 files changed

+278
-23
lines changed

6 files changed

+278
-23
lines changed

src/hooks/useStreamingChat.test.tsx

Lines changed: 117 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -1140,6 +1140,123 @@ describe('useStreamingChat', () => {
11401140
})
11411141
})
11421142

1143+
describe('response.created events', () => {
1144+
it('should handle response.created with background: true option', async () => {
1145+
const { result } = renderHook(() => useStreamingChat())
1146+
1147+
const responseCreatedEvent = JSON.stringify({
1148+
type: 'response.created',
1149+
response: {
1150+
id: 'bg-job-123',
1151+
},
1152+
})
1153+
1154+
const mockReader = {
1155+
read: vi
1156+
.fn()
1157+
.mockResolvedValueOnce({
1158+
done: false,
1159+
value: new TextEncoder().encode(`t:${responseCreatedEvent}\n`),
1160+
})
1161+
.mockResolvedValueOnce({ done: true, value: undefined }),
1162+
}
1163+
1164+
const mockResponse = createMockResponse({
1165+
clone: vi.fn().mockReturnValue({
1166+
body: { getReader: vi.fn().mockReturnValue(mockReader) },
1167+
}),
1168+
})
1169+
1170+
act(() => {
1171+
result.current.handleResponse(mockResponse, {
1172+
background: true,
1173+
title: 'Test Background Job',
1174+
})
1175+
})
1176+
1177+
await waitFor(() => {
1178+
expect(result.current.streamBuffer).toHaveLength(1)
1179+
})
1180+
1181+
expect(result.current.streamBuffer[0]).toEqual({
1182+
type: 'assistant',
1183+
id: 'bg-job-123',
1184+
content:
1185+
"Background job started. Streaming the response in, but you can view it in the 'Background Jobs' if you leave.",
1186+
})
1187+
})
1188+
1189+
it('should ignore response.created with background: false option', async () => {
1190+
const { result } = renderHook(() => useStreamingChat())
1191+
1192+
const responseCreatedEvent = JSON.stringify({
1193+
type: 'response.created',
1194+
response: {
1195+
id: 'regular-job-123',
1196+
},
1197+
})
1198+
1199+
const mockReader = {
1200+
read: vi
1201+
.fn()
1202+
.mockResolvedValueOnce({
1203+
done: false,
1204+
value: new TextEncoder().encode(`t:${responseCreatedEvent}\n`),
1205+
})
1206+
.mockResolvedValueOnce({ done: true, value: undefined }),
1207+
}
1208+
1209+
const mockResponse = createMockResponse({
1210+
clone: vi.fn().mockReturnValue({
1211+
body: { getReader: vi.fn().mockReturnValue(mockReader) },
1212+
}),
1213+
})
1214+
1215+
act(() => {
1216+
result.current.handleResponse(mockResponse, { background: false })
1217+
})
1218+
1219+
await waitFor(() => {
1220+
expect(result.current.streamBuffer).toHaveLength(0)
1221+
})
1222+
})
1223+
1224+
it('should ignore response.created with no options', async () => {
1225+
const { result } = renderHook(() => useStreamingChat())
1226+
1227+
const responseCreatedEvent = JSON.stringify({
1228+
type: 'response.created',
1229+
response: {
1230+
id: 'no-options-job-123',
1231+
},
1232+
})
1233+
1234+
const mockReader = {
1235+
read: vi
1236+
.fn()
1237+
.mockResolvedValueOnce({
1238+
done: false,
1239+
value: new TextEncoder().encode(`t:${responseCreatedEvent}\n`),
1240+
})
1241+
.mockResolvedValueOnce({ done: true, value: undefined }),
1242+
}
1243+
1244+
const mockResponse = createMockResponse({
1245+
clone: vi.fn().mockReturnValue({
1246+
body: { getReader: vi.fn().mockReturnValue(mockReader) },
1247+
}),
1248+
})
1249+
1250+
act(() => {
1251+
result.current.handleResponse(mockResponse)
1252+
})
1253+
1254+
await waitFor(() => {
1255+
expect(result.current.streamBuffer).toHaveLength(0)
1256+
})
1257+
})
1258+
})
1259+
11431260
describe('cleanup', () => {
11441261
it('should cleanup timeouts on unmount', async () => {
11451262
const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout')

src/hooks/useStreamingChat.ts

Lines changed: 37 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -3,6 +3,7 @@ import { generateMessageId } from '../mcp/client'
33
import type { AnnotatedFile } from '@/lib/utils/code-interpreter'
44
import { stopStreamProcessing } from '@/lib/utils/streaming'
55
import { getTimestamp } from '@/lib/utils/date'
6+
import { useBackgroundJobs } from './useBackgroundJobs'
67

78
export type AssistantStreamEvent = {
89
type: 'assistant'
@@ -139,7 +140,10 @@ interface UseStreamingChatReturn {
139140
streaming: boolean
140141
timedOut: boolean
141142
requestId: string | null
142-
handleResponse: (response: Response) => void
143+
handleResponse: (
144+
response: Response,
145+
options?: { background: true; title: string } | { background: false },
146+
) => void
143147
handleError: (error: Error) => void
144148
addUserMessage: (content: string) => void
145149
cancelStream: () => void
@@ -151,6 +155,7 @@ export function useStreamingChat(): UseStreamingChatReturn {
151155
const [streaming, setStreaming] = useState(false)
152156
const [timedOut, setTimedOut] = useState(false)
153157
const [requestId, setRequestId] = useState<string | null>(null)
158+
const { addJob: addBackgroundJob } = useBackgroundJobs()
154159

155160
const streamUpdateTimeoutRef = useRef<NodeJS.Timeout | null>(null)
156161
const textBufferRef = useRef<string>('')
@@ -263,7 +268,10 @@ export function useStreamingChat(): UseStreamingChatReturn {
263268
}, [])
264269

265270
const handleResponse = useCallback(
266-
(response: Response) => {
271+
(
272+
response: Response,
273+
options?: { background: true; title: string } | { background: false },
274+
) => {
267275
const xRequestId = response.headers.get('x-request-id')
268276
setRequestId(xRequestId)
269277

@@ -366,6 +374,33 @@ export function useStreamingChat(): UseStreamingChatReturn {
366374
return
367375
}
368376

377+
// Currently only used for background jobs
378+
if (toolState.type === 'response.created') {
379+
if (options?.background) {
380+
const requestId = toolState.response.id
381+
382+
const backgroundJob = {
383+
id: requestId,
384+
status: 'running' as const,
385+
createdAt: getTimestamp(),
386+
title: options.title,
387+
}
388+
389+
addBackgroundJob(backgroundJob)
390+
391+
setStreamBuffer((prev: Array<StreamEvent>) => [
392+
...prev,
393+
{
394+
type: 'assistant',
395+
id: backgroundJob.id,
396+
content:
397+
"Background job started. Streaming the response in, but you can view it in the 'Background Jobs' if you leave.",
398+
},
399+
])
400+
}
401+
return
402+
}
403+
369404
if (toolState.type === 'reasoning_summary_delta') {
370405
setStreamBuffer((prev: Array<StreamEvent>) => {
371406
const last = prev[prev.length - 1]
Lines changed: 37 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -0,0 +1,37 @@
1+
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
2+
3+
exports[`streamText > emits stream_done when an error occurs 1`] = `
4+
"f:{"messageId":"msg-2"}
5+
e:{"type":"error","message":"An unexpected error occurred."}
6+
t:{"type":"stream_done"}
7+
"
8+
`;
9+
10+
exports[`streamText > emits stream_done when the stream ends 1`] = `
11+
"f:{"messageId":"msg-1"}
12+
0:"I'm glad you asked!"
13+
0:" Here are a few universally nice things that"
14+
0:" could have happened today:\\n\\n"
15+
0:"- Someone smiled at a stranger, brightening"
16+
0:" their day\\n"
17+
0:"- A teacher helped a student understand a"
18+
0:" difficult concept\\n"
19+
0:"- A kind person paid for someone’s coffee"
20+
0:" in line\\n"
21+
0:"- A pet reunited with its owner at a local"
22+
0:" animal shelter\\n"
23+
0:"- A friend reached out just to say hello\\n\\n"
24+
0:"Would you like to hear some real, uplifting"
25+
0:" news from today?"
26+
0:" Just let me know!"
27+
t:{"type":"tool_call_completed","response":{}}
28+
t:{"type":"stream_done"}
29+
"
30+
`;
31+
32+
exports[`streamText > passes through response.created events 1`] = `
33+
"f:{"messageId":"msg-3"}
34+
t:{"type":"response.created","response":{"id":"resp_123","model":"gpt-4","created":1234567890}}
35+
t:{"type":"stream_done"}
36+
"
37+
`;

src/lib/streaming.test.ts

Lines changed: 69 additions & 19 deletions
Original file line number · Diff line number · Diff line change
@@ -1,7 +1,15 @@
1-
import { describe, expect, it } from 'vitest'
2-
import { stopStreamProcessing } from './utils/streaming'
1+
import { describe, expect, it, beforeEach, afterEach } from 'vitest'
2+
import { stopStreamProcessing, getMessageId } from './utils/streaming'
33
import { streamText } from './streaming'
44

5+
vi.mock('./utils/streaming', async () => {
6+
const actual = await vi.importActual('./utils/streaming')
7+
return {
8+
...actual,
9+
getMessageId: vi.fn(),
10+
}
11+
})
12+
513
function iterableFromArray<T>(arr: Array<T>): AsyncIterable<T> {
614
return {
715
[Symbol.asyncIterator]() {
@@ -17,6 +25,19 @@ function iterableFromArray<T>(arr: Array<T>): AsyncIterable<T> {
1725
}
1826
}
1927

28+
async function readStreamToString(
29+
reader: ReadableStreamDefaultReader<Uint8Array>,
30+
): Promise<string> {
31+
let result = ''
32+
let done = false
33+
while (!done) {
34+
const { value, done: d } = await reader.read()
35+
if (value) result += new TextDecoder().decode(value)
36+
done = d
37+
}
38+
return result
39+
}
40+
2041
describe('stopStreamProcessing', () => {
2142
it('replaces response body with an empty closed stream', async () => {
2243
const originalStream = new ReadableStream({
@@ -70,7 +91,31 @@ describe('stopStreamProcessing', () => {
7091
})
7192
})
7293

94+
describe('getMessageId', () => {
95+
it('returns a unique message ID with msg prefix', async () => {
96+
const actual = await vi.importActual('./utils/streaming') as { getMessageId: () => string }
97+
const id1 = actual.getMessageId()
98+
const id2 = actual.getMessageId()
99+
100+
expect(id1).toMatch(/^msg-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/)
101+
expect(id2).toMatch(/^msg-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/)
102+
expect(id1).not.toBe(id2)
103+
})
104+
})
105+
73106
describe('streamText', () => {
107+
let messageCounter = 0
108+
109+
beforeEach(() => {
110+
vi.mocked(getMessageId).mockImplementation(() => {
111+
messageCounter++
112+
return `msg-${messageCounter}`
113+
})
114+
})
115+
116+
afterEach(() => {
117+
vi.restoreAllMocks()
118+
})
74119
it('emits stream_done when the stream ends', async () => {
75120
const chunks = [
76121
{ type: 'response.output_text.delta', delta: "I'm glad you asked!" },
@@ -127,14 +172,8 @@ describe('streamText', () => {
127172
]
128173
const response = streamText(iterableFromArray(chunks))
129174
const reader = response.body!.getReader()
130-
let result = ''
131-
let done = false
132-
while (!done) {
133-
const { value, done: d } = await reader.read()
134-
if (value) result += new TextDecoder().decode(value)
135-
done = d
136-
}
137-
expect(result).toMatch(/t:{"type":"stream_done"}/)
175+
const result = await readStreamToString(reader)
176+
expect(result).toMatchSnapshot()
138177
})
139178

140179
it('emits stream_done when an error occurs', async () => {
@@ -153,16 +192,9 @@ describe('streamText', () => {
153192
}
154193
const response = streamText(errorIterable)
155194
const reader = response.body!.getReader()
156-
let result = ''
157-
let done = false
158-
while (!done) {
159-
const { value, done: d } = await reader.read()
160-
if (value) result += new TextDecoder().decode(value)
161-
done = d
162-
}
195+
const result = await readStreamToString(reader)
163196

164-
expect(result).toMatch(/t:{"type":"stream_done"}/)
165-
expect(result).toMatch(/e:{"type":"error".*}/)
197+
expect(result).toMatchSnapshot()
166198
expect(consoleErrorSpy).toHaveBeenCalledWith(
167199
'Error during streamed response:',
168200
expect.any(Error),
@@ -173,4 +205,22 @@ describe('streamText', () => {
173205

174206
consoleErrorSpy.mockRestore()
175207
})
208+
209+
it('passes through response.created events', async () => {
210+
const chunks = [
211+
{
212+
type: 'response.created',
213+
response: {
214+
id: 'resp_123',
215+
model: 'gpt-4',
216+
created: 1234567890,
217+
},
218+
},
219+
]
220+
const response = streamText(iterableFromArray(chunks))
221+
const reader = response.body!.getReader()
222+
const result = await readStreamToString(reader)
223+
224+
expect(result).toMatchSnapshot()
225+
})
176226
})

src/lib/streaming.ts

Lines changed: 9 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -1,4 +1,5 @@
11
import { APIError } from 'openai'
2+
import { getMessageId } from './utils/streaming'
23

34
// Event chunk for stream completion
45
const STREAM_DONE_CHUNK = new TextEncoder().encode(
@@ -10,7 +11,7 @@ export function streamText(
1011
onMessageId?: (messageId: string) => void,
1112
): Response {
1213
const encoder = new TextEncoder()
13-
const messageId = `msg-${Math.random().toString(36).slice(2)}`
14+
const messageId = getMessageId()
1415

1516
const stream = new ReadableStream({
1617
async start(controller) {
@@ -336,8 +337,14 @@ export function streamText(
336337
),
337338
)
338339
break
339-
// Web search tool events
340+
341+
case 'response.created':
342+
// Pass through response.created events so background jobs can extract response ID
343+
controller.enqueue(encoder.encode(`t:${JSON.stringify(chunk)}\n`))
344+
break
345+
340346
default:
347+
// Web search tool events
341348
if (
342349
chunk.type &&
343350
chunk.type.startsWith('response.web_search_call.')

0 commit comments

Comments
 (0)