Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
newRequest,
Request as LightweightRequest,
toRequestError,
bodyCancelledKey,
} from './request'
import { cacheKey, Response as LightweightResponse } from './response'
import type { InternalCache } from './response'
Expand Down Expand Up @@ -196,6 +197,8 @@ export const getRequestListener = (
req[abortControllerKey].abort(incoming.errored.toString())
} else if (!outgoing.writableFinished) {
req[abortControllerKey].abort('Client connection prematurely closed.')
} else if (req[bodyCancelledKey]) {
incoming.destroy(new Error('The request ended before the request body was consumed.'))
}
})

Expand Down
46 changes: 33 additions & 13 deletions src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { Http2ServerRequest } from 'node:http2'
import { Readable } from 'node:stream'
import type { TLSSocket } from 'node:tls'

export const bodyCancelledKey = Symbol('bodyCancelledKey')

export class RequestError extends Error {
constructor(
message: string,
Expand Down Expand Up @@ -41,12 +43,19 @@ export class Request extends GlobalRequest {
}
}

const newRequestFromIncoming = (
method: string,
url: string,
incoming: IncomingMessage | Http2ServerRequest,
abortController: AbortController
): Request => {
const newRequestFromIncoming = (originalRequest: Record<string | symbol, any>): Request => {
const {
method,
[urlKey]: url,
[incomingKey]: incoming,
[abortControllerKey]: abortController,
} = originalRequest as {
method: string
[urlKey]: string
[incomingKey]: IncomingMessage | Http2ServerRequest
[abortControllerKey]: AbortController
}

const headerRecord: [string, string][] = []
const rawHeaders = incoming.rawHeaders
for (let i = 0; i < rawHeaders.length; i += 2) {
Expand Down Expand Up @@ -85,7 +94,23 @@ const newRequestFromIncoming = (
})
} else {
// lazy-consume request body
init.body = Readable.toWeb(incoming) as ReadableStream<Uint8Array>
init.body = new ReadableStream({
async start(controller) {
const stream = Readable.toWeb(incoming) as ReadableStream<Uint8Array>

try {
for await (const chunk of stream) {
controller.enqueue(chunk)
}
controller.close()
} catch (error: unknown) {
controller.error(error)
}
},
cancel() {
originalRequest[bodyCancelledKey] = true
},
})
}
}

Expand Down Expand Up @@ -115,12 +140,7 @@ const requestPrototype: Record<string | symbol, any> = {

[getRequestCache]() {
this[abortControllerKey] ||= new AbortController()
return (this[requestCache] ||= newRequestFromIncoming(
this.method,
this[urlKey],
this[incomingKey],
this[abortControllerKey]
))
return (this[requestCache] ||= newRequestFromIncoming(this))
},
}
;[
Expand Down
88 changes: 88 additions & 0 deletions test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,40 @@
app.post('/posts', (c) => {
return c.redirect('/posts')
})
app.post('/no-body-consumed', (c) => {
if (!c.req.raw.body) {
// force create new request object
throw new Error('No body consumed')
}
return c.text('No body consumed')
})
app.post('/body-cancelled', (c) => {
if (!c.req.raw.body) {
// force create new request object
throw new Error('No body consumed')
}
c.req.raw.body.cancel()
return c.text('No body consumed')
})
app.post('/partially-consumed', async (c) => {
if (!c.req.raw.body) {
// force create new request object
throw new Error('No body consumed')
}
const reader = c.req.raw.body.getReader()
await reader.read() // read only one chunk
return c.text('No body consumed')
})
app.post('/partially-consumed-and-cancelled', async (c) => {
if (!c.req.raw.body) {
// force create new request object
throw new Error('No body consumed')
}
const reader = c.req.raw.body.getReader()
await reader.read() // read only one chunk
reader.cancel()
return c.text('No body consumed')
})
app.delete('/posts/:id', (c) => {
return c.text(`DELETE ${c.req.param('id')}`)
})
Expand Down Expand Up @@ -82,6 +116,60 @@
expect(res.headers['location']).toBe('/posts')
})

it('Should return 200 response - POST /no-body-consumed', async () => {
const res = await request(server).post('/no-body-consumed').send('')
expect(res.status).toBe(200)
expect(res.text).toBe('No body consumed')
})

it('Should return 200 response - POST /body-cancelled', async () => {
const res = await request(server).post('/body-cancelled').send('')
expect(res.status).toBe(200)
expect(res.text).toBe('No body consumed')
})

it('Should return 200 response - POST /partially-consumed', async () => {
const buffer = Buffer.alloc(1024 * 10) // large buffer
const res = await new Promise<any>((resolve, reject) => {

Check warning on line 133 in test/server.test.ts

View workflow job for this annotation

GitHub Actions / ci (18.x)

Unexpected any. Specify a different type

Check warning on line 133 in test/server.test.ts

View workflow job for this annotation

GitHub Actions / ci (20.x)

Unexpected any. Specify a different type

Check warning on line 133 in test/server.test.ts

View workflow job for this annotation

GitHub Actions / ci (22.x)

Unexpected any. Specify a different type
const req = request(server)
.post('/partially-consumed')
.set('Content-Length', buffer.length.toString())

req.write(buffer)
req.end((err, res) => {
if (err) {
reject(err)
} else {
resolve(res)
}
})
})

expect(res.status).toBe(200)
expect(res.text).toBe('No body consumed')
})

it('Should return 200 response - POST /partially-consumed-and-cancelled', async () => {
const buffer = Buffer.alloc(1) // A large buffer will not make the test go far, so keep it small because it won't go far.
const res = await new Promise<any>((resolve, reject) => {

Check warning on line 154 in test/server.test.ts

View workflow job for this annotation

GitHub Actions / ci (18.x)

Unexpected any. Specify a different type

Check warning on line 154 in test/server.test.ts

View workflow job for this annotation

GitHub Actions / ci (20.x)

Unexpected any. Specify a different type

Check warning on line 154 in test/server.test.ts

View workflow job for this annotation

GitHub Actions / ci (22.x)

Unexpected any. Specify a different type
const req = request(server)
.post('/partially-consumed-and-cancelled')
.set('Content-Length', buffer.length.toString())

req.write(buffer)
req.end((err, res) => {
if (err) {
reject(err)
} else {
resolve(res)
}
})
})

expect(res.status).toBe(200)
expect(res.text).toBe('No body consumed')
})

it('Should return 201 response - DELETE /posts/123', async () => {
const res = await request(server).delete('/posts/123')
expect(res.status).toBe(200)
Expand Down