@@ -26,14 +26,19 @@ const {
2626 Error,
2727 MathMin,
2828 NumberIsFinite,
29+ ObjectGetPrototypeOf,
2930 ObjectKeys,
3031 ObjectSetPrototypeOf,
32+ Proxy,
3133 ReflectApply,
34+ ReflectGet,
35+ ReflectSet,
3236 Symbol,
3337 SymbolAsyncDispose,
3438 SymbolFor,
3539} = primordials ;
3640
41+ const { Duplex } = require ( 'stream' ) ;
3742const net = require ( 'net' ) ;
3843const EE = require ( 'events' ) ;
3944const assert = require ( 'internal/assert' ) ;
@@ -43,6 +48,7 @@ const {
4348 continueExpression,
4449 chunkExpression,
4550 kIncomingMessage,
51+ kSocket,
4652 HTTPParser,
4753 isLenient,
4854 _checkInvalidHeaderChar : checkInvalidHeaderChar ,
@@ -106,6 +112,7 @@ const onResponseFinishChannel = dc.channel('http.server.response.finish');
106112
107113const kServerResponse = Symbol ( 'ServerResponse' ) ;
108114const kServerResponseStatistics = Symbol ( 'ServerResponseStatistics' ) ;
115+ const kUpgradeStream = Symbol ( 'UpgradeStream' ) ;
109116
110117const {
111118 hasObserver,
@@ -946,43 +953,159 @@ function socketOnError(e) {
946953 }
947954}
948955
956+ class UpgradeStream extends Duplex {
957+ constructor ( socket , req ) {
958+ super ( {
959+ allowHalfOpen : socket . allowHalfOpen ,
960+ } ) ;
961+
962+ this [ kSocket ] = socket ;
963+ this [ kIncomingMessage ] = req ;
964+
965+ // Proxy error, end & closure events immediately.
966+ socket . on ( 'error' , ( err ) => this . destroy ( err ) ) ;
967+ this . on ( 'error' , ( err ) => socket . destroy ( err ) ) ;
968+
969+ socket . on ( 'close' , ( ) => this . destroy ( ) ) ;
970+ this . on ( 'close' , ( ) => socket . destroy ( ) ) ;
971+
972+ socket . on ( 'end' , ( ) => {
973+ this . push ( null ) ;
974+
975+ // Match the socket behaviour, where 'end' will fire despite no 'data'
976+ // listeners if a socket with no pending data ends:
977+ if ( this . readableLength === 0 ) {
978+ this . resume ( ) ;
979+ }
980+ } ) ;
981+
982+ // Other events (most notably, reading) all only
983+ // activate after requestBodyCompleted is called.
984+
985+ // This stream wraps itself in a proxy which forwards all non-stream
986+ // property lookups back to the underlying socket, for backward
987+ // compatibiility.
988+ // eslint-disable-next-line no-constructor-return
989+ return new Proxy ( this , {
990+ __proto__ : null ,
991+ has ( target , prop ) {
992+ return prop in target || prop in socket ;
993+ } ,
994+ get ( target , prop , receiver ) {
995+ if ( prop in target ) {
996+ return ReflectGet ( target , prop , receiver ) ;
997+ }
998+
999+ return socket [ prop ] ;
1000+ } ,
1001+ set ( target , prop , value , receiver ) {
1002+ if ( prop in target ) {
1003+ return ReflectSet ( target , prop , value , receiver ) ;
1004+ }
1005+
1006+ socket [ prop ] = value ;
1007+ return true ;
1008+ } ,
1009+ getPrototypeOf ( target ) {
1010+ return ObjectGetPrototypeOf ( socket ) ;
1011+ } ,
1012+ } ) ;
1013+ }
1014+
1015+ requestBodyCompleted ( upgradeHead ) {
1016+ // When the request body is completed, we begin streaming all the
1017+ // post-body data for the upgraded protocol:
1018+ if ( upgradeHead ?. length > 0 ) {
1019+ if ( ! this . push ( upgradeHead ) ) {
1020+ this [ kSocket ] . pause ( ) ;
1021+ }
1022+ }
1023+
1024+ this [ kSocket ] . on ( 'data' , ( data ) => {
1025+ if ( ! this . push ( data ) ) {
1026+ this [ kSocket ] . pause ( ) ;
1027+ }
1028+ } ) ;
1029+ }
1030+
1031+ _read ( size ) {
1032+ // Reading the upgrade stream starts the request stream flowing. It's
1033+ // important that this happens, even if there are no listeners, or it
1034+ // would be impossible to read this without explicitly reading all the
1035+ // request body first, which is backward incompatible & awkward.
1036+ this [ kIncomingMessage ] . resume ( ) ;
1037+
1038+ this [ kSocket ] . resume ( ) ;
1039+ }
1040+
1041+ _final ( callback ) {
1042+ this [ kSocket ] . end ( callback ) ;
1043+ }
1044+
1045+ _write ( chunk , encoding , callback ) {
1046+ this [ kSocket ] . write ( chunk , encoding , callback ) ;
1047+ }
1048+ }
1049+
9491050function onParserExecuteCommon ( server , socket , parser , state , ret , d ) {
9501051 if ( ret instanceof Error ) {
9511052 prepareError ( ret , parser , d ) ;
9521053 debug ( 'parse error' , ret ) ;
9531054 socketOnError . call ( socket , ret ) ;
9541055 } else if ( parser . incoming ?. upgrade ) {
955- if ( ! parser . incoming . complete ) {
956- // The request is still incomplete - this means it's an upgrade with a request
957- // body that hasn't yet been received. We have to wait for the body.
958- return ;
959- }
960-
9611056 // Upgrade or CONNECT
9621057 const req = parser . incoming ;
9631058 debug ( 'SERVER upgrade or connect' , req . method ) ;
1059+ const eventName = req . method === 'CONNECT' ? 'connect' : 'upgrade' ;
9641060
965- d ||= parser . getCurrentBuffer ( ) ;
1061+ let upgradeStream ;
1062+ if ( req . complete ) {
1063+ d ||= parser . getCurrentBuffer ( ) ;
1064+
1065+ socket . removeListener ( 'data' , state . onData ) ;
1066+ socket . removeListener ( 'end' , state . onEnd ) ;
1067+ socket . removeListener ( 'close' , state . onClose ) ;
1068+ socket . removeListener ( 'drain' , state . onDrain ) ;
1069+ socket . removeListener ( 'error' , socketOnError ) ;
1070+ socket . removeListener ( 'timeout' , socketOnTimeout ) ;
1071+
1072+ unconsume ( parser , socket ) ;
1073+ parser . finish ( ) ;
1074+ freeParser ( parser , req , socket ) ;
1075+ parser = null ;
1076+
1077+ // If the request is complete (no body, or all body read upfront) then
1078+ // we just emit the socket directly as the upgrade stream.
1079+ upgradeStream = socket ;
1080+ } else {
1081+ // If the body hasn't been fully parsed yet, we emit immediately but
1082+ // we add a wrapper around the socket to not expose incoming data
1083+ // until the request body has finished.
1084+
1085+ if ( socket [ kUpgradeStream ] ) {
1086+ // We've already emitted the incomplete upgrade - nothing do to
1087+ // until actual body parsing completion.
1088+ return ;
1089+ }
9661090
967- socket . removeListener ( 'data' , state . onData ) ;
968- socket . removeListener ( 'end' , state . onEnd ) ;
969- socket . removeListener ( 'close' , state . onClose ) ;
970- socket . removeListener ( 'drain' , state . onDrain ) ;
971- socket . removeListener ( 'error' , socketOnError ) ;
972- socket . removeListener ( 'timeout' , socketOnTimeout ) ;
973- unconsume ( parser , socket ) ;
974- parser . finish ( ) ;
975- freeParser ( parser , req , socket ) ;
976- parser = null ;
1091+ d ||= Buffer . from ( [ ] ) ;
1092+
1093+ upgradeStream = new UpgradeStream ( socket , req ) ;
1094+ socket [ kUpgradeStream ] = upgradeStream ;
1095+ }
9771096
978- const eventName = req . method === 'CONNECT' ? 'connect' : 'upgrade' ;
9791097 if ( server . listenerCount ( eventName ) > 0 ) {
9801098 debug ( 'SERVER have listener for %s' , eventName ) ;
981- const bodyHead = d . slice ( ret , d . length ) ;
9821099
983- socket . readableFlowing = null ;
1100+ const bodyHead = d . slice ( ret , d . length ) ;
9841101
985- server . emit ( eventName , req , socket , bodyHead ) ;
1102+ if ( req . complete && socket [ kUpgradeStream ] ) {
1103+ // Previously emitted, now completed - just activate the stream
1104+ socket [ kUpgradeStream ] . requestBodyCompleted ( bodyHead ) ;
1105+ } else {
1106+ socket . readableFlowing = null ;
1107+ server . emit ( eventName , req , upgradeStream , bodyHead ) ;
1108+ }
9861109 } else {
9871110 // Got upgrade or CONNECT method, but have no handler.
9881111 socket . destroy ( ) ;
0 commit comments