66 "io"
77 "net/http"
88 "strconv"
9+ "time"
910
1011 "github.com/pkg/errors"
1112 "github.com/rs/zerolog"
@@ -139,7 +140,11 @@ func (p *Proxy) ProxyHTTP(
139140 return fmt .Errorf ("response writer is not a flusher" )
140141 }
141142 rws := connection .NewHTTPResponseReadWriterAcker (w , flusher , req )
142- if err := p .proxyStream (tr .ToTracedContext (), rws , dest , originProxy ); err != nil {
143+ connectedLogger := p .log .Debug ().
144+ Int (management .EventTypeKey , int (management .HTTP )).
145+ Str (LogFieldDestAddr , dest ).
146+ Uint8 (LogFieldConnIndex , tr .ConnIndex )
147+ if err := p .proxyStream (tr .ToTracedContext (), rws , dest , originProxy , connectedLogger ); err != nil {
143148 rule , srv := ruleField (p .ingressRules , ruleNum )
144149 p .logRequestError (err , cfRay , "" , rule , srv )
145150 return err
@@ -173,24 +178,22 @@ func (p *Proxy) ProxyTCP(
173178
174179 tracedCtx := tracing .NewTracedContext (serveCtx , req .CfTraceID , p .log )
175180
176- p .log .Debug ().
177- Int (management .EventTypeKey , int (management .TCP )).
178- Str (LogFieldFlowID , req .FlowID ).
179- Str (LogFieldDestAddr , req .Dest ).
180- Uint8 (LogFieldConnIndex , req .ConnIndex ).
181- Msg ("tcp proxy stream started" )
181+ debugLogger := func () * zerolog.Event {
182+ return p .log .Debug ().
183+ Int (management .EventTypeKey , int (management .TCP )).
184+ Str (LogFieldFlowID , req .FlowID ).
185+ Str (LogFieldDestAddr , req .Dest ).
186+ Uint8 (LogFieldConnIndex , req .ConnIndex )
187+ }
188+
189+ debugLogger ().Msg ("tcp proxy stream started" )
182190
183- if err := p .proxyStream (tracedCtx , rwa , req .Dest , p .warpRouting .Proxy ); err != nil {
191+ if err := p .proxyStream (tracedCtx , rwa , req .Dest , p .warpRouting .Proxy , debugLogger () ); err != nil {
184192 p .logRequestError (err , req .CFRay , req .FlowID , "" , ingress .ServiceWarpRouting )
185193 return err
186194 }
187195
188- p .log .Debug ().
189- Int (management .EventTypeKey , int (management .TCP )).
190- Str (LogFieldFlowID , req .FlowID ).
191- Str (LogFieldDestAddr , req .Dest ).
192- Uint8 (LogFieldConnIndex , req .ConnIndex ).
193- Msg ("tcp proxy stream finished successfully" )
196+ debugLogger ().Msg ("tcp proxy stream finished successfully" )
194197
195198 return nil
196199}
@@ -294,16 +297,21 @@ func (p *Proxy) proxyHTTPRequest(
294297
295298// proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented
296299// ingress rule.
300+ // connectedLogger is used to log when the connection is acknowledged
297301func (p * Proxy ) proxyStream (
298302 tr * tracing.TracedContext ,
299303 rwa connection.ReadWriteAcker ,
300304 dest string ,
301305 connectionProxy ingress.StreamBasedOriginProxy ,
306+ connectedLogger * zerolog.Event ,
302307) error {
303308 ctx := tr .Context
304309 _ , connectSpan := tr .Tracer ().Start (ctx , "stream-connect" )
310+
311+ start := time .Now ()
305312 originConn , err := connectionProxy .EstablishConnection (ctx , dest )
306313 if err != nil {
314+ connectStreamErrors .Inc ()
307315 tracing .EndWithErrorStatus (connectSpan , err )
308316 return err
309317 }
@@ -313,9 +321,13 @@ func (p *Proxy) proxyStream(
313321 encodedSpans := tr .GetSpans ()
314322
315323 if err := rwa .AckConnection (encodedSpans ); err != nil {
324+ connectStreamErrors .Inc ()
316325 return err
317326 }
318327
328+ connectLatency .Observe (float64 (time .Since (start ).Milliseconds ()))
329+ connectedLogger .Msg ("proxy stream established" )
330+
319331 originConn .Stream (ctx , rwa , p .log )
320332 return nil
321333}
0 commit comments