diff --git a/src/listener.ts b/src/listener.ts index 8d7e2a5..852c054 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -1,6 +1,7 @@ import type { IncomingMessage, ServerResponse, OutgoingHttpHeaders } from 'node:http' import { Http2ServerRequest } from 'node:http2' import type { Http2ServerResponse } from 'node:http2' +import type { Writable } from 'node:stream' import type { IncomingMessageWithWrapBodyStream } from './request' import { abortControllerKey, @@ -12,7 +13,12 @@ import { import { cacheKey, Response as LightweightResponse } from './response' import type { InternalCache } from './response' import type { CustomErrorHandler, FetchCallback, HttpBindings } from './types' -import { writeFromReadableStream, buildOutgoingHttpHeaders } from './utils' +import { + readWithoutBlocking, + writeFromReadableStream, + writeFromReadableStreamDefaultReader, + buildOutgoingHttpHeaders, +} from './utils' import { X_ALREADY_SENT } from './utils/response/constants' import './globals' @@ -21,9 +27,6 @@ type OutgoingHasOutgoingEnded = Http2ServerResponse & { [outgoingEnded]?: () => void } -const regBuffer = /^no$/i -const regContentType = /^(application\/json\b|text\/(?!event-stream\b))/i - const handleRequestError = (): Response => new Response(null, { status: 400, @@ -122,41 +125,51 @@ const responseViaResponseObject = async ( const resHeaderRecord: OutgoingHttpHeaders = buildOutgoingHttpHeaders(res.headers) if (res.body) { - /** - * If content-encoding is set, we assume that the response should be not decoded. - * Else if transfer-encoding is set, we assume that the response should be streamed. - * Else if content-length is set, we assume that the response content has been taken care of. - * Else if x-accel-buffering is set to no, we assume that the response should be streamed. - * Else if content-type is not application/json nor text/* but can be text/event-stream, - * we assume that the response should be streamed. - */ + const reader = res.body.getReader() - const { - 'transfer-encoding': transferEncoding, - 'content-encoding': contentEncoding, - 'content-length': contentLength, - 'x-accel-buffering': accelBuffering, - 'content-type': contentType, - } = resHeaderRecord + const values: Uint8Array[] = [] + let done = false + let currentReadPromise: Promise> | undefined = undefined - if ( - transferEncoding || - contentEncoding || - contentLength || - // nginx buffering variant - (accelBuffering && regBuffer.test(accelBuffering as string)) || - !regContentType.test(contentType as string) - ) { - outgoing.writeHead(res.status, resHeaderRecord) - flushHeaders(outgoing) + // In the case of synchronous responses, usually a maximum of two readings is done + for (let i = 0; i < 2; i++) { + currentReadPromise = reader.read() + const chunk = await readWithoutBlocking(currentReadPromise).catch((e) => { + console.error(e) + done = true + }) + if (!chunk) { + // Error occurred or currentReadPromise is not yet resolved. + // If an error occurs, immediately break the loop. + // If currentReadPromise is not yet resolved, pass it to writeFromReadableStreamDefaultReader. + break + } + currentReadPromise = undefined - await writeFromReadableStream(res.body, outgoing) - } else { - const buffer = await res.arrayBuffer() - resHeaderRecord['content-length'] = buffer.byteLength + if (chunk.value) { + values.push(chunk.value) + } + if (chunk.done) { + done = true + break + } + } + + if (done && !('content-length' in resHeaderRecord)) { + resHeaderRecord['content-length'] = values.reduce((acc, value) => acc + value.length, 0) + } - outgoing.writeHead(res.status, resHeaderRecord) - outgoing.end(new Uint8Array(buffer)) + outgoing.writeHead(res.status, resHeaderRecord) + values.forEach((value) => { + ;(outgoing as Writable).write(value) + }) + if (done) { + outgoing.end() + } else { + if (values.length === 0) { + flushHeaders(outgoing) + } + await writeFromReadableStreamDefaultReader(reader, outgoing, currentReadPromise) } } else if (resHeaderRecord[X_ALREADY_SENT]) { // do nothing, the response has already been sent diff --git a/src/utils.ts b/src/utils.ts index 1f4240d..7230bc8 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,22 +1,23 @@ import type { OutgoingHttpHeaders } from 'node:http' import type { Writable } from 'node:stream' -export function writeFromReadableStream(stream: ReadableStream, writable: Writable) { - if (stream.locked) { - throw new TypeError('ReadableStream is locked.') - } else if (writable.destroyed) { - return - } - - const reader = stream.getReader() +export async function readWithoutBlocking( + readPromise: Promise> +): Promise | undefined> { + return Promise.race([readPromise, Promise.resolve().then(() => Promise.resolve(undefined))]) +} +export function writeFromReadableStreamDefaultReader( + reader: ReadableStreamDefaultReader, + writable: Writable, + currentReadPromise?: Promise> | undefined +) { const handleError = () => { // ignore the error } writable.on('error', handleError) - - reader.read().then(flow, handleStreamError) + ;(currentReadPromise ?? reader.read()).then(flow, handleStreamError) return reader.closed.finally(() => { writable.off('error', handleError) @@ -48,6 +49,16 @@ export function writeFromReadableStream(stream: ReadableStream, writ } } +export function writeFromReadableStream(stream: ReadableStream, writable: Writable) { + if (stream.locked) { + throw new TypeError('ReadableStream is locked.') + } else if (writable.destroyed) { + return + } + + return writeFromReadableStreamDefaultReader(stream.getReader(), writable) +} + export const buildOutgoingHttpHeaders = ( headers: Headers | HeadersInit | null | undefined ): OutgoingHttpHeaders => { diff --git a/test/server.test.ts b/test/server.test.ts index 0a2e1c3..b802c9e 100644 --- a/test/server.test.ts +++ b/test/server.test.ts @@ -11,7 +11,7 @@ import { createServer as createHTTPSServer } from 'node:https' import { GlobalRequest, Request as LightweightRequest, getAbortController } from '../src/request' import { GlobalResponse, Response as LightweightResponse } from '../src/response' import { createAdaptorServer, serve } from '../src/server' -import type { HttpBindings } from '../src/types' +import type { HttpBindings, ServerType } from '../src/types' import { app } from './app' describe('Basic', () => { @@ -133,126 +133,153 @@ describe('Basic', () => { }) describe('various response body types', () => { - const app = new Hono() - app.use('*', async (c, next) => { - await next() + const runner = (Response: typeof GlobalResponse) => { + const largeText = 'a'.repeat(1024 * 1024 * 10) + let server: ServerType + let resolveReadableStreamPromise: () => void + beforeAll(() => { + const app = new Hono() + app.use('*', async (c, next) => { + await next() + + // generate internal response object + const status = c.res.status + if (status > 999) { + c.res = new Response('Internal Server Error', { status: 500 }) + } + }) + app.get('/', () => { + const response = new Response('Hello! Node!') + return response + }) + app.get('/large', () => { + // 10MB text + const response = new Response(largeText) + return response + }) + app.get('/uint8array', () => { + const response = new Response(new Uint8Array([1, 2, 3]), { + headers: { 'content-type': 'application/octet-stream' }, + }) + return response + }) + app.get('/blob', () => { + const response = new Response(new Blob([new Uint8Array([1, 2, 3])]), { + headers: { 'content-type': 'application/octet-stream' }, + }) + return response + }) + const readableStreamPromise = new Promise((resolve) => { + resolveReadableStreamPromise = resolve + }) + app.get('/readable-stream', () => { + const stream = new ReadableStream({ + async start(controller) { + await readableStreamPromise + controller.enqueue('Hello!') + controller.enqueue(' Node!') + controller.close() + }, + }) + return new Response(stream) + }) + app.get('/buffer', () => { + const response = new Response(Buffer.from('Hello Hono!'), { + headers: { 'content-type': 'text/plain' }, + }) + return response + }) - // generate internal response object - const status = c.res.status - if (status > 999) { - c.res = new Response('Internal Server Error', { status: 500 }) - } - }) - app.get('/', () => { - const response = new Response('Hello! Node!') - return response - }) - app.get('/uint8array', () => { - const response = new Response(new Uint8Array([1, 2, 3]), { - headers: { 'content-type': 'application/octet-stream' }, - }) - return response - }) - app.get('/blob', () => { - const response = new Response(new Blob([new Uint8Array([1, 2, 3])]), { - headers: { 'content-type': 'application/octet-stream' }, - }) - return response - }) - let resolveReadableStreamPromise: () => void - const readableStreamPromise = new Promise((resolve) => { - resolveReadableStreamPromise = resolve - }) - app.get('/readable-stream', () => { - const stream = new ReadableStream({ - async start(controller) { - await readableStreamPromise - controller.enqueue('Hello!') - controller.enqueue(' Node!') - controller.close() - }, - }) - return new Response(stream) - }) - app.get('/buffer', () => { - const response = new Response(Buffer.from('Hello Hono!'), { - headers: { 'content-type': 'text/plain' }, - }) - return response - }) + app.use('/etag/*', etag()) + app.get('/etag/buffer', () => { + const response = new Response(Buffer.from('Hello Hono!'), { + headers: { 'content-type': 'text/plain' }, + }) + return response + }) - app.use('/etag/*', etag()) - app.get('/etag/buffer', () => { - const response = new Response(Buffer.from('Hello Hono!'), { - headers: { 'content-type': 'text/plain' }, + server = createAdaptorServer(app) }) - return response - }) - const server = createAdaptorServer(app) + it('Should return 200 response - GET /', async () => { + const res = await request(server).get('/') + expect(res.status).toBe(200) + expect(res.headers['content-type']).toMatch('text/plain') + expect(res.headers['content-length']).toMatch('12') + expect(res.text).toBe('Hello! Node!') + }) - it('Should return 200 response - GET /', async () => { - const res = await request(server).get('/') - expect(res.status).toBe(200) - expect(res.headers['content-type']).toMatch('text/plain') - expect(res.headers['content-length']).toMatch('12') - expect(res.text).toBe('Hello! Node!') - }) + it('Should return 200 response - GET /large', async () => { + const res = await request(server).get('/large') + expect(res.status).toBe(200) + expect(res.headers['content-type']).toMatch('text/plain') + expect(res.headers['content-length']).toMatch(largeText.length.toString()) + expect(res.text).toBe(largeText) + }) - it('Should return 200 response - GET /uint8array', async () => { - const res = await request(server).get('/uint8array') - expect(res.status).toBe(200) - expect(res.headers['content-type']).toMatch('application/octet-stream') - expect(res.headers['content-length']).toMatch('3') - expect(res.body).toEqual(Buffer.from([1, 2, 3])) - }) + it('Should return 200 response - GET /uint8array', async () => { + const res = await request(server).get('/uint8array') + expect(res.status).toBe(200) + expect(res.headers['content-type']).toMatch('application/octet-stream') + expect(res.headers['content-length']).toMatch('3') + expect(res.body).toEqual(Buffer.from([1, 2, 3])) + }) - it('Should return 200 response - GET /blob', async () => { - const res = await request(server).get('/blob') - expect(res.status).toBe(200) - expect(res.headers['content-type']).toMatch('application/octet-stream') - expect(res.headers['content-length']).toMatch('3') - expect(res.body).toEqual(Buffer.from([1, 2, 3])) - }) + it('Should return 200 response - GET /blob', async () => { + const res = await request(server).get('/blob') + expect(res.status).toBe(200) + expect(res.headers['content-type']).toMatch('application/octet-stream') + expect(res.headers['content-length']).toMatch('3') + expect(res.body).toEqual(Buffer.from([1, 2, 3])) + }) + + it('Should return 200 response - GET /readable-stream', async () => { + const expectedChunks = ['Hello!', ' Node!'] + const resPromise = request(server) + .get('/readable-stream') + .parse((res, fn) => { + // response header should be sent before sending data. + expect(res.headers['transfer-encoding']).toBe('chunked') + resolveReadableStreamPromise() + + res.on('data', (chunk) => { + const str = chunk.toString() + expect(str).toBe(expectedChunks.shift()) + }) + res.on('end', () => fn(null, '')) + }) + await new Promise((resolve) => setTimeout(resolve, 100)) + const res = await resPromise + expect(res.status).toBe(200) + expect(res.headers['content-type']).toMatch('text/plain; charset=UTF-8') + expect(res.headers['content-length']).toBeUndefined() + expect(expectedChunks.length).toBe(0) // all chunks are received + }) - it('Should return 200 response - GET /readable-stream', async () => { - const expectedChunks = ['Hello!', ' Node!'] - const resPromise = request(server) - .get('/readable-stream') - .parse((res, fn) => { - // response header should be sent before sending data. - expect(res.headers['transfer-encoding']).toBe('chunked') - resolveReadableStreamPromise() + it('Should return 200 response - GET /buffer', async () => { + const res = await request(server).get('/buffer') + expect(res.status).toBe(200) + expect(res.headers['content-type']).toMatch('text/plain') + expect(res.headers['content-length']).toMatch('11') + expect(res.text).toBe('Hello Hono!') + }) - res.on('data', (chunk) => { - const str = chunk.toString() - expect(str).toBe(expectedChunks.shift()) - }) - res.on('end', () => fn(null, '')) - }) - await new Promise((resolve) => setTimeout(resolve, 100)) - const res = await resPromise - expect(res.status).toBe(200) - expect(res.headers['content-type']).toMatch('text/plain; charset=UTF-8') - expect(res.headers['content-length']).toBeUndefined() - expect(expectedChunks.length).toBe(0) // all chunks are received - }) + it('Should return 200 response - GET /etag/buffer', async () => { + const res = await request(server).get('/etag/buffer') + expect(res.status).toBe(200) + expect(res.headers['content-type']).toMatch('text/plain') + expect(res.headers['etag']).toMatch('"7e03b9b8ed6156932691d111c81c34c3c02912f9"') + expect(res.headers['content-length']).toMatch('11') + expect(res.text).toBe('Hello Hono!') + }) + } - it('Should return 200 response - GET /buffer', async () => { - const res = await request(server).get('/buffer') - expect(res.status).toBe(200) - expect(res.headers['content-type']).toMatch('text/plain') - expect(res.headers['content-length']).toMatch('11') - expect(res.text).toBe('Hello Hono!') + describe('GlobalResponse', () => { + runner(GlobalResponse) }) - it('Should return 200 response - GET /etag/buffer', async () => { - const res = await request(server).get('/etag/buffer') - expect(res.status).toBe(200) - expect(res.headers['content-type']).toMatch('text/plain') - expect(res.headers['etag']).toMatch('"7e03b9b8ed6156932691d111c81c34c3c02912f9"') - expect(res.headers['content-length']).toMatch('11') - expect(res.text).toBe('Hello Hono!') + describe('LightweightResponse', () => { + runner(LightweightResponse as unknown as typeof GlobalResponse) }) }) diff --git a/test/utils.test.ts b/test/utils.test.ts index 53f6972..391a647 100644 --- a/test/utils.test.ts +++ b/test/utils.test.ts @@ -1,5 +1,9 @@ import { Writable } from 'node:stream' -import { buildOutgoingHttpHeaders, writeFromReadableStream } from '../src/utils' +import { + buildOutgoingHttpHeaders, + writeFromReadableStream, + readWithoutBlocking, +} from '../src/utils' describe('buildOutgoingHttpHeaders', () => { it('original content-type is preserved', () => { @@ -110,3 +114,92 @@ describe('writeFromReadableStream', () => { expect(cancelCalled).toBe(false) // cancel should not be called }) }) + +describe('readWithoutBlocking', () => { + const encode = (body: string) => new TextEncoder().encode(body) + it('should return the body for simple text', async () => { + const text = 'Hello! Node!' + const response = new Response(text) + const reader = response.body!.getReader() + const firstChunk = await readWithoutBlocking(reader.read()) + expect(firstChunk).toEqual({ done: false, value: encode(text) }) + const secondChunk = await readWithoutBlocking(reader.read()) + expect(secondChunk).toEqual({ done: true, value: undefined }) + }) + + it('should return the body for large text', async () => { + const text = 'a'.repeat(1024 * 1024 * 10) + const response = new Response(text) + const reader = response.body!.getReader() + const firstChunk = await readWithoutBlocking(reader.read()) + expect(firstChunk?.done).toBe(false) + expect(firstChunk?.value?.length).toEqual(10 * 1024 * 1024) + const secondChunk = await readWithoutBlocking(reader.read()) + expect(secondChunk).toEqual({ done: true, value: undefined }) + }) + + it('should return the body simple synchronous readable stream', async () => { + const text = 'Hello! Node!' + const body = new ReadableStream({ + start(controller) { + controller.enqueue(encode(text)) + controller.close() + }, + }) + const response = new Response(body) + const reader = response.body!.getReader() + const result = await readWithoutBlocking(reader.read()) + expect(result).toEqual({ done: false, value: encode(text) }) + }) + + it('should return undefined if stream is not ready', async () => { + const text = 'Hello! Node!' + const body = new ReadableStream({ + async start(controller) { + await new Promise((resolve) => setTimeout(resolve)) + controller.enqueue(encode(text)) + controller.close() + }, + }) + const response = new Response(body) + const reader = response.body!.getReader() + const readPromise = reader.read() + + const result = await readWithoutBlocking(readPromise) + expect(result).toBeUndefined() + + await new Promise((resolve) => setTimeout(resolve)) + const result2 = await readWithoutBlocking(readPromise) + expect(result2).toEqual({ done: false, value: encode(text) }) + const result3 = await readWithoutBlocking(reader.read()) + expect(result3).toEqual({ done: true, value: undefined }) + }) + + it('should return undefined if stream is closed', async () => { + const body = new ReadableStream({ + async start() { + throw new Error('test') + }, + }) + const response = new Response(body) + const reader = response.body!.getReader() + const readPromise = reader.read() + + const result = await readWithoutBlocking(readPromise) + expect(result).toBeUndefined() + }) + + it('should return undefined if stream is errored', async () => { + const body = new ReadableStream({ + pull() { + throw new Error('test') + }, + }) + const response = new Response(body) + const reader = response.body!.getReader() + const readPromise = reader.read() + + const result = await readWithoutBlocking(readPromise).catch(() => undefined) + expect(result).toBeUndefined() + }) +})