@@ -17,7 +17,16 @@ import {
1717 type ServerResponse as Response ,
1818} from "node:http" ;
1919import { type Socket } from "node:net" ;
20- import type { ErrorCallback , NormalizedServerOptions , NormalizeProxyTarget , ProxyServer , ProxyTarget , ProxyTargetUrl , ServerOptions } from ".." ;
20+ import type {
21+ ErrorCallback ,
22+ NormalizedServerOptions ,
23+ NormalizeProxyTarget ,
24+ ProxyServer ,
25+ ProxyTarget ,
26+ ProxyTargetUrl ,
27+ ServerOptions ,
28+ } from ".." ;
29+ import { Dispatcher , request , stream as uStream , Client } from "undici" ;
2130
2231export type ProxyResponse = Request & {
2332 headers : { [ key : string ] : string | string [ ] } ;
@@ -73,44 +82,132 @@ export function XHeaders(req: Request, _res: Response, options: ServerOptions) {
7382// Does the actual proxying. If `forward` is enabled fires up
7483// a ForwardStream (there is NO RESPONSE), same happens for ProxyStream. The request
7584// just dies otherwise.
76- export function stream ( req : Request , res : Response , options : NormalizedServerOptions , _ : Buffer | undefined , server : ProxyServer , cb : ErrorCallback | undefined ) {
85+ export async function stream (
86+ req : Request ,
87+ res : Response ,
88+ options : NormalizedServerOptions ,
89+ _ : Buffer | undefined ,
90+ server : ProxyServer ,
91+ cb : ErrorCallback | undefined ,
92+ ) {
7793 // And we begin!
7894 server . emit ( "start" , req , res , options . target || options . forward ! ) ;
7995
8096 const agents = options . followRedirects ? followRedirects : nativeAgents ;
81- const http = agents . http as typeof import ( ' http' ) ;
82- const https = agents . https as typeof import ( ' https' ) ;
97+ const http = agents . http as typeof import ( " http" ) ;
98+ const https = agents . https as typeof import ( " https" ) ;
8399
84100 if ( options . forward ) {
85- // forward enabled, so just pipe the request
86- const proto = options . forward . protocol === "https:" ? https : http ;
87101 const outgoingOptions = common . setupOutgoing (
88102 options . ssl || { } ,
89103 options ,
90104 req ,
91105 "forward" ,
92106 ) ;
93- const forwardReq = proto . request ( outgoingOptions ) ;
94107
95- // error handler (e.g. ECONNRESET, ECONNREFUSED)
96- // Handle errors on incoming request as well as it makes sense to
97- const forwardError = createErrorHandler ( forwardReq , options . forward ) ;
98- req . on ( "error" , forwardError ) ;
99- forwardReq . on ( "error" , forwardError ) ;
108+ const targetUrl = `${ outgoingOptions . url } ` ;
109+
110+ const undiciOptions : any = {
111+ method : outgoingOptions . method as Dispatcher . HttpMethod ,
112+ headers : outgoingOptions . headers ,
113+ path : outgoingOptions . path ,
114+ } ;
115+
116+ // Handle request body
117+ if ( options . buffer ) {
118+ undiciOptions . body = options . buffer ;
119+ } else if ( req . method !== "GET" && req . method !== "HEAD" ) {
120+ undiciOptions . body = req ;
121+ }
122+
123+ try {
124+ const client = new Client ( targetUrl ) ;
125+ await client . request ( undiciOptions ) ;
126+ } catch ( err ) {
127+ if ( cb ) {
128+ cb ( err as Error , req , res , options . forward ) ;
129+ } else {
130+ server . emit ( "error" , err as Error , req , res , options . forward ) ;
131+ }
132+ }
100133
101- ( options . buffer || req ) . pipe ( forwardReq ) ;
102134 if ( ! options . target ) {
103- // no target, so we do not send anything back to the client.
104- // If target is set, we do a separate proxy below, which might be to a
105- // completely different server.
106135 return res . end ( ) ;
107136 }
108137 }
109138
110139 // Request initalization
111- const proto = options . target ! . protocol === "https:" ? https : http ;
112140 const outgoingOptions = common . setupOutgoing ( options . ssl || { } , options , req ) ;
113- const proxyReq = proto . request ( outgoingOptions ) ;
141+ const client = new Client ( outgoingOptions . url , {
142+ allowH2 : req . httpVersionMajor === 2 ,
143+ } ) ;
144+ // const proxyReq = proto.request(outgoingOptions);
145+
146+ const dispatchOptions : Dispatcher . DispatchOptions = {
147+ method : outgoingOptions . method as Dispatcher . HttpMethod ,
148+ path : outgoingOptions . path || "/" ,
149+ headers : outgoingOptions . headers ,
150+
151+ body :
152+ options . buffer ||
153+ ( req . method !== "GET" && req . method !== "HEAD" ? req : undefined ) ,
154+ } ;
155+
156+ let responseStarted = false ;
157+
158+ client . dispatch ( dispatchOptions , {
159+ onRequestStart ( controller , context ) {
160+ // Can modify the request just before headers are sent
161+ console . log ( "onRequestStart" ) ;
162+ } ,
163+ onResponseStart ( controller , statusCode , headers , statusMessage ) {
164+ // Set response status and headers - crucial for SSE
165+ res . statusCode = statusCode ;
166+
167+ // Set headers from the record object
168+ for ( const [ name , value ] of Object . entries ( headers ) ) {
169+ res . setHeader ( name , value ) ;
170+ }
171+
172+ // For SSE, ensure headers are sent immediately
173+ const contentType = headers [ "content-type" ] || headers [ "Content-Type" ] ;
174+ if ( contentType && contentType . toString ( ) . includes ( "text/event-stream" ) ) {
175+ res . flushHeaders ( ) ;
176+ }
177+
178+ responseStarted = true ;
179+ } ,
180+ onResponseError ( controller , err ) {
181+ if (
182+ req . socket . destroyed &&
183+ ( err as NodeJS . ErrnoException ) . code === "ECONNRESET"
184+ ) {
185+ server . emit ( "econnreset" , err , req , res , outgoingOptions . url ) ;
186+ controller . abort ( err ) ;
187+ return ;
188+ }
189+
190+ if ( cb ) {
191+ cb ( err , req , res , outgoingOptions . url ) ;
192+ } else {
193+ server . emit ( "error" , err , req , res , outgoingOptions . url ) ;
194+ }
195+ } ,
196+ onResponseData ( controller , chunk ) {
197+ if ( responseStarted ) {
198+ res . write ( chunk ) ;
199+ }
200+ } ,
201+ onResponseEnd ( controller , trailers ) {
202+ if ( trailers ) {
203+ res . addTrailers ( trailers ) ;
204+ }
205+ res . end ( ) ;
206+ client . close ( ) ;
207+ } ,
208+ } ) ;
209+
210+ return ;
114211
115212 // Enable developers to modify the proxyReq before headers are sent
116213 proxyReq . on ( "socket" , ( socket : Socket ) => {
@@ -140,9 +237,15 @@ export function stream(req: Request, res: Response, options: NormalizedServerOpt
140237 req . on ( "error" , proxyError ) ;
141238 proxyReq . on ( "error" , proxyError ) ;
142239
143- function createErrorHandler ( proxyReq : http . ClientRequest , url : NormalizeProxyTarget < ProxyTargetUrl > ) {
240+ function createErrorHandler (
241+ proxyReq : http . ClientRequest ,
242+ url : NormalizeProxyTarget < ProxyTargetUrl > ,
243+ ) {
144244 return ( err : Error ) => {
145- if ( req . socket . destroyed && ( err as NodeJS . ErrnoException ) . code === "ECONNRESET" ) {
245+ if (
246+ req . socket . destroyed &&
247+ ( err as NodeJS . ErrnoException ) . code === "ECONNRESET"
248+ ) {
146249 server . emit ( "econnreset" , err , req , res , url ) ;
147250 proxyReq . destroy ( ) ;
148251 return ;
@@ -164,7 +267,14 @@ export function stream(req: Request, res: Response, options: NormalizedServerOpt
164267 if ( ! res . headersSent && ! options . selfHandleResponse ) {
165268 for ( const pass of web_o ) {
166269 // note: none of these return anything
167- pass ( req , res as EditableResponse , proxyRes , options as NormalizedServerOptions & { target : NormalizeProxyTarget < ProxyTarget > } ) ;
270+ pass (
271+ req ,
272+ res as EditableResponse ,
273+ proxyRes ,
274+ options as NormalizedServerOptions & {
275+ target : NormalizeProxyTarget < ProxyTarget > ;
276+ } ,
277+ ) ;
168278 }
169279 }
170280
0 commit comments