Skip to content

Commit 3570c1a

Browse files
committed
wrap atomic
1 parent c0baba6 commit 3570c1a

File tree

2 files changed

+24
-14
lines changed

2 files changed

+24
-14
lines changed

hitless/redis_connection_processor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,11 @@ func (rcp *RedisConnectionProcessor) performConnectionHandoffWithPool(ctx contex
501501

502502
// Get the old connection
503503
oldConn := cn.GetNetConn()
504-
defer oldConn.Close()
504+
defer func() {
505+
if oldConn != nil {
506+
oldConn.Close()
507+
}
508+
}()
505509

506510
// Replace the connection and execute initialization
507511
err = cn.SetNetConnWithInitConn(ctx, newNetConn)

internal/pool/conn.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ var noDeadline = time.Time{}
1818
// Global atomic counter for connection IDs
1919
var connIDCounter uint64
2020

21+
// atomicNetConn is a wrapper to ensure consistent typing in atomic.Value
22+
type atomicNetConn struct {
23+
conn net.Conn
24+
}
25+
2126
// generateConnID generates a fast unique identifier for a connection with zero allocations
2227
func generateConnID() uint64 {
2328
return atomic.AddUint64(&connIDCounter, 1)
@@ -27,8 +32,8 @@ type Conn struct {
2732
usedAt int64 // atomic
2833

2934
// Lock-free netConn access using atomic.Value
30-
// Contains net.Conn, accessed atomically for better performance
31-
netConnAtomic atomic.Value // stores net.Conn
35+
// Contains *atomicNetConn wrapper, accessed atomically for better performance
36+
netConnAtomic atomic.Value // stores *atomicNetConn
3237

3338
rd *proto.Reader
3439
bw *bufio.Writer
@@ -74,8 +79,8 @@ func NewConn(netConn net.Conn) *Conn {
7479
id: generateConnID(), // Generate unique ID for this connection
7580
}
7681

77-
// Store netConn atomically for lock-free access
78-
cn.netConnAtomic.Store(netConn)
82+
// Store netConn atomically for lock-free access using wrapper
83+
cn.netConnAtomic.Store(&atomicNetConn{conn: netConn})
7984

8085
// Initialize atomic handoff state
8186
atomic.StoreInt32(&cn.usableAtomic, 0) // false initially, set to true after initialization
@@ -103,16 +108,18 @@ func (cn *Conn) SetUsedAt(tm time.Time) {
103108
// getNetConn returns the current network connection using atomic load (lock-free).
104109
// This is the fast path for accessing netConn without mutex overhead.
105110
func (cn *Conn) getNetConn() net.Conn {
106-
if conn := cn.netConnAtomic.Load(); conn != nil {
107-
return conn.(net.Conn)
111+
if v := cn.netConnAtomic.Load(); v != nil {
112+
if wrapper, ok := v.(*atomicNetConn); ok {
113+
return wrapper.conn
114+
}
108115
}
109116
return nil
110117
}
111118

112119
// setNetConn stores the network connection atomically (lock-free).
113120
// This is used for the fast path of connection replacement.
114121
func (cn *Conn) setNetConn(netConn net.Conn) {
115-
cn.netConnAtomic.Store(netConn)
122+
cn.netConnAtomic.Store(&atomicNetConn{conn: netConn})
116123
}
117124

118125
// Lock-free helper methods for handoff state management
@@ -311,14 +318,13 @@ func (cn *Conn) SetInitConnFunc(fn func(context.Context, *Conn) error) {
311318
// ExecuteInitConn runs the stored connection initialization function if available.
312319
func (cn *Conn) ExecuteInitConn(ctx context.Context) error {
313320
if cn.initConnFunc != nil {
314-
err := cn.initConnFunc(ctx, cn)
315-
if err == nil {
316-
cn.Inited = true
317-
cn.setUsable(true) // Use atomic operation
321+
if err := cn.initConnFunc(ctx, cn); err != nil {
322+
return err
318323
}
319-
return err
324+
cn.Inited = true
325+
cn.setUsable(true) // Use atomic operation
320326
}
321-
return nil
327+
return fmt.Errorf("redis: no initConnFunc set for connection %d", cn.GetID())
322328
}
323329

324330
func (cn *Conn) SetNetConn(netConn net.Conn) {

0 commit comments

Comments
 (0)