99 "encoding/binary"
1010 "encoding/json"
1111 "fmt"
12+
1213 "net"
1314 "os"
1415 "strings"
@@ -126,6 +127,8 @@ type BackendConnManager struct {
126127 closeStatus atomic.Int32
127128 // The last time when the backend is active.
128129 lastActiveTime monotime.Time
130+ // The traffic recorded last time.
131+ inBytes , inPackets , outBytes , outPackets uint64
129132 // cancelFunc is used to cancel the signal processing goroutine.
130133 cancelFunc context.CancelFunc
131134 clientIO * pnet.PacketIO
@@ -179,6 +182,7 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
179182 mgr .quitSource = SrcProxyQuit
180183 return errors .New ("graceful shutdown before connecting" )
181184 }
185+ startTime := monotime .Now ()
182186 err := mgr .authenticator .handshakeFirstTime (ctx , mgr .logger .Named ("authenticator" ), mgr , clientIO , mgr .handshakeHandler , mgr .getBackendIO , frontendTLSConfig , backendTLSConfig )
183187 if err != nil {
184188 src := Error2Source (err )
@@ -191,11 +195,14 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
191195 return err
192196 }
193197 mgr .handshakeHandler .OnHandshake (mgr , mgr .ServerAddr (), nil , SrcNone )
198+ endTime := monotime .Now ()
199+ addHandshakeMetrics (mgr .ServerAddr (), time .Duration (endTime - startTime ))
200+ mgr .updateTraffic (mgr .backendIO .Load ())
194201
195202 mgr .cmdProcessor .capability = mgr .authenticator .capability
196203 childCtx , cancelFunc := context .WithCancel (ctx )
197204 mgr .cancelFunc = cancelFunc
198- mgr .lastActiveTime = monotime . Now ()
205+ mgr .lastActiveTime = endTime
199206 mgr .wg .Run (func () {
200207 mgr .processSignals (childCtx )
201208 })
@@ -290,9 +297,11 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (
290297 }
291298 waitingRedirect := mgr .redirectInfo .Load () != nil
292299 var holdRequest bool
293- holdRequest , err = mgr .cmdProcessor .executeCmd (request , mgr .clientIO , mgr .backendIO .Load (), waitingRedirect )
300+ backendIO := mgr .backendIO .Load ()
301+ holdRequest , err = mgr .cmdProcessor .executeCmd (request , mgr .clientIO , backendIO , waitingRedirect )
294302 if ! holdRequest {
295- addCmdMetrics (cmd , mgr .ServerAddr (), startTime )
303+ addCmdMetrics (cmd , backendIO .RemoteAddr ().String (), startTime )
304+ mgr .updateTraffic (backendIO )
296305 }
297306 if err != nil {
298307 if ! pnet .IsMySQLError (err ) {
@@ -326,25 +335,33 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (
326335 }
327336 // Even if it meets an MySQL error, it may have changed the status, such as when executing multi-statements.
328337 if mgr .cmdProcessor .finishedTxn () {
329- if waitingRedirect && holdRequest {
330- mgr .tryRedirect (ctx )
331- // Execute the held request no matter redirection succeeds or not.
332- _ , err = mgr .cmdProcessor .executeCmd (request , mgr .clientIO , mgr .backendIO .Load (), false )
333- addCmdMetrics (cmd , mgr .ServerAddr (), startTime )
334- if err != nil && ! pnet .IsMySQLError (err ) {
335- return
336- }
337- } else if mgr .closeStatus .Load () == statusNotifyClose {
338+ if mgr .closeStatus .Load () == statusNotifyClose {
338339 mgr .tryGracefulClose (ctx )
339340 } else if waitingRedirect {
340341 mgr .tryRedirect (ctx )
341342 }
342343 }
344+ // Execute the held request no matter redirection succeeds or not.
345+ if holdRequest && mgr .closeStatus .Load () < statusNotifyClose {
346+ backendIO = mgr .backendIO .Load ()
347+ _ , err = mgr .cmdProcessor .executeCmd (request , mgr .clientIO , backendIO , false )
348+ addCmdMetrics (cmd , backendIO .RemoteAddr ().String (), startTime )
349+ mgr .updateTraffic (backendIO )
350+ if err != nil && ! pnet .IsMySQLError (err ) {
351+ return
352+ }
353+ }
343354 // Ignore MySQL errors, only return unexpected errors.
344355 err = nil
345356 return
346357}
347358
359+ func (mgr * BackendConnManager ) updateTraffic (backendIO * pnet.PacketIO ) {
360+ inBytes , inPackets , outBytes , outPackets := backendIO .InBytes (), backendIO .InPackets (), backendIO .OutBytes (), backendIO .OutPackets ()
361+ addTraffic (backendIO .RemoteAddr ().String (), inBytes - mgr .inBytes , inPackets - mgr .inPackets , outBytes - mgr .outBytes , outPackets - mgr .outPackets )
362+ mgr .inBytes , mgr .inPackets , mgr .outBytes , mgr .outPackets = inBytes , inPackets , outBytes , outPackets
363+ }
364+
348365// SetEventReceiver implements RedirectableConn.SetEventReceiver interface.
349366// The receiver sends redirection signals and watches redirecting events.
350367func (mgr * BackendConnManager ) SetEventReceiver (receiver router.ConnEventReceiver ) {
@@ -484,6 +501,9 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
484501 }
485502 return
486503 }
504+ mgr .updateTraffic (backendIO )
505+ mgr .inBytes , mgr .inPackets , mgr .outBytes , mgr .outPackets = 0 , 0 , 0 , 0
506+ mgr .updateTraffic (newBackendIO )
487507 if ignoredErr := backendIO .Close (); ignoredErr != nil && ! pnet .IsDisconnectError (ignoredErr ) {
488508 mgr .logger .Error ("close previous backend connection failed" , zap .Error (ignoredErr ))
489509 }
0 commit comments