88 "errors"
99 "fmt"
1010 "io"
11- "log"
11+ "log/slog "
1212 "math"
1313 "net"
1414 "runtime"
@@ -32,7 +32,6 @@ const (
3232const shutdownEventKey = "box.shutdown"
3333
3434type ConnEventKind int
35- type ConnLogKind int
3635
3736var (
3837 errUnknownRequest = errors .New ("the passed connected request doesn't belong " +
@@ -50,19 +49,6 @@ const (
5049 Shutdown
5150 // Either reconnect attempts exhausted, or explicit Close is called.
5251 Closed
53-
54- // LogReconnectFailed is logged when reconnect attempt failed.
55- LogReconnectFailed ConnLogKind = iota + 1
56- // LogLastReconnectFailed is logged when last reconnect attempt failed,
57- // connection will be closed after that.
58- LogLastReconnectFailed
59- // LogUnexpectedResultId is logged when response with unknown id was received.
60- // Most probably it is due to request timeout.
61- LogUnexpectedResultId
62- // LogWatchEventReadFailed is logged when failed to read a watch event.
63- LogWatchEventReadFailed
64- // LogBoxSessionPushUnsupported is logged when response type turned IPROTO_CHUNK.
65- LogBoxSessionPushUnsupported
6652)
6753
6854// ConnEvent is sent throw Notify channel specified in Opts.
@@ -80,53 +66,6 @@ type connWatchEvent struct {
8066
8167var epoch = time .Now ()
8268
83- // Logger is logger type expected to be passed in options.
84- type Logger interface {
85- Report (event ConnLogKind , conn * Connection , v ... interface {})
86- }
87-
88- type defaultLogger struct {}
89-
90- func (d defaultLogger ) Report (event ConnLogKind , conn * Connection , v ... interface {}) {
91- switch event {
92- case LogReconnectFailed :
93- reconnects := v [0 ].(uint )
94- err := v [1 ].(error )
95- addr := conn .Addr ()
96- if addr == nil {
97- log .Printf ("tarantool: connect (%d/%d) failed: %s" ,
98- reconnects , conn .opts .MaxReconnects , err )
99- } else {
100- log .Printf ("tarantool: reconnect (%d/%d) to %s failed: %s" ,
101- reconnects , conn .opts .MaxReconnects , addr , err )
102- }
103- case LogLastReconnectFailed :
104- err := v [0 ].(error )
105- addr := conn .Addr ()
106- if addr == nil {
107- log .Printf ("tarantool: last connect failed: %s, giving it up" ,
108- err )
109- } else {
110- log .Printf ("tarantool: last reconnect to %s failed: %s, giving it up" ,
111- addr , err )
112- }
113- case LogUnexpectedResultId :
114- header := v [0 ].(Header )
115- log .Printf ("tarantool: connection %s got unexpected request ID (%d) in response " +
116- "(probably cancelled request)" ,
117- conn .Addr (), header .RequestId )
118- case LogWatchEventReadFailed :
119- err := v [0 ].(error )
120- log .Printf ("tarantool: unable to parse watch event: %s" , err )
121- case LogBoxSessionPushUnsupported :
122- header := v [0 ].(Header )
123- log .Printf ("tarantool: unsupported box.session.push() for request %d" , header .RequestId )
124- default :
125- args := append ([]interface {}{"tarantool: unexpected event " , event , conn }, v ... )
126- log .Print (args ... )
127- }
128- }
129-
13069// Connection is a handle with a single connection to a Tarantool instance.
13170//
13271// It is created and configured with Connect function, and could not be
@@ -369,7 +308,7 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
369308 }
370309
371310 if conn .opts .Logger == nil {
372- conn .opts .Logger = defaultLogger {}
311+ conn .opts .Logger = NewSlogLogger ( slog . Default ())
373312 }
374313
375314 conn .cond = sync .NewCond (& conn .mutex )
@@ -410,6 +349,28 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
410349 return conn , err
411350}
412351
352+ func (conn * Connection ) logEvent (event LogEvent ) {
353+ if conn .opts .Logger != nil {
354+ conn .opts .Logger .Report (event , conn )
355+ }
356+ }
357+
358+ func (conn * Connection ) stateToString () string {
359+ state := atomic .LoadUint32 (& conn .state )
360+ switch state {
361+ case connDisconnected :
362+ return "disconnected"
363+ case connConnected :
364+ return "connected"
365+ case connShutdown :
366+ return "shutdown"
367+ case connClosed :
368+ return "closed"
369+ default :
370+ return "unknown"
371+ }
372+ }
373+
413374// ConnectedNow reports if connection is established at the moment.
414375func (conn * Connection ) ConnectedNow () bool {
415376 return atomic .LoadUint32 (& conn .state ) == connConnected
@@ -583,6 +544,10 @@ func (conn *Connection) connect(ctx context.Context) error {
583544 var err error
584545 if conn .c == nil && conn .state == connDisconnected {
585546 if err = conn .dial (ctx ); err == nil {
547+
548+ conn .logEvent (ConnectedEvent {
549+ baseEvent : newBaseEvent (conn .addr ),
550+ })
586551 conn .notify (Connected )
587552 return nil
588553 }
@@ -596,6 +561,7 @@ func (conn *Connection) connect(ctx context.Context) error {
596561func (conn * Connection ) closeConnection (neterr error , forever bool ) (err error ) {
597562 conn .lockShards ()
598563 defer conn .unlockShards ()
564+
599565 if forever {
600566 if conn .state != connClosed {
601567 close (conn .control )
@@ -606,11 +572,18 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
606572 go conn .shutdownWatcher .Unregister ()
607573 conn .shutdownWatcher = nil
608574 }
575+ conn .logEvent (ClosedEvent {
576+ baseEvent : newBaseEvent (conn .addr ),
577+ })
609578 conn .notify (Closed )
610579 }
611580 } else {
612581 atomic .StoreUint32 (& conn .state , connDisconnected )
613582 conn .cond .Broadcast ()
583+ conn .logEvent (DisconnectedEvent {
584+ baseEvent : newBaseEvent (conn .addr ),
585+ Reason : neterr ,
586+ })
614587 conn .notify (Disconnected )
615588 }
616589 if conn .c != nil {
@@ -674,23 +647,29 @@ func (conn *Connection) runReconnects(ctx context.Context) error {
674647 return nil
675648 }
676649
677- conn .opts .Logger .Report (LogReconnectFailed , conn , reconnects , err )
650+ conn .logEvent (ConnectionFailedEvent {
651+ baseEvent : newBaseEvent (conn .addr ),
652+ Error : err ,
653+ })
678654 conn .notify (ReconnectFailed )
679655 reconnects ++
680656 conn .mutex .Unlock ()
681657
682658 select {
683659 case <- ctx .Done ():
684- // Since the context is cancelled, we don't need to do anything.
685- // Conn.connect() will return the correct error.
660+ // Since the context is cancelled, we don't need to do anything.
661+ // Conn.connect() will return the correct error.
686662 case <- t .C :
687663 }
688664
689665 conn .mutex .Lock ()
690666 }
691667
692- conn .opts .Logger .Report (LogLastReconnectFailed , conn , err )
693- // mark connection as closed to avoid reopening by another goroutine
668+ conn .logEvent (ConnectionFailedEvent {
669+ baseEvent : newBaseEvent (conn .addr ),
670+ Error : err ,
671+ })
672+
694673 return ClientError {ErrConnectionClosed , "last reconnect failed" }
695674}
696675
@@ -880,11 +859,17 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
880859 ErrProtocolError ,
881860 fmt .Sprintf ("failed to decode IPROTO_EVENT: %s" , err ),
882861 }
883- conn .opts .Logger .Report (LogWatchEventReadFailed , conn , err )
862+ conn .logEvent (WatchEventReadFailedEvent {
863+ baseEvent : newBaseEvent (conn .addr ),
864+ Error : err ,
865+ })
884866 }
885867 continue
886868 } else if code == iproto .IPROTO_CHUNK {
887- conn .opts .Logger .Report (LogBoxSessionPushUnsupported , conn , header )
869+ conn .logEvent (BoxSessionPushUnsupportedEvent {
870+ baseEvent : newBaseEvent (conn .addr ),
871+ RequestId : header .RequestId ,
872+ })
888873 } else {
889874 if fut = conn .fetchFuture (header .RequestId ); fut != nil {
890875 if err := fut .SetResponse (header , & buf ); err != nil {
@@ -895,7 +880,10 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
895880 }
896881
897882 if fut == nil {
898- conn .opts .Logger .Report (LogUnexpectedResultId , conn , header )
883+ conn .logEvent (UnexpectedResultIdEvent {
884+ baseEvent : newBaseEvent (conn .addr ),
885+ RequestId : header .RequestId ,
886+ })
899887 }
900888 }
901889}
@@ -1174,6 +1162,12 @@ func (conn *Connection) timeouts() {
11741162 })
11751163 conn .markDone (fut )
11761164 shard .bufmut .Unlock ()
1165+
1166+ conn .logEvent (TimeoutEvent {
1167+ baseEvent : newBaseEvent (conn .addr ),
1168+ RequestId : fut .requestId ,
1169+ Timeout : timeout ,
1170+ })
11771171 }
11781172 if pair .first != nil && pair .first .timeout < minNext {
11791173 minNext = pair .first .timeout
0 commit comments