Skip to content

Commit 4741276

Browse files
committed
2
1 parent 0ba8b19 commit 4741276

File tree

2 files changed

+45
-39
lines changed

2 files changed

+45
-39
lines changed

connection.go

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,6 @@ import (
2222

2323
const requestsMap = 128
2424
const ignoreStreamId = 0
25-
const (
26-
connDisconnected = 0
27-
connConnected = 1
28-
connShutdown = 2
29-
connClosed = 3
30-
)
3125

3226
const shutdownEventKey = "box.shutdown"
3327

@@ -51,6 +45,32 @@ const (
5145
Closed
5246
)
5347

48+
// ConnectionState represents the state of a connection.
49+
type ConnectionState uint32
50+
51+
const (
52+
StateDisconnected ConnectionState = iota
53+
StateConnected
54+
StateShutdown
55+
StateClosed
56+
)
57+
58+
// String implements fmt.Stringer.
59+
func (s ConnectionState) String() string {
60+
switch s {
61+
case StateDisconnected:
62+
return "disconnected"
63+
case StateConnected:
64+
return "connected"
65+
case StateShutdown:
66+
return "shutdown"
67+
case StateClosed:
68+
return "closed"
69+
default:
70+
return "unknown"
71+
}
72+
}
73+
5474
// ConnEvent is sent throw Notify channel specified in Opts.
5575
type ConnEvent struct {
5676
Conn *Connection
@@ -355,30 +375,14 @@ func (conn *Connection) logEvent(event LogEvent) {
355375
}
356376
}
357377

358-
func (conn *Connection) stateToString() string {
359-
state := atomic.LoadUint32(&conn.state)
360-
switch state {
361-
case connDisconnected:
362-
return "disconnected"
363-
case connConnected:
364-
return "connected"
365-
case connShutdown:
366-
return "shutdown"
367-
case connClosed:
368-
return "closed"
369-
default:
370-
return "unknown"
371-
}
372-
}
373-
374378
// ConnectedNow reports if connection is established at the moment.
375379
func (conn *Connection) ConnectedNow() bool {
376-
return atomic.LoadUint32(&conn.state) == connConnected
380+
return ConnectionState(atomic.LoadUint32(&conn.state)) == StateConnected
377381
}
378382

379383
// ClosedNow reports if connection is closed by user or after reconnect.
380384
func (conn *Connection) ClosedNow() bool {
381-
return atomic.LoadUint32(&conn.state) == connClosed
385+
return ConnectionState(atomic.LoadUint32(&conn.state)) == StateClosed
382386
}
383387

384388
// Close closes Connection.
@@ -468,7 +472,7 @@ func (conn *Connection) dial(ctx context.Context) error {
468472
// Only if connected and fully initialized.
469473
conn.lockShards()
470474
conn.c = c
471-
atomic.StoreUint32(&conn.state, connConnected)
475+
atomic.StoreUint32(&conn.state, uint32(StateConnected))
472476
conn.cond.Broadcast()
473477
conn.unlockShards()
474478
go conn.writer(c, c)
@@ -542,7 +546,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
542546

543547
func (conn *Connection) connect(ctx context.Context) error {
544548
var err error
545-
if conn.c == nil && conn.state == connDisconnected {
549+
if conn.c == nil && ConnectionState(atomic.LoadUint32(&conn.state)) == StateDisconnected {
546550
if err = conn.dial(ctx); err == nil {
547551

548552
conn.logEvent(ConnectedEvent{
@@ -552,7 +556,7 @@ func (conn *Connection) connect(ctx context.Context) error {
552556
return nil
553557
}
554558
}
555-
if conn.state == connClosed {
559+
if ConnectionState(atomic.LoadUint32(&conn.state)) == StateClosed {
556560
err = ClientError{ErrConnectionClosed, "using closed connection"}
557561
}
558562
return err
@@ -563,9 +567,9 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
563567
defer conn.unlockShards()
564568

565569
if forever {
566-
if conn.state != connClosed {
570+
if ConnectionState(atomic.LoadUint32(&conn.state)) != StateClosed {
567571
close(conn.control)
568-
atomic.StoreUint32(&conn.state, connClosed)
572+
atomic.StoreUint32(&conn.state, uint32(StateClosed))
569573
conn.cond.Broadcast()
570574
// Free the resources.
571575
if conn.shutdownWatcher != nil {
@@ -578,7 +582,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
578582
conn.notify(Closed)
579583
}
580584
} else {
581-
atomic.StoreUint32(&conn.state, connDisconnected)
585+
atomic.StoreUint32(&conn.state, uint32(StateDisconnected))
582586
conn.cond.Broadcast()
583587
conn.logEvent(DisconnectedEvent{
584588
baseEvent: newBaseEvent(conn.addr),
@@ -736,7 +740,7 @@ func (conn *Connection) notify(kind ConnEventKind) {
736740
func (conn *Connection) writer(w writeFlusher, c Conn) {
737741
var shardn uint32
738742
var packet smallWBuf
739-
for atomic.LoadUint32(&conn.state) != connClosed {
743+
for ConnectionState(atomic.LoadUint32(&conn.state)) != StateClosed {
740744
select {
741745
case shardn = <-conn.dirtyShard:
742746
default:
@@ -829,7 +833,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
829833

830834
go conn.eventer(events)
831835

832-
for atomic.LoadUint32(&conn.state) != connClosed {
836+
for ConnectionState(atomic.LoadUint32(&conn.state)) != StateClosed {
833837
respBytes, err := read(r, conn.lenbuf[:])
834838
if err != nil {
835839
err = ClientError{
@@ -932,24 +936,24 @@ func (conn *Connection) newFuture(req Request) (fut *future) {
932936
shardn := fut.requestId & (conn.opts.Concurrency - 1)
933937
shard := &conn.shard[shardn]
934938
shard.rmut.Lock()
935-
switch atomic.LoadUint32(&conn.state) {
936-
case connClosed:
939+
switch ConnectionState(atomic.LoadUint32(&conn.state)) {
940+
case StateClosed:
937941
fut.err = ClientError{
938942
ErrConnectionClosed,
939943
"using closed connection",
940944
}
941945
fut.finish()
942946
shard.rmut.Unlock()
943947
return
944-
case connDisconnected:
948+
case StateDisconnected:
945949
fut.err = ClientError{
946950
ErrConnectionNotReady,
947951
"client connection is not ready",
948952
}
949953
fut.finish()
950954
shard.rmut.Unlock()
951955
return
952-
case connShutdown:
956+
case StateShutdown:
953957
fut.err = ClientError{
954958
ErrConnectionShutdown,
955959
"server shutdown in progress",
@@ -1545,7 +1549,7 @@ func (conn *Connection) shutdown(forever bool) error {
15451549
conn.mutex.Lock()
15461550
defer conn.mutex.Unlock()
15471551

1548-
if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
1552+
if !atomic.CompareAndSwapUint32(&conn.state, uint32(StateConnected), uint32(StateShutdown)) {
15491553
if forever {
15501554
err := ClientError{ErrConnectionClosed, "connection closed by client"}
15511555
return conn.closeConnection(err, true)
@@ -1569,7 +1573,7 @@ func (conn *Connection) shutdown(forever bool) error {
15691573

15701574
c := conn.c
15711575
for {
1572-
if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
1576+
if (ConnectionState(atomic.LoadUint32(&conn.state)) != StateShutdown) || (c != conn.c) {
15731577
return nil
15741578
}
15751579
if atomic.LoadInt64(&conn.requestCnt) == 0 {

logger.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"context"
55
"log/slog"
6+
"sync/atomic"
67
)
78

89
type Logger interface {
@@ -46,7 +47,8 @@ func (l SlogLogger) Report(event LogEvent, conn *Connection) {
4647
}
4748

4849
if !keys["connection_state"] {
49-
attrs = append(attrs, slog.String("connection_state", conn.stateToString()))
50+
state := ConnectionState(atomic.LoadUint32(&conn.state))
51+
attrs = append(attrs, slog.String("connection_state", state.String()))
5052
}
5153

5254
if conn.opts.MaxReconnects > 0 && !keys["max_reconnects"] {

0 commit comments

Comments
 (0)