Skip to content

Commit 7af418a

Browse files
committed
revert some optimizations
1 parent faf4ae1 commit 7af418a

File tree

7 files changed

+108
-74
lines changed

7 files changed

+108
-74
lines changed

hitless/notification_handler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
9191
}
9292

9393
// Mark the connection for handoff
94-
poolConn.MarkForHandoff(newEndpoint, seqID)
94+
if err := poolConn.MarkForHandoff(newEndpoint, seqID); err != nil {
95+
// Connection is already marked for handoff, which is acceptable
96+
// This can happen if multiple MOVING notifications are received for the same connection
97+
return nil
98+
}
9599

96100
// Optionally track in hitless manager for monitoring/debugging
97101
if snh.manager != nil {

hitless/redis_connection_processor.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,6 @@ func (rcp *RedisConnectionProcessor) performConnectionHandoffWithPool(ctx contex
517517
}
518518
} else {
519519
cn.Close()
520-
newNetConn.Close()
521520
internal.Logger.Printf(ctx,
522521
"hitless: no pool provided for connection %d, cannot remove due to handoff initialization failure: %v",
523522
cn.GetID(), err)

hitless/redis_connection_processor_test.go

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
111111
defer processor.Shutdown(context.Background())
112112

113113
conn := createMockPoolConnection()
114-
conn.MarkForHandoff("new-endpoint:6379", 12345)
114+
if err := conn.MarkForHandoff("new-endpoint:6379", 12345); err != nil {
115+
t.Fatalf("Failed to mark connection for handoff: %v", err)
116+
}
115117

116118
// Set a mock initialization function
117119
initConnCalled := false
@@ -182,7 +184,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
182184
t.Run("EmptyEndpoint", func(t *testing.T) {
183185
processor := NewRedisConnectionProcessor(3, baseDialer, nil, nil)
184186
conn := createMockPoolConnection()
185-
conn.MarkForHandoff("", 12345) // Empty endpoint
187+
if err := conn.MarkForHandoff("", 12345); err != nil { // Empty endpoint
188+
t.Fatalf("Failed to mark connection for handoff: %v", err)
189+
}
186190

187191
ctx := context.Background()
188192
shouldPool, shouldRemove, err := processor.ProcessConnectionOnPut(ctx, conn)
@@ -214,7 +218,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
214218
defer processor.Shutdown(context.Background())
215219

216220
conn := createMockPoolConnection()
217-
conn.MarkForHandoff("new-endpoint:6379", 12345)
221+
if err := conn.MarkForHandoff("new-endpoint:6379", 12345); err != nil {
222+
t.Fatalf("Failed to mark connection for handoff: %v", err)
223+
}
218224

219225
ctx := context.Background()
220226
shouldPool, shouldRemove, err := processor.ProcessConnectionOnPut(ctx, conn)
@@ -359,7 +365,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
359365
connections := make([]*pool.Conn, 5)
360366
for i := 0; i < 5; i++ {
361367
connections[i] = createMockPoolConnection()
362-
connections[i].MarkForHandoff("new-endpoint:6379", int64(i))
368+
if err := connections[i].MarkForHandoff("new-endpoint:6379", int64(i)); err != nil {
369+
t.Fatalf("Failed to mark connection %d for handoff: %v", i, err)
370+
}
363371
}
364372

365373
ctx := context.Background()
@@ -438,7 +446,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
438446

439447
// Create a connection and trigger handoff
440448
conn := createMockPoolConnection()
441-
conn.MarkForHandoff("new-endpoint:6379", 1)
449+
if err := conn.MarkForHandoff("new-endpoint:6379", 1); err != nil {
450+
t.Fatalf("Failed to mark connection for handoff: %v", err)
451+
}
442452

443453
// Process the connection to trigger handoff
444454
shouldPool, shouldRemove, err := processor.ProcessConnectionOnPut(ctx, conn)
@@ -516,7 +526,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
516526
}
517527

518528
// Mark connection for handoff
519-
conn.MarkForHandoff("new-endpoint:6379", 1)
529+
if err := conn.MarkForHandoff("new-endpoint:6379", 1); err != nil {
530+
t.Fatalf("Failed to mark connection for handoff: %v", err)
531+
}
520532

521533
// Connection should no longer be usable
522534
if conn.IsUsable() {
@@ -583,7 +595,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
583595
// Fill part of the queue
584596
for i := 0; i < 10; i++ {
585597
conn := createMockPoolConnection()
586-
conn.MarkForHandoff("new-endpoint:6379", int64(i+1))
598+
if err := conn.MarkForHandoff("new-endpoint:6379", int64(i+1)); err != nil {
599+
t.Fatalf("Failed to mark connection %d for handoff: %v", i, err)
600+
}
587601

588602
shouldPool, shouldRemove, err := processor.ProcessConnectionOnPut(ctx, conn)
589603
if err != nil {
@@ -634,7 +648,9 @@ func TestRedisConnectionProcessor(t *testing.T) {
634648

635649
// Create a connection and mark it for handoff
636650
conn := createMockPoolConnection()
637-
conn.MarkForHandoff("new-endpoint:6379", 1)
651+
if err := conn.MarkForHandoff("new-endpoint:6379", 1); err != nil {
652+
t.Fatalf("Failed to mark connection for handoff: %v", err)
653+
}
638654

639655
// Set a failing initialization function
640656
conn.SetInitConnFunc(func(ctx context.Context, cn *pool.Conn) error {
@@ -712,4 +728,31 @@ func TestRedisConnectionProcessor(t *testing.T) {
712728
t.Error("Relaxed timeout should be automatically cleared after post-handoff duration")
713729
}
714730
})
731+
732+
t.Run("MarkForHandoff returns error when already marked", func(t *testing.T) {
733+
conn := createMockPoolConnection()
734+
735+
// First mark should succeed
736+
if err := conn.MarkForHandoff("new-endpoint:6379", 1); err != nil {
737+
t.Fatalf("First MarkForHandoff should succeed: %v", err)
738+
}
739+
740+
// Second mark should fail
741+
if err := conn.MarkForHandoff("another-endpoint:6379", 2); err == nil {
742+
t.Fatal("Second MarkForHandoff should return error")
743+
} else if err.Error() != "connection is already marked for handoff" {
744+
t.Fatalf("Expected specific error message, got: %v", err)
745+
}
746+
747+
// Verify original handoff data is preserved
748+
if !conn.ShouldHandoff() {
749+
t.Fatal("Connection should still be marked for handoff")
750+
}
751+
if conn.GetHandoffEndpoint() != "new-endpoint:6379" {
752+
t.Fatalf("Expected original endpoint, got: %s", conn.GetHandoffEndpoint())
753+
}
754+
if conn.GetMovingSeqID() != 1 {
755+
t.Fatalf("Expected original sequence ID, got: %d", conn.GetMovingSeqID())
756+
}
757+
})
715758
}

internal/pool/conn.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pool
33
import (
44
"bufio"
55
"context"
6+
"errors"
67
"fmt"
78
"net"
89
"sync/atomic"
@@ -162,11 +163,6 @@ func (cn *Conn) setNewEndpoint(endpoint string) {
162163
cn.newEndpointAtomic.Store(endpoint)
163164
}
164165

165-
// getHandoffRetries returns the retry count atomically (lock-free).
166-
func (cn *Conn) getHandoffRetries() int {
167-
return int(atomic.LoadInt32(&cn.handoffRetriesAtomic))
168-
}
169-
170166
// setHandoffRetries sets the retry count atomically (lock-free).
171167
func (cn *Conn) setHandoffRetries(retries int) {
172168
atomic.StoreInt32(&cn.handoffRetriesAtomic, int32(retries))
@@ -343,11 +339,18 @@ func (cn *Conn) SetNetConnWithInitConn(ctx context.Context, netConn net.Conn) er
343339
}
344340

345341
// MarkForHandoff marks the connection for handoff due to MOVING notification (lock-free).
346-
func (cn *Conn) MarkForHandoff(newEndpoint string, seqID int64) {
342+
// Returns an error if the connection is already marked for handoff.
343+
func (cn *Conn) MarkForHandoff(newEndpoint string, seqID int64) error {
344+
// Check if connection is already marked for handoff
345+
if cn.shouldHandoff() {
346+
return errors.New("connection is already marked for handoff")
347+
}
348+
347349
cn.setShouldHandoff(true)
348350
cn.setNewEndpoint(newEndpoint)
349351
cn.setMovingSeqID(seqID)
350352
cn.setUsable(false) // Connection is not safe to use until handoff completes
353+
return nil
351354
}
352355

353356
// ShouldHandoff returns true if the connection needs to be handed off (lock-free).

internal/pool/pool.go

Lines changed: 25 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,6 @@ type ConnPool struct {
108108
stats Stats
109109
waitDurationNs atomic.Int64
110110

111-
// Atomic flag to prevent concurrent minimum idle connection checks
112-
checkingMinIdle uint32 // atomic: 1 if checking is in progress, 0 otherwise
113-
114111
_closed uint32 // atomic
115112
}
116113

@@ -128,38 +125,14 @@ func NewConnPool(opt *Options) *ConnPool {
128125
// Only create MinIdleConns if explicitly requested (> 0)
129126
// This avoids creating connections during pool initialization for tests
130127
if opt.MinIdleConns > 0 {
131-
p.checkMinIdleConnsAsync()
132-
}
133-
134-
return p
135-
}
136-
137-
// checkMinIdleConnsAsync asynchronously checks and maintains minimum idle connections.
138-
// Uses an atomic flag to prevent concurrent checks - only creates a goroutine when needed.
139-
func (p *ConnPool) checkMinIdleConnsAsync() {
140-
// Fast path: if MinIdleConns is 0, no need to check
141-
if p.cfg.MinIdleConns == 0 {
142-
return
143-
}
144-
145-
// Check if idle connections are already being checked
146-
if !atomic.CompareAndSwapUint32(&p.checkingMinIdle, 0, 1) {
147-
// Already checking, return early to avoid duplicate work
148-
return
149-
}
150-
151-
// Start checking in a goroutine to avoid blocking the caller
152-
go func() {
153-
defer atomic.StoreUint32(&p.checkingMinIdle, 0) // Reset the flag when done
154-
155-
// Check and maintain minimum idle connections
156128
p.connsMu.Lock()
157129
p.checkMinIdleConns()
158130
p.connsMu.Unlock()
159-
}()
131+
}
132+
133+
return p
160134
}
161135

162-
// checkMinIdleConns is not thread-safe. Should be called with connsMu locked.
163136
func (p *ConnPool) checkMinIdleConns() {
164137
if p.cfg.MinIdleConns == 0 {
165138
return
@@ -172,19 +145,23 @@ func (p *ConnPool) checkMinIdleConns() {
172145
case p.queue <- struct{}{}:
173146
p.poolSize++
174147
p.idleConnsLen++
175-
err := p.addIdleConn()
176-
if err != nil && err != ErrClosed {
177-
p.poolSize--
178-
p.idleConnsLen--
179-
}
180-
p.freeTurn()
148+
go func() {
149+
err := p.addIdleConn()
150+
if err != nil && err != ErrClosed {
151+
p.connsMu.Lock()
152+
p.poolSize--
153+
p.idleConnsLen--
154+
p.connsMu.Unlock()
155+
}
156+
157+
p.freeTurn()
158+
}()
181159
default:
182160
return
183161
}
184162
}
185163
}
186164

187-
// addIdleConn is not thread-safe. Should be called with connsMu locked.
188165
func (p *ConnPool) addIdleConn() error {
189166
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
190167
defer cancel()
@@ -194,6 +171,9 @@ func (p *ConnPool) addIdleConn() error {
194171
return err
195172
}
196173

174+
p.connsMu.Lock()
175+
defer p.connsMu.Unlock()
176+
197177
// It is not allowed to add new connections to the closed connection pool.
198178
if p.closed() {
199179
_ = cn.Close()
@@ -334,6 +314,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
334314
}
335315

336316
tries := 0
317+
now := time.Now()
337318
for {
338319
if tries > 10 {
339320
log.Printf("redis: connection pool: failed to get a connection after %d tries", tries)
@@ -353,7 +334,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
353334
break
354335
}
355336

356-
if !p.isHealthyConn(cn) {
337+
if !p.isHealthyConn(cn, now) {
357338
_ = p.CloseConn(cn)
358339
continue
359340
}
@@ -475,12 +456,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
475456
return nil, nil
476457
}
477458

478-
// Asynchronously check minimum idle connections (only if MinIdleConns > 0)
479-
// This avoids blocking the Get() operation while still maintaining pool health
480-
if p.cfg.MinIdleConns > 0 {
481-
p.checkMinIdleConnsAsync()
482-
}
483-
459+
p.checkMinIdleConns()
484460
return cn, nil
485461
}
486462

@@ -550,18 +526,9 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
550526
}
551527

552528
func (p *ConnPool) removeConnWithLock(cn *Conn) {
553-
var shouldCheckMinIdle bool
554-
555529
p.connsMu.Lock()
556-
oldPoolSize := p.poolSize
530+
defer p.connsMu.Unlock()
557531
p.removeConn(cn)
558-
shouldCheckMinIdle = cn.pooled && p.poolSize < oldPoolSize // Connection was removed from pool
559-
p.connsMu.Unlock()
560-
561-
// Check minimum idle connections asynchronously after releasing the lock
562-
if shouldCheckMinIdle {
563-
p.checkMinIdleConnsAsync()
564-
}
565532
}
566533

567534
func (p *ConnPool) removeConn(cn *Conn) {
@@ -570,6 +537,8 @@ func (p *ConnPool) removeConn(cn *Conn) {
570537
p.conns = append(p.conns[:i], p.conns[i+1:]...)
571538
if cn.pooled {
572539
p.poolSize--
540+
// Immediately check for minimum idle connections when a pooled connection is removed
541+
p.checkMinIdleConns()
573542
}
574543
break
575544
}
@@ -653,9 +622,7 @@ func (p *ConnPool) Close() error {
653622
return firstErr
654623
}
655624

656-
func (p *ConnPool) isHealthyConn(cn *Conn) bool {
657-
now := time.Now()
658-
625+
func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
659626
// slight optimization, check expiresAt first.
660627
if cn.expiresAt.Before(now) {
661628
return false
@@ -667,7 +634,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
667634

668635
// Check basic connection health
669636
// Use GetNetConn() to safely access netConn and avoid data races
670-
if err := connCheck(cn.GetNetConn()); err != nil {
637+
if err := connCheck(cn.getNetConn()); err != nil {
671638
return false
672639
}
673640

internal/pool/pool.test

10.9 MB
Binary file not shown.

pool_pubsub_bench_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,3 +390,21 @@ func BenchmarkPoolStatsCollection(b *testing.B) {
390390
_ = stats.Hits + stats.Misses + stats.Timeouts // Use the stats to prevent optimization
391391
}
392392
}
393+
394+
// BenchmarkPoolHighContention tests pool performance under high contention
395+
func BenchmarkPoolHighContention(b *testing.B) {
396+
ctx := context.Background()
397+
client := benchmarkClient(32)
398+
defer client.Close()
399+
400+
b.ResetTimer()
401+
b.ReportAllocs()
402+
403+
b.RunParallel(func(pb *testing.PB) {
404+
for pb.Next() {
405+
// High contention Get/Put operations
406+
pubsub := client.Subscribe(ctx, "test-channel")
407+
pubsub.Close()
408+
}
409+
})
410+
}

0 commit comments

Comments
 (0)