diff --git a/src/listener.ts b/src/listener.ts index 6a9f207..aec051f 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -5,6 +5,7 @@ import { newRequest, Request as LightweightRequest, toRequestError, + bodyCancelledKey, } from './request' import { cacheKey, Response as LightweightResponse } from './response' import type { InternalCache } from './response' @@ -196,6 +197,8 @@ export const getRequestListener = ( req[abortControllerKey].abort(incoming.errored.toString()) } else if (!outgoing.writableFinished) { req[abortControllerKey].abort('Client connection prematurely closed.') + } else if (req[bodyCancelledKey]) { + incoming.destroy(new Error('The request ended before the request body was consumed.')) } }) diff --git a/src/request.ts b/src/request.ts index 4065204..2468a5e 100644 --- a/src/request.ts +++ b/src/request.ts @@ -6,6 +6,8 @@ import { Http2ServerRequest } from 'node:http2' import { Readable } from 'node:stream' import type { TLSSocket } from 'node:tls' +export const bodyCancelledKey = Symbol('bodyCancelledKey') + export class RequestError extends Error { constructor( message: string, @@ -41,12 +43,19 @@ export class Request extends GlobalRequest { } } -const newRequestFromIncoming = ( - method: string, - url: string, - incoming: IncomingMessage | Http2ServerRequest, - abortController: AbortController -): Request => { +const newRequestFromIncoming = (originalRequest: Record): Request => { + const { + method, + [urlKey]: url, + [incomingKey]: incoming, + [abortControllerKey]: abortController, + } = originalRequest as { + method: string + [urlKey]: string + [incomingKey]: IncomingMessage | Http2ServerRequest + [abortControllerKey]: AbortController + } + const headerRecord: [string, string][] = [] const rawHeaders = incoming.rawHeaders for (let i = 0; i < rawHeaders.length; i += 2) { @@ -85,7 +94,23 @@ const newRequestFromIncoming = ( }) } else { // lazy-consume request body - init.body = Readable.toWeb(incoming) as ReadableStream + init.body = new ReadableStream({ + async start(controller) { + const stream = Readable.toWeb(incoming) as ReadableStream + + try { + for await (const chunk of stream) { + controller.enqueue(chunk) + } + controller.close() + } catch (error: unknown) { + controller.error(error) + } + }, + cancel() { + originalRequest[bodyCancelledKey] = true + }, + }) } } @@ -115,12 +140,7 @@ const requestPrototype: Record = { [getRequestCache]() { this[abortControllerKey] ||= new AbortController() - return (this[requestCache] ||= newRequestFromIncoming( - this.method, - this[urlKey], - this[incomingKey], - this[abortControllerKey] - )) + return (this[requestCache] ||= newRequestFromIncoming(this)) }, } ;[ diff --git a/test/server.test.ts b/test/server.test.ts index 1d524f5..abf16f8 100644 --- a/test/server.test.ts +++ b/test/server.test.ts @@ -28,6 +28,40 @@ describe('Basic', () => { app.post('/posts', (c) => { return c.redirect('/posts') }) + app.post('/no-body-consumed', (c) => { + if (!c.req.raw.body) { + // force create new request object + throw new Error('No body consumed') + } + return c.text('No body consumed') + }) + app.post('/body-cancelled', (c) => { + if (!c.req.raw.body) { + // force create new request object + throw new Error('No body consumed') + } + c.req.raw.body.cancel() + return c.text('No body consumed') + }) + app.post('/partially-consumed', async (c) => { + if (!c.req.raw.body) { + // force create new request object + throw new Error('No body consumed') + } + const reader = c.req.raw.body.getReader() + await reader.read() // read only one chunk + return c.text('No body consumed') + }) + app.post('/partially-consumed-and-cancelled', async (c) => { + if (!c.req.raw.body) { + // force create new request object + throw new Error('No body consumed') + } + const reader = c.req.raw.body.getReader() + await reader.read() // read only one chunk + reader.cancel() + return c.text('No body consumed') + }) app.delete('/posts/:id', (c) => { return c.text(`DELETE ${c.req.param('id')}`) }) @@ -82,6 +116,60 @@ describe('Basic', () => { expect(res.headers['location']).toBe('/posts') }) + it('Should return 200 response - POST /no-body-consumed', async () => { + const res = await request(server).post('/no-body-consumed').send('') + expect(res.status).toBe(200) + expect(res.text).toBe('No body consumed') + }) + + it('Should return 200 response - POST /body-cancelled', async () => { + const res = await request(server).post('/body-cancelled').send('') + expect(res.status).toBe(200) + expect(res.text).toBe('No body consumed') + }) + + it('Should return 200 response - POST /partially-consumed', async () => { + const buffer = Buffer.alloc(1024 * 10) // large buffer + const res = await new Promise((resolve, reject) => { + const req = request(server) + .post('/partially-consumed') + .set('Content-Length', buffer.length.toString()) + + req.write(buffer) + req.end((err, res) => { + if (err) { + reject(err) + } else { + resolve(res) + } + }) + }) + + expect(res.status).toBe(200) + expect(res.text).toBe('No body consumed') + }) + + it('Should return 200 response - POST /partially-consumed-and-cancelled', async () => { + const buffer = Buffer.alloc(1) // A large buffer will not make the test go far, so keep it small because it won't go far. + const res = await new Promise((resolve, reject) => { + const req = request(server) + .post('/partially-consumed-and-cancelled') + .set('Content-Length', buffer.length.toString()) + + req.write(buffer) + req.end((err, res) => { + if (err) { + reject(err) + } else { + resolve(res) + } + }) + }) + + expect(res.status).toBe(200) + expect(res.text).toBe('No body consumed') + }) + it('Should return 201 response - DELETE /posts/123', async () => { const res = await request(server).delete('/posts/123') expect(res.status).toBe(200)