Skip to content

Commit e3cc58c

Browse files
fix(client): Buffer SSE stream chunks before parsing (#3643)
1 parent 5194823 commit e3cc58c

File tree

3 files changed

+212
-6
lines changed

3 files changed

+212
-6
lines changed

packages/feathers/src/client/fetch.test.ts

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { beforeAll, describe, it, expect } from 'vitest'
1+
import { beforeAll, describe, it, expect, vi } from 'vitest'
22
import { feathers } from '../index.js'
33
import { clientTests } from '../../fixtures/client.js'
44
import { NotAcceptable, NotFound, MethodNotAllowed, BadRequest } from '../errors.js'
55

66
import { getApp, createTestServer, TestServiceTypes, verify } from '../../fixtures/index.js'
7-
import { fetchClient } from './index.js'
7+
import { fetchClient, FetchClient } from './index.js'
88

99
describe('fetch REST connector', function () {
1010
const port = 8888
@@ -211,3 +211,135 @@ describe('fetch REST connector', function () {
211211

212212
clientTests(app, 'todos')
213213
})
214+
215+
describe('FetchClient.handleEventStream', () => {
216+
/**
217+
* Creates a mock Response with a ReadableStream that emits chunks
218+
* simulating TCP fragmentation of SSE data
219+
*/
220+
function createChunkedSSEResponse(chunks: string[]): Response {
221+
const encoder = new TextEncoder()
222+
let chunkIndex = 0
223+
224+
const stream = new ReadableStream({
225+
pull(controller) {
226+
if (chunkIndex < chunks.length) {
227+
controller.enqueue(encoder.encode(chunks[chunkIndex]))
228+
chunkIndex++
229+
} else {
230+
controller.close()
231+
}
232+
}
233+
})
234+
235+
return new Response(stream, {
236+
headers: { 'content-type': 'text/event-stream' }
237+
})
238+
}
239+
240+
it('handles SSE events split across chunks', async () => {
241+
// Simulate TCP fragmentation where JSON is split mid-object
242+
const chunks = ['data: {"message":"Hel', 'lo"}\n\ndata: {"message":" wor', 'ld"}\n\n']
243+
244+
const client = new FetchClient({
245+
name: 'test',
246+
baseUrl: 'http://localhost',
247+
connection: fetch,
248+
stringify: (q) => ''
249+
})
250+
251+
const response = createChunkedSSEResponse(chunks)
252+
const messages: any[] = []
253+
254+
for await (const data of client.handleEventStream(response)) {
255+
messages.push(data)
256+
}
257+
258+
expect(messages).toHaveLength(2)
259+
expect(messages[0]).toEqual({ message: 'Hello' })
260+
expect(messages[1]).toEqual({ message: ' world' })
261+
})
262+
263+
it('handles multiple events in a single chunk', async () => {
264+
const chunks = ['data: {"a":1}\n\ndata: {"b":2}\n\ndata: {"c":3}\n\n']
265+
266+
const client = new FetchClient({
267+
name: 'test',
268+
baseUrl: 'http://localhost',
269+
connection: fetch,
270+
stringify: (q) => ''
271+
})
272+
273+
const response = createChunkedSSEResponse(chunks)
274+
const messages: any[] = []
275+
276+
for await (const data of client.handleEventStream(response)) {
277+
messages.push(data)
278+
}
279+
280+
expect(messages).toHaveLength(3)
281+
expect(messages[0]).toEqual({ a: 1 })
282+
expect(messages[1]).toEqual({ b: 2 })
283+
expect(messages[2]).toEqual({ c: 3 })
284+
})
285+
286+
it('handles event split at delimiter boundary', async () => {
287+
// Split right at the \n\n boundary
288+
const chunks = ['data: {"first":true}\n', '\ndata: {"second":true}\n\n']
289+
290+
const client = new FetchClient({
291+
name: 'test',
292+
baseUrl: 'http://localhost',
293+
connection: fetch,
294+
stringify: (q) => ''
295+
})
296+
297+
const response = createChunkedSSEResponse(chunks)
298+
const messages: any[] = []
299+
300+
for await (const data of client.handleEventStream(response)) {
301+
messages.push(data)
302+
}
303+
304+
expect(messages).toHaveLength(2)
305+
expect(messages[0]).toEqual({ first: true })
306+
expect(messages[1]).toEqual({ second: true })
307+
})
308+
309+
it('handles multi-byte UTF-8 characters split across chunks', async () => {
310+
// UTF-8 encoding of emoji can be split across chunks
311+
const fullMessage = 'data: {"emoji":"🎉"}\n\n'
312+
const bytes = new TextEncoder().encode(fullMessage)
313+
// Split in the middle of the emoji (which is 4 bytes in UTF-8)
314+
const chunk1 = bytes.slice(0, 18) // cuts into the emoji
315+
const chunk2 = bytes.slice(18)
316+
317+
const stream = new ReadableStream({
318+
start(controller) {
319+
controller.enqueue(chunk1)
320+
controller.enqueue(chunk2)
321+
controller.close()
322+
}
323+
})
324+
325+
const response = new Response(stream, {
326+
headers: { 'content-type': 'text/event-stream' }
327+
})
328+
329+
const client = new FetchClient({
330+
name: 'test',
331+
baseUrl: 'http://localhost',
332+
connection: fetch,
333+
stringify: (q) => ''
334+
})
335+
336+
const messages: any[] = []
337+
338+
for await (const data of client.handleEventStream(response)) {
339+
messages.push(data)
340+
}
341+
342+
expect(messages).toHaveLength(1)
343+
expect(messages[0]).toEqual({ emoji: '🎉' })
344+
})
345+
})

packages/feathers/src/client/fetch.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ export class FetchClient<T = any, D = Partial<T>, P extends Params = FetchClient
130130
async *handleEventStream(res: Response) {
131131
const reader = res.body.getReader()
132132
const decoder = new TextDecoder()
133+
let buffer = ''
133134

134135
while (true) {
135136
const { value, done } = await reader.read()
@@ -139,11 +140,17 @@ export class FetchClient<T = any, D = Partial<T>, P extends Params = FetchClient
139140
}
140141

141142
if (value) {
142-
const text = decoder.decode(value)
143-
const eventChunks = text.split('\n\n').filter(Boolean)
143+
buffer += decoder.decode(value, { stream: true })
144144

145-
for (const chunk of eventChunks) {
146-
const lines = chunk.split('\n')
145+
// SSE events are separated by \n\n
146+
const events = buffer.split('\n\n')
147+
// Keep the last potentially incomplete event in the buffer
148+
buffer = events.pop() || ''
149+
150+
for (const event of events) {
151+
if (!event.trim()) continue
152+
153+
const lines = event.split('\n')
147154
const dataLine = lines.find((line) => line.startsWith('data: '))
148155

149156
if (dataLine) {

website/content/api/client/rest.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,73 @@ If no `Content-Type` header is specified, streaming requests default to `applica
285285
Streaming uploads are only supported with the REST/HTTP transport. Socket.io does not support streaming request bodies.
286286
::
287287

288+
### Streaming Responses (SSE)
289+
290+
When a service returns an [async generator or async iterable](../http#async-iterators-sse), the server sends the response as Server-Sent Events (SSE). The REST client automatically detects this via the `text/event-stream` content type and returns an async iterable that you can consume with `for await...of`:
291+
292+
```ts
293+
// Server - service returns an async generator
294+
class ChatService {
295+
async *create(data: { prompt: string }) {
296+
const stream = await ai.generateStream(data.prompt)
297+
298+
for await (const chunk of stream) {
299+
yield { type: 'text', content: chunk }
300+
}
301+
}
302+
}
303+
304+
// Client - consume the stream
305+
const response = app.service('chat').create({ prompt: 'Hello' })
306+
307+
for await (const chunk of response) {
308+
console.log(chunk.content) // Streams in real-time
309+
}
310+
```
311+
312+
This is useful for:
313+
314+
- **AI/LLM responses** - Stream tokens as they're generated
315+
- **Progress updates** - Report status during long-running operations
316+
- **Live data feeds** - Push data to clients as it becomes available
317+
318+
```ts
319+
// Example: Streaming AI chat with status updates
320+
class AIChatService {
321+
async *create(data: { messages: Message[] }, params: Params) {
322+
yield { type: 'status', text: 'Thinking...' }
323+
324+
const stream = await llm.chat(data.messages)
325+
326+
for await (const token of stream) {
327+
yield { type: 'text', text: token }
328+
}
329+
330+
yield { type: 'done' }
331+
}
332+
}
333+
334+
// Client
335+
let fullResponse = ''
336+
337+
for await (const event of app.service('ai-chat').create({ messages })) {
338+
if (event.type === 'status') {
339+
showStatus(event.text)
340+
} else if (event.type === 'text') {
341+
fullResponse += event.text
342+
updateUI(fullResponse)
343+
}
344+
}
345+
```
346+
347+
::note[Automatic buffering]
348+
The client automatically handles SSE stream buffering, correctly parsing events even when they arrive split across network chunks. This ensures reliable streaming regardless of network conditions.
349+
::
350+
351+
::warning[REST only]
352+
Streaming responses are only supported with the REST/HTTP transport. For real-time updates over Socket.io, use [channels and events](../channels) instead.
353+
::
354+
288355
### Custom Methods
289356

290357
On the client, [custom service methods](../services#custom-methods) registered using the `methods` option when registering the service via `restClient.service()`:

0 commit comments

Comments
 (0)