Skip to content

Commit 02c2447

Browse files
committed
wip
1 parent 3c40e22 commit 02c2447

15 files changed

+199
-120
lines changed

async_handoff_integration_test.go

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
5252
Dialer: func(ctx context.Context) (net.Conn, error) {
5353
return &mockNetConn{addr: "original:6379"}, nil
5454
},
55-
PoolSize: 5,
55+
PoolSize: int32(5),
5656
PoolTimeout: time.Second,
5757
})
5858

@@ -144,8 +144,8 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
144144
Dialer: func(ctx context.Context) (net.Conn, error) {
145145
return &mockNetConn{addr: "original:6379"}, nil
146146
},
147-
148-
PoolSize: 10,
147+
148+
PoolSize: int32(10),
149149
PoolTimeout: time.Second,
150150
})
151151
defer testPool.Close()
@@ -216,8 +216,8 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
216216
Dialer: func(ctx context.Context) (net.Conn, error) {
217217
return &mockNetConn{addr: "original:6379"}, nil
218218
},
219-
220-
PoolSize: 3,
219+
220+
PoolSize: int32(3),
221221
PoolTimeout: time.Second,
222222
})
223223
defer testPool.Close()
@@ -279,8 +279,8 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
279279
Dialer: func(ctx context.Context) (net.Conn, error) {
280280
return &mockNetConn{addr: "original:6379"}, nil
281281
},
282-
283-
PoolSize: 2,
282+
283+
PoolSize: int32(2),
284284
PoolTimeout: time.Second,
285285
})
286286
defer testPool.Close()

hitless/pool_hook.go

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -124,15 +124,14 @@ func (ph *PoolHook) GetScaleLevel() int {
124124
return ph.scaleLevel
125125
}
126126

127-
128127
// IsHandoffPending returns true if the given connection has a pending handoff
129128
func (ph *PoolHook) IsHandoffPending(conn *pool.Conn) bool {
130129
_, pending := ph.pending.Load(conn.GetID())
131130
return pending
132131
}
133132

134133
// OnGet is called when a connection is retrieved from the pool
135-
func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, _ bool) error {
134+
func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, isNewConn bool) error {
136135
// NOTE: There are two conditions to make sure we don't return a connection that should be handed off or is
137136
// in a handoff state at the moment.
138137

@@ -508,7 +507,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
508507
oldConn := conn.GetNetConn()
509508

510509
// Replace the connection and execute initialization
511-
err = conn.SetNetConnWithInitConn(ctx, newNetConn)
510+
err = conn.SetNetConnAndInitConn(ctx, newNetConn)
512511
if err != nil {
513512
// Remove the connection from the pool since it's in a bad state
514513
if pooler != nil {

internal/pool/bench_test.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package pool_test
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"testing"
78
"time"
@@ -31,7 +32,7 @@ func BenchmarkPoolGetPut(b *testing.B) {
3132
b.Run(bm.String(), func(b *testing.B) {
3233
connPool := pool.NewConnPool(&pool.Options{
3334
Dialer: dummyDialer,
34-
PoolSize: bm.poolSize,
35+
PoolSize: int32(bm.poolSize),
3536
PoolTimeout: time.Second,
3637
DialTimeout: 1 * time.Second,
3738
ConnMaxIdleTime: time.Hour,
@@ -75,7 +76,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
7576
b.Run(bm.String(), func(b *testing.B) {
7677
connPool := pool.NewConnPool(&pool.Options{
7778
Dialer: dummyDialer,
78-
PoolSize: bm.poolSize,
79+
PoolSize: int32(bm.poolSize),
7980
PoolTimeout: time.Second,
8081
DialTimeout: 1 * time.Second,
8182
ConnMaxIdleTime: time.Hour,
@@ -89,7 +90,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
8990
if err != nil {
9091
b.Fatal(err)
9192
}
92-
connPool.Remove(ctx, cn, nil)
93+
connPool.Remove(ctx, cn, errors.New("Bench test remove"))
9394
}
9495
})
9596
})

internal/pool/buffer_size_test.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ var _ = Describe("Buffer Size Configuration", func() {
2626
It("should use default buffer sizes when not specified", func() {
2727
connPool = pool.NewConnPool(&pool.Options{
2828
Dialer: dummyDialer,
29-
PoolSize: 1,
29+
PoolSize: int32(1),
3030
PoolTimeout: 1000,
3131
})
3232

@@ -48,7 +48,7 @@ var _ = Describe("Buffer Size Configuration", func() {
4848

4949
connPool = pool.NewConnPool(&pool.Options{
5050
Dialer: dummyDialer,
51-
PoolSize: 1,
51+
PoolSize: int32(1),
5252
PoolTimeout: 1000,
5353
ReadBufferSize: customReadSize,
5454
WriteBufferSize: customWriteSize,
@@ -69,7 +69,7 @@ var _ = Describe("Buffer Size Configuration", func() {
6969
It("should handle zero buffer sizes by using defaults", func() {
7070
connPool = pool.NewConnPool(&pool.Options{
7171
Dialer: dummyDialer,
72-
PoolSize: 1,
72+
PoolSize: int32(1),
7373
PoolTimeout: 1000,
7474
ReadBufferSize: 0, // Should use default
7575
WriteBufferSize: 0, // Should use default
@@ -105,7 +105,7 @@ var _ = Describe("Buffer Size Configuration", func() {
105105
// without setting ReadBufferSize and WriteBufferSize
106106
connPool = pool.NewConnPool(&pool.Options{
107107
Dialer: dummyDialer,
108-
PoolSize: 1,
108+
PoolSize: int32(1),
109109
PoolTimeout: 1000,
110110
// ReadBufferSize and WriteBufferSize are not set (will be 0)
111111
})

internal/pool/conn.go

Lines changed: 9 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ type Conn struct {
4343
// Only used for the brief period during SetNetConn and HasBufferedData/PeekReplyTypeSafe
4444
readerMu sync.RWMutex
4545

46-
Inited bool
46+
Inited atomic.Bool
4747
pooled bool
4848
createdAt time.Time
4949
expiresAt time.Time
@@ -200,6 +200,10 @@ func (cn *Conn) IsUsable() bool {
200200
return cn.isUsable()
201201
}
202202

203+
func (cn *Conn) IsInited() bool {
204+
return cn.Inited.Load()
205+
}
206+
203207
// SetUsable sets the usable flag for the connection (lock-free).
204208
func (cn *Conn) SetUsable(usable bool) {
205209
cn.setUsable(usable)
@@ -347,12 +351,7 @@ func (cn *Conn) SetInitConnFunc(fn func(context.Context, *Conn) error) {
347351
// ExecuteInitConn runs the stored connection initialization function if available.
348352
func (cn *Conn) ExecuteInitConn(ctx context.Context) error {
349353
if cn.initConnFunc != nil {
350-
if err := cn.initConnFunc(ctx, cn); err != nil {
351-
return err
352-
}
353-
cn.Inited = true
354-
cn.setUsable(true) // Use atomic operation
355-
return nil
354+
return cn.initConnFunc(ctx, cn)
356355
}
357356
return fmt.Errorf("redis: no initConnFunc set for connection %d", cn.GetID())
358357
}
@@ -378,10 +377,10 @@ func (cn *Conn) GetNetConn() net.Conn {
378377
return cn.getNetConn()
379378
}
380379

381-
// SetNetConnWithInitConn replaces the underlying connection and executes the initialization.
382-
func (cn *Conn) SetNetConnWithInitConn(ctx context.Context, netConn net.Conn) error {
380+
// SetNetConnAndInitConn replaces the underlying connection and executes the initialization.
381+
func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error {
383382
// New connection is not initialized yet
384-
cn.Inited = false
383+
cn.Inited.Store(false)
385384
// Replace the underlying connection
386385
cn.SetNetConn(netConn)
387386
return cn.ExecuteInitConn(ctx)

internal/pool/pool.go

Lines changed: 61 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -76,15 +76,16 @@ type Options struct {
7676
ReadBufferSize int
7777
WriteBufferSize int
7878

79-
PoolFIFO bool
80-
PoolSize int
81-
DialTimeout time.Duration
82-
PoolTimeout time.Duration
83-
MinIdleConns int
84-
MaxIdleConns int
85-
MaxActiveConns int
86-
ConnMaxIdleTime time.Duration
87-
ConnMaxLifetime time.Duration
79+
PoolFIFO bool
80+
PoolSize int32
81+
DialTimeout time.Duration
82+
PoolTimeout time.Duration
83+
MinIdleConns int32
84+
MaxIdleConns int32
85+
MaxActiveConns int32
86+
ConnMaxIdleTime time.Duration
87+
ConnMaxLifetime time.Duration
88+
PushNotificationsEnabled bool
8889
}
8990

9091
type lastDialErrorWrap struct {
@@ -103,8 +104,9 @@ type ConnPool struct {
103104
conns []*Conn
104105
idleConns []*Conn
105106

106-
poolSize atomic.Int32
107-
idleConnsLen int
107+
poolSize atomic.Int32
108+
idleConnsLen atomic.Int32
109+
idleCheckInProgress atomic.Bool
108110

109111
stats Stats
110112
waitDurationNs atomic.Int64
@@ -161,23 +163,29 @@ func (p *ConnPool) RemovePoolHook(hook PoolHook) {
161163
}
162164

163165
func (p *ConnPool) checkMinIdleConns() {
166+
167+
if !p.idleCheckInProgress.CompareAndSwap(false, true) {
168+
return
169+
}
170+
defer p.idleCheckInProgress.Store(false)
171+
164172
if p.cfg.MinIdleConns == 0 {
165173
return
166174
}
167175

168176
// Only create idle connections if we haven't reached the total pool size limit
169177
// MinIdleConns should be a subset of PoolSize, not additional connections
170-
for p.poolSize.Load() < int32(p.cfg.PoolSize) && p.idleConnsLen < p.cfg.MinIdleConns {
178+
for p.poolSize.Load() < p.cfg.PoolSize && p.idleConnsLen.Load() < p.cfg.MinIdleConns {
171179
select {
172180
case p.queue <- struct{}{}:
173181
p.poolSize.Add(1)
174-
p.idleConnsLen++
182+
p.idleConnsLen.Add(1)
175183
go func() {
176184
defer func() {
177185
if err := recover(); err != nil {
178186
p.connsMu.Lock()
179187
p.poolSize.Add(-1)
180-
p.idleConnsLen--
188+
p.idleConnsLen.Add(-1)
181189
p.connsMu.Unlock()
182190

183191
p.freeTurn()
@@ -189,7 +197,7 @@ func (p *ConnPool) checkMinIdleConns() {
189197
if err != nil && err != ErrClosed {
190198
p.connsMu.Lock()
191199
p.poolSize.Add(-1)
192-
p.idleConnsLen--
200+
p.idleConnsLen.Add(-1)
193201
p.connsMu.Unlock()
194202
}
195203
p.freeTurn()
@@ -389,6 +397,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
389397
// Process connection using the hooks system
390398
if p.hookManager != nil {
391399
if err := p.hookManager.ProcessOnGet(ctx, cn, false); err != nil {
400+
log.Printf("redis: connection pool: failed to process idle connection by hook: %v", err)
392401
// Failed to process connection, discard it
393402
_ = p.CloseConn(cn)
394403
continue
@@ -490,7 +499,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
490499
attempts++
491500

492501
if cn.IsUsable() {
493-
p.idleConnsLen--
502+
p.idleConnsLen.Add(-1)
494503
break
495504
}
496505

@@ -543,28 +552,38 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
543552

544553
// If hooks say to remove the connection, do so
545554
if shouldRemove {
546-
p.Remove(ctx, cn, nil)
555+
p.Remove(ctx, cn, errors.New("hook requested removal"))
547556
return
548557
}
549558

550559
// If processor says not to pool the connection, remove it
551560
if !shouldPool {
552-
p.Remove(ctx, cn, nil)
561+
p.Remove(ctx, cn, errors.New("hook requested no pooling"))
553562
return
554563
}
555564

556565
if !cn.pooled {
557-
p.Remove(ctx, cn, nil)
566+
p.Remove(ctx, cn, errors.New("connection not pooled"))
558567
return
559568
}
560569

561570
var shouldCloseConn bool
562571

563572
p.connsMu.Lock()
564573

565-
if p.cfg.MaxIdleConns == 0 || p.idleConnsLen < p.cfg.MaxIdleConns {
566-
p.idleConns = append(p.idleConns, cn)
567-
p.idleConnsLen++
574+
if p.cfg.MaxIdleConns == 0 || p.idleConnsLen.Load() < p.cfg.MaxIdleConns {
575+
// unusable conns are expected to become usable at some point (background process is reconnecting them)
576+
// put them at the opposite end of the queue
577+
if !cn.IsUsable() {
578+
if p.cfg.PoolFIFO {
579+
p.idleConns = append(p.idleConns, cn)
580+
} else {
581+
p.idleConns = append([]*Conn{cn}, p.idleConns...)
582+
}
583+
} else {
584+
p.idleConns = append(p.idleConns, cn)
585+
}
586+
p.idleConnsLen.Add(1)
568587
} else {
569588
p.removeConn(cn)
570589
shouldCloseConn = true
@@ -626,9 +645,9 @@ func (p *ConnPool) Len() int {
626645
// IdleLen returns number of idle connections.
627646
func (p *ConnPool) IdleLen() int {
628647
p.connsMu.Lock()
629-
n := p.idleConnsLen
648+
n := p.idleConnsLen.Load()
630649
p.connsMu.Unlock()
631-
return n
650+
return int(n)
632651
}
633652

634653
func (p *ConnPool) Stats() *Stats {
@@ -680,7 +699,7 @@ func (p *ConnPool) Close() error {
680699
p.conns = nil
681700
p.poolSize.Store(0)
682701
p.idleConns = nil
683-
p.idleConnsLen = 0
702+
p.idleConnsLen.Store(0)
684703
p.connsMu.Unlock()
685704

686705
return firstErr
@@ -696,12 +715,26 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
696715
return false
697716
}
698717

718+
cn.SetUsedAt(now)
699719
// Check basic connection health
700720
// Use GetNetConn() to safely access netConn and avoid data races
701721
if err := connCheck(cn.getNetConn()); err != nil {
702-
return false
722+
// If there's unexpected data, it might be push notifications (RESP3)
723+
// However, push notification processing is now handled by the client
724+
// before WithReader to ensure proper context is available to handlers
725+
if p.cfg.PushNotificationsEnabled && err == errUnexpectedRead {
726+
// we know that there is something in the buffer, so peek at the next reply type without
727+
// the potential to block
728+
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
729+
// For RESP3 connections with push notifications, we allow some buffered data
730+
// The client will process these notifications before using the connection
731+
internal.Logger.Printf(context.Background(), "push: connection has buffered data, likely push notifications - will be processed by client")
732+
return true // Connection is healthy, client will handle notifications
733+
}
734+
return false // Unexpected data, not push notifications, connection is unhealthy
735+
} else {
736+
return false
737+
}
703738
}
704-
705-
cn.SetUsedAt(now)
706739
return true
707740
}

internal/pool/pool_single.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
package pool
22

3-
import "context"
3+
import (
4+
"context"
5+
)
46

57
type SingleConnPool struct {
68
pool Pooler

0 commit comments

Comments
 (0)