Skip to content

Commit 1c9e1a4

Browse files
usualomayusukebe
andauthored
feat: Clean up the incoming object if the request is not completely finished. (#252)
* feat: Clean up the incoming object if the request is not completely finished. * feat: add autoCleanupIncoming option to disable auto cleanup of incoming request body * test: add test cases for partially consumed and cancelled request body * test: added test for incomplete requests with low-level sockets, a pattern that causes automatic cleanup. * test: ignore test/app.ts in jest.config.js * test: skip test for partially consumed pattern in v18 * fix: typo * docs: fix typo * fix: eliminate environment dependence of type loading Co-authored-by: Yusuke Wada <[email protected]> --------- Co-authored-by: Yusuke Wada <[email protected]>
1 parent 7f327f5 commit 1c9e1a4

File tree

9 files changed

+778
-39
lines changed

9 files changed

+778
-39
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,18 @@ serve({
125125
})
126126
```
127127

128+
### `autoCleanupIncoming`
129+
130+
The default value is `true`. The Node.js Adapter automatically cleans up (explicitly call `destroy()` method) if application is not finished to consume the incoming request. If you don't want to do that, set `false`.
131+
If the application accepts connections from arbitrary clients, this cleanup must be done otherwise incomplete requests from clients may cause the application to stop responding. If your application only accepts connections from trusted clients, such as in a reverse proxy environment and there is no process that returns a response without reading the body of the POST request all the way through, you can improve performance by setting it to `false`.
132+
133+
```ts
134+
serve({
135+
fetch: app.fetch,
136+
autoCleanupIncoming: false,
137+
})
138+
```
139+
128140
## Middleware
129141

130142
Most built-in middleware also works with Node.js.

jest.config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module.exports = {
22
testMatch: ['**/test/**/*.+(ts)', '**/src/**/(*.)+(test).+(ts)'],
3-
modulePathIgnorePatterns: ["test/setup.ts"],
3+
modulePathIgnorePatterns: ["test/setup.ts", "test/app.ts"],
44
transform: {
55
'^.+\\.(ts)$': 'ts-jest',
66
},

src/listener.ts

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import type { IncomingMessage, ServerResponse, OutgoingHttpHeaders } from 'node:http'
2-
import type { Http2ServerRequest, Http2ServerResponse } from 'node:http2'
2+
import { Http2ServerRequest } from 'node:http2'
3+
import type { Http2ServerResponse } from 'node:http2'
4+
import type { IncomingMessageWithWrapBodyStream } from './request'
35
import {
46
abortControllerKey,
57
newRequest,
68
Request as LightweightRequest,
9+
wrapBodyStream,
710
toRequestError,
811
} from './request'
912
import { cacheKey, Response as LightweightResponse } from './response'
@@ -13,6 +16,11 @@ import { writeFromReadableStream, buildOutgoingHttpHeaders } from './utils'
1316
import { X_ALREADY_SENT } from './utils/response/constants'
1417
import './globals'
1518

19+
const outgoingEnded = Symbol('outgoingEnded')
20+
type OutgoingHasOutgoingEnded = Http2ServerResponse & {
21+
[outgoingEnded]?: () => void
22+
}
23+
1624
const regBuffer = /^no$/i
1725
const regContentType = /^(application\/json\b|text\/(?!event-stream\b))/i
1826

@@ -78,10 +86,12 @@ const responseViaCache = async (
7886
outgoing.end(new Uint8Array(await body.arrayBuffer()))
7987
} else {
8088
flushHeaders(outgoing)
81-
return writeFromReadableStream(body, outgoing)?.catch(
89+
await writeFromReadableStream(body, outgoing)?.catch(
8290
(e) => handleResponseError(e, outgoing) as undefined
8391
)
8492
}
93+
94+
;(outgoing as OutgoingHasOutgoingEnded)[outgoingEnded]?.()
8595
}
8696

8797
const responseViaResponseObject = async (
@@ -154,6 +164,8 @@ const responseViaResponseObject = async (
154164
outgoing.writeHead(res.status, resHeaderRecord)
155165
outgoing.end()
156166
}
167+
168+
;(outgoing as OutgoingHasOutgoingEnded)[outgoingEnded]?.()
157169
}
158170

159171
export const getRequestListener = (
@@ -162,8 +174,10 @@ export const getRequestListener = (
162174
hostname?: string
163175
errorHandler?: CustomErrorHandler
164176
overrideGlobalObjects?: boolean
177+
autoCleanupIncoming?: boolean
165178
} = {}
166179
) => {
180+
const autoCleanupIncoming = options.autoCleanupIncoming ?? true
167181
if (options.overrideGlobalObjects !== false && global.Request !== LightweightRequest) {
168182
Object.defineProperty(global, 'Request', {
169183
value: LightweightRequest,
@@ -185,17 +199,59 @@ export const getRequestListener = (
185199
// so generate a pseudo Request object with only the minimum required information.
186200
req = newRequest(incoming, options.hostname)
187201

202+
let incomingEnded =
203+
!autoCleanupIncoming || incoming.method === 'GET' || incoming.method === 'HEAD'
204+
if (!incomingEnded) {
205+
;(incoming as IncomingMessageWithWrapBodyStream)[wrapBodyStream] = true
206+
incoming.on('end', () => {
207+
incomingEnded = true
208+
})
209+
210+
if (incoming instanceof Http2ServerRequest) {
211+
// a Http2ServerResponse instance requires additional processing on exit
212+
// since outgoing.on('close') is not called even after outgoing.end() is called
213+
// when the state is incomplete
214+
;(outgoing as OutgoingHasOutgoingEnded)[outgoingEnded] = () => {
215+
// incoming is not consumed to the end
216+
if (!incomingEnded) {
217+
setTimeout(() => {
218+
// in the case of a simple POST request, the cleanup process may be done automatically
219+
// and end is called at this point. At that point, nothing is done.
220+
if (!incomingEnded) {
221+
setTimeout(() => {
222+
incoming.destroy()
223+
// a Http2ServerResponse instance will not terminate without also calling outgoing.destroy()
224+
outgoing.destroy()
225+
})
226+
}
227+
})
228+
}
229+
}
230+
}
231+
}
232+
188233
// Detect if request was aborted.
189234
outgoing.on('close', () => {
190235
const abortController = req[abortControllerKey] as AbortController | undefined
191-
if (!abortController) {
192-
return
236+
if (abortController) {
237+
if (incoming.errored) {
238+
req[abortControllerKey].abort(incoming.errored.toString())
239+
} else if (!outgoing.writableFinished) {
240+
req[abortControllerKey].abort('Client connection prematurely closed.')
241+
}
193242
}
194243

195-
if (incoming.errored) {
196-
req[abortControllerKey].abort(incoming.errored.toString())
197-
} else if (!outgoing.writableFinished) {
198-
req[abortControllerKey].abort('Client connection prematurely closed.')
244+
// incoming is not consumed to the end
245+
if (!incomingEnded) {
246+
setTimeout(() => {
247+
// in the case of a simple POST request, the cleanup process may be done automatically
248+
// and end is called at this point. At that point, nothing is done.
249+
if (!incomingEnded) {
250+
setTimeout(() => {
251+
incoming.destroy()
252+
})
253+
}
254+
})
199255
}
200256
})
201257

src/request.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import type { IncomingMessage } from 'node:http'
55
import { Http2ServerRequest } from 'node:http2'
66
import { Readable } from 'node:stream'
7+
import type { ReadableStreamDefaultReader } from 'node:stream/web'
78
import type { TLSSocket } from 'node:tls'
89

910
export class RequestError extends Error {
@@ -41,6 +42,8 @@ export class Request extends GlobalRequest {
4142
}
4243
}
4344

45+
export type IncomingMessageWithWrapBodyStream = IncomingMessage & { [wrapBodyStream]: boolean }
46+
export const wrapBodyStream = Symbol('wrapBodyStream')
4447
const newRequestFromIncoming = (
4548
method: string,
4649
url: string,
@@ -83,6 +86,23 @@ const newRequestFromIncoming = (
8386
controller.close()
8487
},
8588
})
89+
} else if ((incoming as IncomingMessageWithWrapBodyStream)[wrapBodyStream]) {
90+
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined
91+
init.body = new ReadableStream({
92+
async pull(controller) {
93+
try {
94+
reader ||= Readable.toWeb(incoming).getReader()
95+
const { done, value } = await reader.read()
96+
if (done) {
97+
controller.close()
98+
} else {
99+
controller.enqueue(value)
100+
}
101+
} catch (error) {
102+
controller.error(error)
103+
}
104+
},
105+
})
86106
} else {
87107
// lazy-consume request body
88108
init.body = Readable.toWeb(incoming) as ReadableStream<Uint8Array>

src/server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export const createAdaptorServer = (options: Options): ServerType => {
88
const requestListener = getRequestListener(fetchCallback, {
99
hostname: options.hostname,
1010
overrideGlobalObjects: options.overrideGlobalObjects,
11+
autoCleanupIncoming: options.autoCleanupIncoming,
1112
})
1213
// ts will complain about createServerHTTP and createServerHTTP2 not being callable, which works just fine
1314
// eslint-disable-next-line @typescript-eslint/no-explicit-any

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export type ServerOptions =
7070
export type Options = {
7171
fetch: FetchCallback
7272
overrideGlobalObjects?: boolean
73+
autoCleanupIncoming?: boolean
7374
port?: number
7475
hostname?: string
7576
} & ServerOptions

test/app.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { Response as PonyfillResponse } from '@whatwg-node/fetch'
2+
import { Hono } from 'hono'
3+
4+
export const app = new Hono()
5+
6+
app.get('/', (c) => c.text('Hello! Node!'))
7+
app.get('/url', (c) => c.text(c.req.url))
8+
9+
app.get('/posts', (c) => {
10+
return c.text(`Page ${c.req.query('page')}`)
11+
})
12+
app.get('/user-agent', (c) => {
13+
return c.text(c.req.header('user-agent') as string)
14+
})
15+
app.post('/posts', (c) => {
16+
return c.redirect('/posts')
17+
})
18+
app.post('/body-consumed', async (c) => {
19+
return c.text(`Body length: ${(await c.req.text()).length}`)
20+
})
21+
app.post('/no-body-consumed', (c) => {
22+
if (!c.req.raw.body) {
23+
// force create new request object
24+
throw new Error('No body consumed')
25+
}
26+
return c.text('No body consumed')
27+
})
28+
app.post('/body-cancelled', (c) => {
29+
if (!c.req.raw.body) {
30+
// force create new request object
31+
throw new Error('No body consumed')
32+
}
33+
c.req.raw.body.cancel()
34+
return c.text('Body cancelled')
35+
})
36+
app.post('/partially-consumed', async (c) => {
37+
if (!c.req.raw.body) {
38+
// force create new request object
39+
throw new Error('No body consumed')
40+
}
41+
const reader = c.req.raw.body.getReader()
42+
await reader.read() // read only one chunk
43+
return c.text('Partially consumed')
44+
})
45+
app.post('/partially-consumed-and-cancelled', async (c) => {
46+
if (!c.req.raw.body) {
47+
// force create new request object
48+
throw new Error('No body consumed')
49+
}
50+
const reader = c.req.raw.body.getReader()
51+
await reader.read() // read only one chunk
52+
reader.cancel()
53+
return c.text('Partially consumed and cancelled')
54+
})
55+
app.delete('/posts/:id', (c) => {
56+
return c.text(`DELETE ${c.req.param('id')}`)
57+
})
58+
// @ts-expect-error the response is string
59+
app.get('/invalid', () => {
60+
return '<h1>HTML</h1>'
61+
})
62+
app.get('/ponyfill', () => {
63+
return new PonyfillResponse('Pony')
64+
})
65+
66+
app.on('trace', '/', (c) => {
67+
const headers = c.req.raw.headers // build new request object
68+
return c.text(`headers: ${JSON.stringify(headers)}`)
69+
})

test/server.test.ts

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { Response as PonyfillResponse } from '@whatwg-node/fetch'
21
import { Hono } from 'hono'
32
import { basicAuth } from 'hono/basic-auth'
43
import { compress } from 'hono/compress'
@@ -13,37 +12,9 @@ import { GlobalRequest, Request as LightweightRequest, getAbortController } from
1312
import { GlobalResponse, Response as LightweightResponse } from '../src/response'
1413
import { createAdaptorServer, serve } from '../src/server'
1514
import type { HttpBindings } from '../src/types'
15+
import { app } from './app'
1616

1717
describe('Basic', () => {
18-
const app = new Hono()
19-
app.get('/', (c) => c.text('Hello! Node!'))
20-
app.get('/url', (c) => c.text(c.req.url))
21-
22-
app.get('/posts', (c) => {
23-
return c.text(`Page ${c.req.query('page')}`)
24-
})
25-
app.get('/user-agent', (c) => {
26-
return c.text(c.req.header('user-agent') as string)
27-
})
28-
app.post('/posts', (c) => {
29-
return c.redirect('/posts')
30-
})
31-
app.delete('/posts/:id', (c) => {
32-
return c.text(`DELETE ${c.req.param('id')}`)
33-
})
34-
// @ts-expect-error the response is string
35-
app.get('/invalid', () => {
36-
return '<h1>HTML</h1>'
37-
})
38-
app.get('/ponyfill', () => {
39-
return new PonyfillResponse('Pony')
40-
})
41-
42-
app.on('trace', '/', (c) => {
43-
const headers = c.req.raw.headers // build new request object
44-
return c.text(`headers: ${JSON.stringify(headers)}`)
45-
})
46-
4718
const server = createAdaptorServer(app)
4819

4920
it('Should return 200 response - GET /', async () => {
@@ -82,6 +53,60 @@ describe('Basic', () => {
8253
expect(res.headers['location']).toBe('/posts')
8354
})
8455

56+
it('Should return 200 response - POST /no-body-consumed', async () => {
57+
const res = await request(server).post('/no-body-consumed').send('')
58+
expect(res.status).toBe(200)
59+
expect(res.text).toBe('No body consumed')
60+
})
61+
62+
it('Should return 200 response - POST /body-cancelled', async () => {
63+
const res = await request(server).post('/body-cancelled').send('')
64+
expect(res.status).toBe(200)
65+
expect(res.text).toBe('Body cancelled')
66+
})
67+
68+
it('Should return 200 response - POST /partially-consumed', async () => {
69+
const buffer = Buffer.alloc(1024 * 10) // large buffer
70+
const res = await new Promise<any>((resolve, reject) => {
71+
const req = request(server)
72+
.post('/partially-consumed')
73+
.set('Content-Length', buffer.length.toString())
74+
75+
req.write(buffer)
76+
req.end((err, res) => {
77+
if (err) {
78+
reject(err)
79+
} else {
80+
resolve(res)
81+
}
82+
})
83+
})
84+
85+
expect(res.status).toBe(200)
86+
expect(res.text).toBe('Partially consumed')
87+
})
88+
89+
it('Should return 200 response - POST /partially-consumed-and-cancelled', async () => {
90+
const buffer = Buffer.alloc(1) // A large buffer will not make the test go far, so keep it small because it won't go far.
91+
const res = await new Promise<any>((resolve, reject) => {
92+
const req = request(server)
93+
.post('/partially-consumed-and-cancelled')
94+
.set('Content-Length', buffer.length.toString())
95+
96+
req.write(buffer)
97+
req.end((err, res) => {
98+
if (err) {
99+
reject(err)
100+
} else {
101+
resolve(res)
102+
}
103+
})
104+
})
105+
106+
expect(res.status).toBe(200)
107+
expect(res.text).toBe('Partially consumed and cancelled')
108+
})
109+
85110
it('Should return 201 response - DELETE /posts/123', async () => {
86111
const res = await request(server).delete('/posts/123')
87112
expect(res.status).toBe(200)

0 commit comments

Comments
 (0)