88 "errors"
99 "fmt"
1010 "io"
11- "log"
11+ "log/slog "
1212 "math"
1313 "net"
1414 "runtime"
@@ -32,7 +32,6 @@ const (
3232const shutdownEventKey = "box.shutdown"
3333
3434type ConnEventKind int
35- type ConnLogKind int
3635
3736var (
3837 errUnknownRequest = errors .New ("the passed connected request doesn't belong " +
@@ -50,19 +49,6 @@ const (
5049 Shutdown
5150 // Either reconnect attempts exhausted, or explicit Close is called.
5251 Closed
53-
54- // LogReconnectFailed is logged when reconnect attempt failed.
55- LogReconnectFailed ConnLogKind = iota + 1
56- // LogLastReconnectFailed is logged when last reconnect attempt failed,
57- // connection will be closed after that.
58- LogLastReconnectFailed
59- // LogUnexpectedResultId is logged when response with unknown id was received.
60- // Most probably it is due to request timeout.
61- LogUnexpectedResultId
62- // LogWatchEventReadFailed is logged when failed to read a watch event.
63- LogWatchEventReadFailed
64- // LogBoxSessionPushUnsupported is logged when response type turned IPROTO_CHUNK.
65- LogBoxSessionPushUnsupported
6652)
6753
6854// ConnEvent is sent throw Notify channel specified in Opts.
@@ -80,53 +66,6 @@ type connWatchEvent struct {
8066
8167var epoch = time .Now ()
8268
83- // Logger is logger type expected to be passed in options.
84- type Logger interface {
85- Report (event ConnLogKind , conn * Connection , v ... interface {})
86- }
87-
88- type defaultLogger struct {}
89-
90- func (d defaultLogger ) Report (event ConnLogKind , conn * Connection , v ... interface {}) {
91- switch event {
92- case LogReconnectFailed :
93- reconnects := v [0 ].(uint )
94- err := v [1 ].(error )
95- addr := conn .Addr ()
96- if addr == nil {
97- log .Printf ("tarantool: connect (%d/%d) failed: %s" ,
98- reconnects , conn .opts .MaxReconnects , err )
99- } else {
100- log .Printf ("tarantool: reconnect (%d/%d) to %s failed: %s" ,
101- reconnects , conn .opts .MaxReconnects , addr , err )
102- }
103- case LogLastReconnectFailed :
104- err := v [0 ].(error )
105- addr := conn .Addr ()
106- if addr == nil {
107- log .Printf ("tarantool: last connect failed: %s, giving it up" ,
108- err )
109- } else {
110- log .Printf ("tarantool: last reconnect to %s failed: %s, giving it up" ,
111- addr , err )
112- }
113- case LogUnexpectedResultId :
114- header := v [0 ].(Header )
115- log .Printf ("tarantool: connection %s got unexpected request ID (%d) in response " +
116- "(probably cancelled request)" ,
117- conn .Addr (), header .RequestId )
118- case LogWatchEventReadFailed :
119- err := v [0 ].(error )
120- log .Printf ("tarantool: unable to parse watch event: %s" , err )
121- case LogBoxSessionPushUnsupported :
122- header := v [0 ].(Header )
123- log .Printf ("tarantool: unsupported box.session.push() for request %d" , header .RequestId )
124- default :
125- args := append ([]interface {}{"tarantool: unexpected event " , event , conn }, v ... )
126- log .Print (args ... )
127- }
128- }
129-
13069// Connection is a handle with a single connection to a Tarantool instance.
13170//
13271// It is created and configured with Connect function, and could not be
@@ -208,7 +147,8 @@ type Connection struct {
208147 // shutdownWatcher is the "box.shutdown" event watcher.
209148 shutdownWatcher Watcher
210149 // requestCnt is a counter of active requests.
211- requestCnt int64
150+ requestCnt int64
151+ reconnectCount uint32 // count of reconnect
212152}
213153
214154var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -369,7 +309,7 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
369309 }
370310
371311 if conn .opts .Logger == nil {
372- conn .opts .Logger = defaultLogger {}
312+ conn .opts .Logger = NewSlogLogger ( slog . Default ())
373313 }
374314
375315 conn .cond = sync .NewCond (& conn .mutex )
@@ -410,6 +350,28 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
410350 return conn , err
411351}
412352
353+ func (conn * Connection ) logEvent (event LogEvent ) {
354+ if conn .opts .Logger != nil {
355+ conn .opts .Logger .Report (event , conn )
356+ }
357+ }
358+
359+ func (conn * Connection ) stateToString () string {
360+ state := atomic .LoadUint32 (& conn .state )
361+ switch state {
362+ case connDisconnected :
363+ return "disconnected"
364+ case connConnected :
365+ return "connected"
366+ case connShutdown :
367+ return "shutdown"
368+ case connClosed :
369+ return "closed"
370+ default :
371+ return "unknown"
372+ }
373+ }
374+
413375// ConnectedNow reports if connection is established at the moment.
414376func (conn * Connection ) ConnectedNow () bool {
415377 return atomic .LoadUint32 (& conn .state ) == connConnected
@@ -583,6 +545,14 @@ func (conn *Connection) connect(ctx context.Context) error {
583545 var err error
584546 if conn .c == nil && conn .state == connDisconnected {
585547 if err = conn .dial (ctx ); err == nil {
548+ // Atomically increase the reconnect count
549+ // (use atomic operations for thread safety)
550+ reconnects := atomic .AddUint32 (& conn .reconnectCount , 1 ) - 1
551+
552+ conn .logEvent (ConnectedEvent {
553+ baseEvent : newBaseEvent (conn .addr ),
554+ Reconnects : uint (reconnects ),
555+ })
586556 conn .notify (Connected )
587557 return nil
588558 }
@@ -593,24 +563,37 @@ func (conn *Connection) connect(ctx context.Context) error {
593563 return err
594564}
595565
566+ // Method to reset the reconnect count
567+ func (conn * Connection ) resetReconnectCount () {
568+ atomic .StoreUint32 (& conn .reconnectCount , 0 )
569+ }
570+
571+ // Method for getting the current number of reconnects
572+ func (conn * Connection ) GetReconnectCount () uint32 {
573+ return atomic .LoadUint32 (& conn .reconnectCount )
574+ }
575+
596576func (conn * Connection ) closeConnection (neterr error , forever bool ) (err error ) {
597577 conn .lockShards ()
598578 defer conn .unlockShards ()
579+
599580 if forever {
600581 if conn .state != connClosed {
601582 close (conn .control )
602583 atomic .StoreUint32 (& conn .state , connClosed )
603584 conn .cond .Broadcast ()
604- // Free the resources.
605- if conn .shutdownWatcher != nil {
606- go conn .shutdownWatcher .Unregister ()
607- conn .shutdownWatcher = nil
608- }
585+ conn .logEvent (ClosedEvent {
586+ baseEvent : newBaseEvent (conn .addr ),
587+ })
609588 conn .notify (Closed )
610589 }
611590 } else {
612591 atomic .StoreUint32 (& conn .state , connDisconnected )
613592 conn .cond .Broadcast ()
593+ conn .logEvent (DisconnectedEvent {
594+ baseEvent : newBaseEvent (conn .addr ),
595+ Reason : neterr ,
596+ })
614597 conn .notify (Disconnected )
615598 }
616599 if conn .c != nil {
@@ -655,14 +638,6 @@ func (conn *Connection) runReconnects(ctx context.Context) error {
655638 cancel ()
656639
657640 if err != nil {
658- // The error will most likely be the one that Dialer
659- // returns to us due to the context being cancelled.
660- // Although this is not guaranteed. For example,
661- // if the dialer may throw another error before checking
662- // the context, and the context has already been
663- // canceled. Or the context was not canceled after
664- // the error was thrown, but before the context was
665- // checked here.
666641 if ctx .Err () != nil {
667642 return err
668643 }
@@ -674,23 +649,31 @@ func (conn *Connection) runReconnects(ctx context.Context) error {
674649 return nil
675650 }
676651
677- conn .opts .Logger .Report (LogReconnectFailed , conn , reconnects , err )
652+ conn .logEvent (ReconnectFailedEvent {
653+ baseEvent : newBaseEvent (conn .addr ),
654+ Reconnects : reconnects ,
655+ MaxReconnects : conn .opts .MaxReconnects ,
656+ Error : err ,
657+ IsInitial : conn .addr == nil ,
658+ })
659+
678660 conn .notify (ReconnectFailed )
679661 reconnects ++
680662 conn .mutex .Unlock ()
681663
682664 select {
683665 case <- ctx .Done ():
684- // Since the context is cancelled, we don't need to do anything.
685- // Conn.connect() will return the correct error.
686666 case <- t .C :
687667 }
688668
689669 conn .mutex .Lock ()
690670 }
691671
692- conn .opts .Logger .Report (LogLastReconnectFailed , conn , err )
693- // mark connection as closed to avoid reopening by another goroutine
672+ conn .logEvent (LastReconnectFailedEvent {
673+ baseEvent : newBaseEvent (conn .addr ),
674+ Error : err ,
675+ })
676+
694677 return ClientError {ErrConnectionClosed , "last reconnect failed" }
695678}
696679
@@ -880,11 +863,17 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
880863 ErrProtocolError ,
881864 fmt .Sprintf ("failed to decode IPROTO_EVENT: %s" , err ),
882865 }
883- conn .opts .Logger .Report (LogWatchEventReadFailed , conn , err )
866+ conn .logEvent (WatchEventReadFailedEvent {
867+ baseEvent : newBaseEvent (conn .addr ),
868+ Error : err ,
869+ })
884870 }
885871 continue
886872 } else if code == iproto .IPROTO_CHUNK {
887- conn .opts .Logger .Report (LogBoxSessionPushUnsupported , conn , header )
873+ conn .logEvent (BoxSessionPushUnsupportedEvent {
874+ baseEvent : newBaseEvent (conn .addr ),
875+ RequestId : header .RequestId ,
876+ })
888877 } else {
889878 if fut = conn .fetchFuture (header .RequestId ); fut != nil {
890879 if err := fut .SetResponse (header , & buf ); err != nil {
@@ -895,7 +884,10 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
895884 }
896885
897886 if fut == nil {
898- conn .opts .Logger .Report (LogUnexpectedResultId , conn , header )
887+ conn .logEvent (UnexpectedResultIdEvent {
888+ baseEvent : newBaseEvent (conn .addr ),
889+ RequestId : header .RequestId ,
890+ })
899891 }
900892 }
901893}
@@ -1174,6 +1166,12 @@ func (conn *Connection) timeouts() {
11741166 })
11751167 conn .markDone (fut )
11761168 shard .bufmut .Unlock ()
1169+
1170+ conn .logEvent (TimeoutEvent {
1171+ baseEvent : newBaseEvent (conn .addr ),
1172+ RequestId : fut .requestId ,
1173+ Timeout : timeout ,
1174+ })
11771175 }
11781176 if pair .first != nil && pair .first .timeout < minNext {
11791177 minNext = pair .first .timeout
0 commit comments