Skip to content

Commit ccbd9e4

Browse files
committed
📘 fix: #1772 resolve merge conflict
1 parent b48ade6 commit ccbd9e4

File tree

4 files changed

+163
-58
lines changed

4 files changed

+163
-58
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Feature:
33
- [#1803](https://github.com/elysiajs/elysia/pull/1803) stream response with pull based backpressure
44
- [#1802](https://github.com/elysiajs/elysia/pull/1802) handle range header for file/blob response
5+
- [#1722](https://github.com/elysiajs/elysia/pull/1772), [#1741](https://github.com/elysiajs/elysia/issues/1741) direct ReadableStream perf blow-up
56

67
Bug fix:
78
- [#1805](https://github.com/elysiajs/elysia/pull/1805) dynamic imports inside .guard not registering routes
@@ -10,6 +11,8 @@ Bug fix:
1011
- [#1794](https://github.com/elysiajs/elysia/pull/1794) merge app cookie config into route cookie validator config
1112
- [#1796](https://github.com/elysiajs/elysia/pull/1796) check custom parser by full name
1213
- [#1795](https://github.com/elysiajs/elysia/pull/1795) write transformed cookie value to cookie entry directly
14+
- [#1793](https://github.com/elysiajs/elysia/pull/1793) use cookie schema for cookie noValidate check
15+
- [#1792](https://github.com/elysiajs/elysia/pull/1792) throw ValidationError instead of boolean in response encode path
1316
- detect HTML bundle when inline response is Promise
1417

1518
Change:

example/a.ts

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,34 @@
11
import { Elysia, t } from '../src'
2-
import html from '../example/index.html'
2+
import { req } from '../test/utils'
33

44
const app = new Elysia()
5-
.get(
6-
'/',
7-
({ cookie: { thing } }) => {
8-
console.log(typeof thing.value)
5+
.get('/', async () => {
6+
const file = Bun.file('test/kyuukurarin.mp4')
97

10-
return thing.value
11-
},
12-
{
13-
cookie: t.Object({
14-
thing: t.Number()
15-
})
16-
}
17-
)
18-
.listen(3000)
8+
// Wrap the stream in another ReadableStream
9+
// perhaps we are concatenating streams or whatever
10+
const body = new ReadableStream({
11+
async start(controller) {
12+
const reader = file.stream().getReader()
13+
try {
14+
while (true) {
15+
const { done, value } = await reader.read()
16+
if (done) break
17+
controller.enqueue(value)
18+
}
19+
controller.close()
20+
} catch (err) {
21+
controller.error(err)
22+
} finally {
23+
reader.releaseLock()
24+
}
25+
}
26+
})
27+
28+
// Returning the stream uses 100% for several minutes
29+
return body
1930

20-
app.handle(
21-
new Request('http://localhost:3000/', {
22-
headers: {
23-
cookie: 'thing=9'
24-
}
31+
// Returning the same stream wrapped in a Response servers the stream in a fraction of a second
32+
// return new Response(body);
2533
})
26-
)
27-
.then((response) => response.json())
28-
.then(console.log)
34+
.listen(3000)

src/adapter/utils.ts

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { hasHeaderShorthand, isNotEmpty, StatusMap } from '../utils'
44
import type { Context } from '../context'
55
import { env } from '../universal'
66
import { isBun } from '../universal/utils'
7+
import { MaybePromise } from '../types'
78

89
export const handleFile = (
910
response: File | Blob,
@@ -65,7 +66,11 @@ export const handleFile = (
6566
return new Response(
6667
(
6768
response as unknown as {
68-
slice(start: number, end: number, contentType?: string): Blob
69+
slice(
70+
start: number,
71+
end: number,
72+
contentType?: string
73+
): Blob
6974
}
7075
).slice(start, end + 1, response.type),
7176
{
@@ -208,7 +213,35 @@ interface CreateHandlerParameter {
208213
mapCompactResponse(response: unknown, request?: Request): Response
209214
}
210215

211-
const allowRapidStream = env.ELYSIA_RAPID_STREAM === 'true'
216+
const enqueueBinaryChunk = (
217+
controller: ReadableStreamDefaultController,
218+
chunk: unknown
219+
): MaybePromise<boolean> => {
220+
if (chunk instanceof Blob)
221+
return chunk.arrayBuffer().then((buffer) => {
222+
controller.enqueue(new Uint8Array(buffer))
223+
return true as const
224+
})
225+
226+
if (chunk instanceof Uint8Array) {
227+
controller.enqueue(chunk)
228+
return true
229+
}
230+
231+
if (chunk instanceof ArrayBuffer) {
232+
controller.enqueue(new Uint8Array(chunk))
233+
return true
234+
}
235+
236+
if (ArrayBuffer.isView(chunk)) {
237+
controller.enqueue(
238+
new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength)
239+
)
240+
return true
241+
}
242+
243+
return false
244+
}
212245

213246
export const createStreamHandler =
214247
({ mapResponse, mapCompactResponse }: CreateHandlerParameter) =>
@@ -301,27 +334,28 @@ export const createStreamHandler =
301334
// Enqueue the already-extracted init value (first generator
302335
// result, used above for SSE detection). Subsequent values
303336
// are produced on-demand by pull().
304-
if (!init || init.value instanceof ReadableStream) {
305-
} else if (
306-
init.value !== undefined &&
307-
init.value !== null
308-
) {
337+
if (
338+
!init ||
339+
init.value instanceof ReadableStream ||
340+
init.value === undefined ||
341+
init.value === null
342+
)
343+
return
344+
345+
// @ts-ignore
346+
if (init.value.toSSE)
309347
// @ts-ignore
310-
if (init.value.toSSE)
311-
// @ts-ignore
312-
controller.enqueue(init.value.toSSE())
313-
else if (typeof init.value === 'object')
314-
try {
315-
controller.enqueue(
316-
format(JSON.stringify(init.value))
317-
)
318-
} catch {
319-
controller.enqueue(
320-
format(init.value.toString())
321-
)
322-
}
323-
else controller.enqueue(format(init.value.toString()))
324-
}
348+
controller.enqueue(init.value.toSSE())
349+
else if (enqueueBinaryChunk(controller, init.value)) return
350+
else if (typeof init.value === 'object')
351+
try {
352+
controller.enqueue(
353+
format(JSON.stringify(init.value))
354+
)
355+
} catch {
356+
controller.enqueue(format(init.value.toString()))
357+
}
358+
else controller.enqueue(format(init.value.toString()))
325359
},
326360

327361
async pull(controller) {
@@ -348,23 +382,19 @@ export const createStreamHandler =
348382
if (chunk === undefined || chunk === null) return
349383

350384
// @ts-ignore
351-
if (chunk.toSSE) {
385+
if (chunk.toSSE)
352386
// @ts-ignore
353387
controller.enqueue(chunk.toSSE())
354-
} else {
355-
if (typeof chunk === 'object')
356-
try {
357-
controller.enqueue(
358-
format(JSON.stringify(chunk))
359-
)
360-
} catch {
361-
controller.enqueue(
362-
format(chunk.toString())
363-
)
364-
}
365-
else
388+
else if (enqueueBinaryChunk(controller, chunk)) return
389+
else if (typeof chunk === 'object')
390+
try {
391+
controller.enqueue(
392+
format(JSON.stringify(chunk))
393+
)
394+
} catch {
366395
controller.enqueue(format(chunk.toString()))
367-
}
396+
}
397+
else controller.enqueue(format(chunk.toString()))
368398
} catch (error) {
369399
console.warn(error)
370400

test/response/stream.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,4 +707,70 @@ describe('Stream', () => {
707707

708708
reader.cancel()
709709
})
710+
711+
it('stream ReadableStream binary chunks', async () => {
712+
const payload = new Uint8Array(128 * 1024)
713+
714+
for (let i = 0; i < payload.length; i++) payload[i] = i % 251
715+
716+
const app = new Elysia().get(
717+
'/',
718+
() =>
719+
new ReadableStream({
720+
start(controller) {
721+
controller.enqueue(payload.subarray(0, 32768))
722+
controller.enqueue(payload.subarray(32768, 65536))
723+
controller.enqueue(payload.subarray(65536, 98304))
724+
controller.enqueue(payload.subarray(98304))
725+
controller.close()
726+
}
727+
})
728+
)
729+
730+
const response = await app.handle(req('/'))
731+
const result = new Uint8Array(await response.arrayBuffer())
732+
733+
expect(result.byteLength).toBe(payload.byteLength)
734+
expect(result).toEqual(payload)
735+
})
736+
737+
it('stream ReadableStream binary views and blob chunks', async () => {
738+
const source = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
739+
const expected = new Uint8Array([6, 7, 8, 9, 2, 3, 4, 5, 10, 11, 12])
740+
741+
const app = new Elysia().get(
742+
'/',
743+
() =>
744+
new ReadableStream({
745+
start(controller) {
746+
controller.enqueue(source.subarray(6, 10))
747+
controller.enqueue(new DataView(source.buffer, 2, 4))
748+
controller.enqueue(
749+
new Blob([new Uint8Array([10, 11, 12])])
750+
)
751+
controller.close()
752+
}
753+
})
754+
)
755+
756+
const response = await app.handle(req('/'))
757+
const result = new Uint8Array(await response.arrayBuffer())
758+
759+
expect(result).toEqual(expected)
760+
})
761+
762+
it('stream generator Uint8Array chunks as binary', async () => {
763+
const app = new Elysia().get('/', async function* () {
764+
yield new Uint8Array([1, 2])
765+
await Bun.sleep(1)
766+
yield new Uint8Array([3, 4])
767+
})
768+
769+
const response = await app.handle(req('/'))
770+
const result = new Uint8Array(await response.arrayBuffer())
771+
772+
// expect(result).toEqual(result.toBase64())
773+
774+
expect(result).toEqual(new Uint8Array([1, 2, 3, 4]))
775+
})
710776
})

0 commit comments

Comments
 (0)