Skip to content

Commit 2c70eb4

Browse files
authored
fix partial JSON sources streaming by appending to the leftover stream chunk (#55717)
1 parent 372fbe9 commit 2c70eb4

File tree

1 file changed

+76
-44
lines changed

1 file changed

+76
-44
lines changed

src/search/components/input/AskAIResults.tsx

Lines changed: 76 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -232,18 +232,75 @@ export function AskAIResults({
232232
const decoder = new TextDecoder('utf-8')
233233
const reader = response.body.getReader()
234234
let done = false
235+
let leftover = '' // <= carry-over buffer
235236
setInitialLoading(false)
237+
238+
const processLine = (parsedLine: any) => {
239+
switch (parsedLine.chunkType) {
240+
// A conversation ID will still be sent when a question cannot be answered
241+
case 'CONVERSATION_ID':
242+
conversationIdBuffer = parsedLine.conversation_id
243+
setConversationId(parsedLine.conversation_id)
244+
break
245+
246+
case 'NO_CONTENT_SIGNAL':
247+
// Serve canned response. A question that cannot be answered was asked
248+
handleAICannotAnswer(conversationIdBuffer, 200)
249+
break
250+
251+
case 'SOURCES':
252+
if (!isCancelled) {
253+
sourcesBuffer = uniqBy(
254+
sourcesBuffer.concat(parsedLine.sources as AIReference[]),
255+
'url',
256+
)
257+
setReferences(sourcesBuffer)
258+
}
259+
break
260+
261+
case 'MESSAGE_CHUNK':
262+
if (!isCancelled) {
263+
messageBuffer += parsedLine.text
264+
setMessage(messageBuffer)
265+
}
266+
break
267+
268+
case 'INPUT_CONTENT_FILTER':
269+
// Serve canned response. A spam question was asked
270+
handleAICannotAnswer(
271+
conversationIdBuffer,
272+
200,
273+
t('search.ai.responses.invalid_query'),
274+
)
275+
break
276+
}
277+
278+
if (!isCancelled) setAnnouncement('Copilot Response Loading...')
279+
}
280+
236281
while (!done && !isCancelled) {
237282
const { value, done: readerDone } = await reader.read()
238283
done = readerDone
284+
285+
// The sources JSON chunk may be sent in multiple parts, so we need to decode it with a leftover buffer so that it can be parsed all at once
286+
// So when we say "incomplete" or "leftover", we mean that the JSON line is not complete yet — not that the message itself is incomplete
239287
if (value) {
240-
const chunkStr = decoder.decode(value, { stream: true })
241-
const chunkLines = chunkStr.split('\n').filter((line) => line.trim() !== '')
242-
for (const line of chunkLines) {
243-
let parsedLine
288+
// 1 append this chunk's text to whatever was left over
289+
leftover += decoder.decode(value, { stream: true })
290+
291+
// 2 split on newline
292+
const lines = leftover.split('\n')
293+
294+
// 3 keep the *last* item (maybe incomplete) for next round
295+
leftover = lines.pop() ?? ''
296+
297+
// 4 parse all complete lines
298+
for (const raw of lines) {
299+
if (!raw.trim()) continue
300+
301+
let parsedLine: any
244302
try {
245-
parsedLine = JSON.parse(line)
246-
// If midstream there is an error, like a connection reset / lost, our backend will send an error JSON
303+
parsedLine = JSON.parse(raw)
247304
if (parsedLine?.errors) {
248305
sendAISearchResultEvent({
249306
sources: [],
@@ -255,50 +312,25 @@ export function AskAIResults({
255312
setAISearchError()
256313
return
257314
}
258-
} catch (e) {
259-
console.warn(
260-
'Failed to parse JSON:',
261-
e,
262-
'Line:',
263-
line,
264-
'Typeof line: ',
265-
typeof line,
266-
)
315+
} catch (err) {
316+
console.warn('Failed to parse JSON line:', raw, err)
267317
continue
268318
}
269319

270-
// A conversation ID will still be sent when a question cannot be answered
271-
if (parsedLine.chunkType === 'CONVERSATION_ID') {
272-
conversationIdBuffer = parsedLine.conversation_id
273-
setConversationId(parsedLine.conversation_id)
274-
} else if (parsedLine.chunkType === 'NO_CONTENT_SIGNAL') {
275-
// Serve canned response. A question that cannot be answered was asked
276-
handleAICannotAnswer(conversationIdBuffer, 200)
277-
} else if (parsedLine.chunkType === 'SOURCES') {
278-
if (!isCancelled) {
279-
sourcesBuffer = sourcesBuffer.concat(parsedLine.sources)
280-
sourcesBuffer = uniqBy(sourcesBuffer, 'url')
281-
setReferences(sourcesBuffer)
282-
}
283-
} else if (parsedLine.chunkType === 'MESSAGE_CHUNK') {
284-
if (!isCancelled) {
285-
messageBuffer += parsedLine.text
286-
setMessage(messageBuffer)
287-
}
288-
} else if (parsedLine.chunkType === 'INPUT_CONTENT_FILTER') {
289-
// Serve canned response. A spam question was asked
290-
handleAICannotAnswer(
291-
conversationIdBuffer,
292-
200,
293-
t('search.ai.responses.invalid_query'),
294-
)
295-
}
296-
if (!isCancelled) {
297-
setAnnouncement('Copilot Response Loading...')
298-
}
320+
processLine(parsedLine)
299321
}
300322
}
301323
}
324+
325+
// 5 flush whatever remains after the stream ends
326+
if (!isCancelled && leftover.trim()) {
327+
try {
328+
const tail = JSON.parse(leftover)
329+
processLine(tail)
330+
} catch (err) {
331+
console.warn('Failed to parse tail JSON:', leftover, err)
332+
}
333+
}
302334
} catch (error: any) {
303335
if (!isCancelled) {
304336
console.error('Failed to fetch search results:', error)

0 commit comments

Comments
 (0)