@@ -26,14 +26,19 @@ const {
2626 Error,
2727 MathMin,
2828 NumberIsFinite,
29+ ObjectGetPrototypeOf,
2930 ObjectKeys,
3031 ObjectSetPrototypeOf,
32+ Proxy,
3133 ReflectApply,
34+ ReflectGet,
35+ ReflectSet,
3236 Symbol,
3337 SymbolAsyncDispose,
3438 SymbolFor,
3539} = primordials ;
3640
41+ const { Duplex } = require ( 'stream' ) ;
3742const net = require ( 'net' ) ;
3843const EE = require ( 'events' ) ;
3944const assert = require ( 'internal/assert' ) ;
@@ -43,6 +48,7 @@ const {
4348 continueExpression,
4449 chunkExpression,
4550 kIncomingMessage,
51+ kSocket,
4652 HTTPParser,
4753 isLenient,
4854 _checkInvalidHeaderChar : checkInvalidHeaderChar ,
@@ -106,6 +112,7 @@ const onResponseFinishChannel = dc.channel('http.server.response.finish');
106112
107113const kServerResponse = Symbol ( 'ServerResponse' ) ;
108114const kServerResponseStatistics = Symbol ( 'ServerResponseStatistics' ) ;
115+ const kUpgradeStream = Symbol ( 'UpgradeStream' ) ;
109116
110117const kOptimizeEmptyRequests = Symbol ( 'OptimizeEmptyRequestsOption' ) ;
111118
@@ -953,43 +960,159 @@ function socketOnError(e) {
953960 }
954961}
955962
963+ class UpgradeStream extends Duplex {
964+ constructor ( socket , req ) {
965+ super ( {
966+ allowHalfOpen : socket . allowHalfOpen ,
967+ } ) ;
968+
969+ this [ kSocket ] = socket ;
970+ this [ kIncomingMessage ] = req ;
971+
972+ // Proxy error, end & closure events immediately.
973+ socket . on ( 'error' , ( err ) => this . destroy ( err ) ) ;
974+ this . on ( 'error' , ( err ) => socket . destroy ( err ) ) ;
975+
976+ socket . on ( 'close' , ( ) => this . destroy ( ) ) ;
977+ this . on ( 'close' , ( ) => socket . destroy ( ) ) ;
978+
979+ socket . on ( 'end' , ( ) => {
980+ this . push ( null ) ;
981+
982+ // Match the socket behaviour, where 'end' will fire despite no 'data'
983+ // listeners if a socket with no pending data ends:
984+ if ( this . readableLength === 0 ) {
985+ this . resume ( ) ;
986+ }
987+ } ) ;
988+
989+ // Other events (most notably, reading) all only
990+ // activate after requestBodyCompleted is called.
991+
992+ // This stream wraps itself in a proxy which forwards all non-stream
993+ // property lookups back to the underlying socket, for backward
994+ // compatibiility.
995+ // eslint-disable-next-line no-constructor-return
996+ return new Proxy ( this , {
997+ __proto__ : null ,
998+ has ( target , prop ) {
999+ return prop in target || prop in socket ;
1000+ } ,
1001+ get ( target , prop , receiver ) {
1002+ if ( prop in target ) {
1003+ return ReflectGet ( target , prop , receiver ) ;
1004+ }
1005+
1006+ return socket [ prop ] ;
1007+ } ,
1008+ set ( target , prop , value , receiver ) {
1009+ if ( prop in target ) {
1010+ return ReflectSet ( target , prop , value , receiver ) ;
1011+ }
1012+
1013+ socket [ prop ] = value ;
1014+ return true ;
1015+ } ,
1016+ getPrototypeOf ( target ) {
1017+ return ObjectGetPrototypeOf ( socket ) ;
1018+ } ,
1019+ } ) ;
1020+ }
1021+
1022+ requestBodyCompleted ( upgradeHead ) {
1023+ // When the request body is completed, we begin streaming all the
1024+ // post-body data for the upgraded protocol:
1025+ if ( upgradeHead ?. length > 0 ) {
1026+ if ( ! this . push ( upgradeHead ) ) {
1027+ this [ kSocket ] . pause ( ) ;
1028+ }
1029+ }
1030+
1031+ this [ kSocket ] . on ( 'data' , ( data ) => {
1032+ if ( ! this . push ( data ) ) {
1033+ this [ kSocket ] . pause ( ) ;
1034+ }
1035+ } ) ;
1036+ }
1037+
1038+ _read ( size ) {
1039+ // Reading the upgrade stream starts the request stream flowing. It's
1040+ // important that this happens, even if there are no listeners, or it
1041+ // would be impossible to read this without explicitly reading all the
1042+ // request body first, which is backward incompatible & awkward.
1043+ this [ kIncomingMessage ] . resume ( ) ;
1044+
1045+ this [ kSocket ] . resume ( ) ;
1046+ }
1047+
1048+ _final ( callback ) {
1049+ this [ kSocket ] . end ( callback ) ;
1050+ }
1051+
1052+ _write ( chunk , encoding , callback ) {
1053+ this [ kSocket ] . write ( chunk , encoding , callback ) ;
1054+ }
1055+ }
1056+
9561057function onParserExecuteCommon ( server , socket , parser , state , ret , d ) {
9571058 if ( ret instanceof Error ) {
9581059 prepareError ( ret , parser , d ) ;
9591060 debug ( 'parse error' , ret ) ;
9601061 socketOnError . call ( socket , ret ) ;
9611062 } else if ( parser . incoming ?. upgrade ) {
962- if ( ! parser . incoming . complete ) {
963- // The request is still incomplete - this means it's an upgrade with a request
964- // body that hasn't yet been received. We have to wait for the body.
965- return ;
966- }
967-
9681063 // Upgrade or CONNECT
9691064 const req = parser . incoming ;
9701065 debug ( 'SERVER upgrade or connect' , req . method ) ;
1066+ const eventName = req . method === 'CONNECT' ? 'connect' : 'upgrade' ;
9711067
972- d ||= parser . getCurrentBuffer ( ) ;
1068+ let upgradeStream ;
1069+ if ( req . complete ) {
1070+ d ||= parser . getCurrentBuffer ( ) ;
1071+
1072+ socket . removeListener ( 'data' , state . onData ) ;
1073+ socket . removeListener ( 'end' , state . onEnd ) ;
1074+ socket . removeListener ( 'close' , state . onClose ) ;
1075+ socket . removeListener ( 'drain' , state . onDrain ) ;
1076+ socket . removeListener ( 'error' , socketOnError ) ;
1077+ socket . removeListener ( 'timeout' , socketOnTimeout ) ;
1078+
1079+ unconsume ( parser , socket ) ;
1080+ parser . finish ( ) ;
1081+ freeParser ( parser , req , socket ) ;
1082+ parser = null ;
1083+
1084+ // If the request is complete (no body, or all body read upfront) then
1085+ // we just emit the socket directly as the upgrade stream.
1086+ upgradeStream = socket ;
1087+ } else {
1088+ // If the body hasn't been fully parsed yet, we emit immediately but
1089+ // we add a wrapper around the socket to not expose incoming data
1090+ // until the request body has finished.
1091+
1092+ if ( socket [ kUpgradeStream ] ) {
1093+ // We've already emitted the incomplete upgrade - nothing do to
1094+ // until actual body parsing completion.
1095+ return ;
1096+ }
9731097
974- socket . removeListener ( 'data' , state . onData ) ;
975- socket . removeListener ( 'end' , state . onEnd ) ;
976- socket . removeListener ( 'close' , state . onClose ) ;
977- socket . removeListener ( 'drain' , state . onDrain ) ;
978- socket . removeListener ( 'error' , socketOnError ) ;
979- socket . removeListener ( 'timeout' , socketOnTimeout ) ;
980- unconsume ( parser , socket ) ;
981- parser . finish ( ) ;
982- freeParser ( parser , req , socket ) ;
983- parser = null ;
1098+ d ||= Buffer . from ( [ ] ) ;
1099+
1100+ upgradeStream = new UpgradeStream ( socket , req ) ;
1101+ socket [ kUpgradeStream ] = upgradeStream ;
1102+ }
9841103
985- const eventName = req . method === 'CONNECT' ? 'connect' : 'upgrade' ;
9861104 if ( server . listenerCount ( eventName ) > 0 ) {
9871105 debug ( 'SERVER have listener for %s' , eventName ) ;
988- const bodyHead = d . slice ( ret , d . length ) ;
9891106
990- socket . readableFlowing = null ;
1107+ const bodyHead = d . slice ( ret , d . length ) ;
9911108
992- server . emit ( eventName , req , socket , bodyHead ) ;
1109+ if ( req . complete && socket [ kUpgradeStream ] ) {
1110+ // Previously emitted, now completed - just activate the stream
1111+ socket [ kUpgradeStream ] . requestBodyCompleted ( bodyHead ) ;
1112+ } else {
1113+ socket . readableFlowing = null ;
1114+ server . emit ( eventName , req , upgradeStream , bodyHead ) ;
1115+ }
9931116 } else {
9941117 // Got upgrade or CONNECT method, but have no handler.
9951118 socket . destroy ( ) ;
0 commit comments