11import type { IncomingMessage , ServerResponse , OutgoingHttpHeaders } from 'node:http'
22import { Http2ServerRequest } from 'node:http2'
33import type { Http2ServerResponse } from 'node:http2'
4+ import type { Writable } from 'node:stream'
45import type { IncomingMessageWithWrapBodyStream } from './request'
56import {
67 abortControllerKey ,
@@ -12,7 +13,12 @@ import {
1213import { cacheKey , Response as LightweightResponse } from './response'
1314import type { InternalCache } from './response'
1415import type { CustomErrorHandler , FetchCallback , HttpBindings } from './types'
15- import { writeFromReadableStream , buildOutgoingHttpHeaders } from './utils'
16+ import {
17+ readWithoutBlocking ,
18+ writeFromReadableStream ,
19+ writeFromReadableStreamDefaultReader ,
20+ buildOutgoingHttpHeaders ,
21+ } from './utils'
1622import { X_ALREADY_SENT } from './utils/response/constants'
1723import './globals'
1824
@@ -21,9 +27,6 @@ type OutgoingHasOutgoingEnded = Http2ServerResponse & {
2127 [ outgoingEnded ] ?: ( ) => void
2228}
2329
24- const regBuffer = / ^ n o $ / i
25- const regContentType = / ^ ( a p p l i c a t i o n \/ j s o n \b | t e x t \/ (? ! e v e n t - s t r e a m \b ) ) / i
26-
2730const handleRequestError = ( ) : Response =>
2831 new Response ( null , {
2932 status : 400 ,
@@ -122,41 +125,51 @@ const responseViaResponseObject = async (
122125 const resHeaderRecord : OutgoingHttpHeaders = buildOutgoingHttpHeaders ( res . headers )
123126
124127 if ( res . body ) {
125- /**
126- * If content-encoding is set, we assume that the response should be not decoded.
127- * Else if transfer-encoding is set, we assume that the response should be streamed.
128- * Else if content-length is set, we assume that the response content has been taken care of.
129- * Else if x-accel-buffering is set to no, we assume that the response should be streamed.
130- * Else if content-type is not application/json nor text/* but can be text/event-stream,
131- * we assume that the response should be streamed.
132- */
128+ const reader = res . body . getReader ( )
133129
134- const {
135- 'transfer-encoding' : transferEncoding ,
136- 'content-encoding' : contentEncoding ,
137- 'content-length' : contentLength ,
138- 'x-accel-buffering' : accelBuffering ,
139- 'content-type' : contentType ,
140- } = resHeaderRecord
130+ const values : Uint8Array [ ] = [ ]
131+ let done = false
132+ let currentReadPromise : Promise < ReadableStreamReadResult < Uint8Array > > | undefined = undefined
141133
142- if (
143- transferEncoding ||
144- contentEncoding ||
145- contentLength ||
146- // nginx buffering variant
147- ( accelBuffering && regBuffer . test ( accelBuffering as string ) ) ||
148- ! regContentType . test ( contentType as string )
149- ) {
150- outgoing . writeHead ( res . status , resHeaderRecord )
151- flushHeaders ( outgoing )
134+ // In the case of synchronous responses, usually a maximum of two readings is done
135+ for ( let i = 0 ; i < 2 ; i ++ ) {
136+ currentReadPromise = reader . read ( )
137+ const chunk = await readWithoutBlocking ( currentReadPromise ) . catch ( ( e ) => {
138+ console . error ( e )
139+ done = true
140+ } )
141+ if ( ! chunk ) {
142+ // Error occurred or currentReadPromise is not yet resolved.
143+ // If an error occurs, immediately break the loop.
144+ // If currentReadPromise is not yet resolved, pass it to writeFromReadableStreamDefaultReader.
145+ break
146+ }
147+ currentReadPromise = undefined
152148
153- await writeFromReadableStream ( res . body , outgoing )
154- } else {
155- const buffer = await res . arrayBuffer ( )
156- resHeaderRecord [ 'content-length' ] = buffer . byteLength
149+ if ( chunk . value ) {
150+ values . push ( chunk . value )
151+ }
152+ if ( chunk . done ) {
153+ done = true
154+ break
155+ }
156+ }
157+
158+ if ( done && ! ( 'content-length' in resHeaderRecord ) ) {
159+ resHeaderRecord [ 'content-length' ] = values . reduce ( ( acc , value ) => acc + value . length , 0 )
160+ }
157161
158- outgoing . writeHead ( res . status , resHeaderRecord )
159- outgoing . end ( new Uint8Array ( buffer ) )
162+ outgoing . writeHead ( res . status , resHeaderRecord )
163+ values . forEach ( ( value ) => {
164+ ; ( outgoing as Writable ) . write ( value )
165+ } )
166+ if ( done ) {
167+ outgoing . end ( )
168+ } else {
169+ if ( values . length === 0 ) {
170+ flushHeaders ( outgoing )
171+ }
172+ await writeFromReadableStreamDefaultReader ( reader , outgoing , currentReadPromise )
160173 }
161174 } else if ( resHeaderRecord [ X_ALREADY_SENT ] ) {
162175 // do nothing, the response has already been sent
0 commit comments