Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
"eslint": "^8.49.0",
"eslint-plugin-security": "^2.1.0",
"eslint-plugin-sonarjs": "^0.23.0",
"eventsource-parser": "^1.1.2",
"expect-type": "^0.16.0",
"memoirist": "^0.2.0",
"prettier": "^3.3.3",
Expand Down
67 changes: 32 additions & 35 deletions src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,24 @@ export const serializeCookie = (cookies: Context['set']['cookie']) => {

// return arr
// }

const handleStream = async (
generator: Generator | AsyncGenerator,
set?: Context['set'],
request?: Request
) => {
let init = generator.next()
if (init instanceof Promise) init = await init

if (init.done) {
if (set) return mapResponse(init.value, set, request)
return mapCompactResponse(init.value, request)
let init
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove try catch in here, handleStream function is already wrapped in try-catch that handle onError already

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the app will crash if you throw an error in a generator before yield, i added a test that reproduces this here: https://github.com/remorses/elysia/blob/932951679c89aef6bad08779b6eb7fe94c30335d/test/response/stream.test.ts#L75

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed this crash removing the try catch when aot is false. In aot mode it still crashes.

the app crashes because the error is throws asynchronously while mapResponse is a not async, i think there are many more of these bugs hidden in the code, the ideal solution would be to always await mapResonse in aot too

try {
init = generator.next()
if (init instanceof Promise) init = await init

if (init?.done) {
if (set) return mapResponse(init.value, set, request)
return mapCompactResponse(init.value, request)
}
} catch (error) {
// TODO should call app.onError if set
if (set) return mapResponse(error, set, request)
return mapCompactResponse(error, request)
}

return new Response(
Expand All @@ -146,38 +152,29 @@ const handleStream = async (
}
})

if (init.value !== undefined && init.value !== null) {
if (
typeof init.value === "object"
if (init?.value !== undefined && init?.value !== null)
controller.enqueue(
Buffer.from(
`event: message\ndata: ${JSON.stringify(init.value)}\n\n`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would make sense to check typeof object before using JSON.stringify

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't always call JSON.stringify and the user yields a string that contains 2 new lines the response will break the text/event-stream protocol. If you always call JSON.stringify you can be sure that all the new lines are encoded with \n and put in a single line.

The parsing in Eden also becomes simpler because you know messages are always a JSON encoded string instead of using heuristics.

)
)
try {
controller.enqueue(
Buffer.from(JSON.stringify(init.value))
)
} catch {
controller.enqueue(Buffer.from(init.value.toString()))
}
else controller.enqueue(Buffer.from(init.value.toString()))
}

for await (const chunk of generator) {
if (end) break
if (chunk === undefined || chunk === null) continue
try {
for await (const chunk of generator) {
if (end) break
if (chunk === undefined || chunk === null) continue

if (typeof chunk === 'object')
try {
controller.enqueue(
Buffer.from(JSON.stringify(chunk))
controller.enqueue(
Buffer.from(
`event: message\ndata: ${JSON.stringify(chunk)}\n\n`
)
} catch {
controller.enqueue(Buffer.from(chunk.toString()))
}
else controller.enqueue(Buffer.from(chunk.toString()))

// Wait for the next event loop
// Otherwise the data will be mixed up
await new Promise<void>((resolve) =>
setTimeout(() => resolve(), 0)
)
}
} catch (error: any) {
controller.enqueue(
Buffer.from(
`event: error\ndata: ${JSON.stringify(error.message || error.name || 'Error')}\n\n`
)
)
}

Expand Down
107 changes: 101 additions & 6 deletions test/response/stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
import { describe, it, expect } from 'bun:test'
import { req } from '../utils'
import { createParser } from 'eventsource-parser'

import { Elysia } from '../../src'

function textEventStream(items: string[]) {
return items
.map((item) => `event: message\ndata: ${JSON.stringify(item)}\n\n`)
.join('')
}

function parseTextEventStreamItem(item: string) {
const data = item.split('data: ')[1].split('\n')[0]
return JSON.parse(data)
}

describe('Stream', () => {
it('handle stream', async () => {
const expected = ['a', 'b', 'c']
Expand Down Expand Up @@ -31,7 +43,9 @@ describe('Stream', () => {
reader.read().then(function pump({ done, value }): unknown {
if (done) return resolve(acc)

expect(value.toString()).toBe(expected.shift()!)
expect(parseTextEventStreamItem(value.toString())).toBe(
expected.shift()!
)

acc += value.toString()
return reader.read().then(pump)
Expand All @@ -41,7 +55,46 @@ describe('Stream', () => {
})

expect(expected).toHaveLength(0)
expect(response).toBe('abc')
expect(response).toBe(textEventStream(['a', 'b', 'c']))
})
it('handle errors after yield', async () => {
const app = new Elysia().get('/', async function* () {
yield 'a'
await Bun.sleep(10)

throw new Error('an error')
})

const response = await app.handle(req('/')).then((x) => x.text())

expect(response).toBe(
'event: message\ndata: "a"\n\nevent: error\ndata: "an error"\n\n'
)
})
// TODO
it.skip('handle errors before yield', async () => {
const app = new Elysia().get('/', async function* () {
throw new Error('an error')
})

const response = await app.handle(req('/')).then((x) => x.text())

expect(response).toInclude('an error')
})

it.todo('handle errors before yield with onError', async () => {
const expected = 'error expected'
const app = new Elysia()
.onError(({}) => {
return new Response(expected)
})
.get('/', async function* () {
throw new Error('an error')
})

const response = await app.handle(req('/')).then((x) => x.text())

expect(response).toBe(expected)
})

it('stop stream on canceled request', async () => {
Expand Down Expand Up @@ -79,9 +132,13 @@ describe('Stream', () => {
const { promise, resolve } = Promise.withResolvers()

reader.read().then(function pump({ done, value }): unknown {
if (done) return resolve(acc)
if (done) {
return resolve(acc)
}

expect(value.toString()).toBe(expected.shift()!)
expect(parseTextEventStreamItem(value.toString())).toBe(
expected.shift()!
)

acc += value.toString()
return reader.read().then(pump)
Expand All @@ -91,7 +148,7 @@ describe('Stream', () => {
})

expect(expected).toHaveLength(0)
expect(response).toBe('ab')
expect(response).toBe(textEventStream(['a', 'b']))
})

it('mutate set before yield is called', async () => {
Expand All @@ -111,6 +168,42 @@ describe('Stream', () => {
'http://saltyaom.com'
)
})
it('handle stream with objects', async () => {
const objects = [
{ message: 'hello' },
{ response: 'world' },
{ data: [1, 2, 3] },
{ result: [4, 5, 6] }
]
const app = new Elysia().get('/', async function* ({}) {
for (const obj of objects) {
yield obj
}
})

const body = await app.handle(req('/')).then((x) => x.body)

let events = [] as any[]
const parser = createParser((event) => {
events.push(event)
})
const { promise, resolve } = Promise.withResolvers()
const reader = body?.getReader()!

reader.read().then(function pump({ done, value }): unknown {
if (done) {
return resolve()
}
const text = value.toString()
parser.feed(text)
return reader.read().then(pump)
})
await promise

expect(events.map((x) => x.data)).toEqual(
objects.map((x) => JSON.stringify(x))
)
})

it('mutate set before yield is called', async () => {
const expected = ['a', 'b', 'c']
Expand Down Expand Up @@ -216,7 +309,9 @@ describe('Stream', () => {
reader.read().then(function pump({ done, value }): unknown {
if (done) return resolve()

expect(value.toString()).toBe(JSON.stringify(expected[i++]))
expect(parseTextEventStreamItem(value.toString())).toEqual(
expected[i++]
)

return reader.read().then(pump)
})
Expand Down