Skip to content

[WIP] hitless #3447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 33 commits into
base: ndyakov/CAE-1088-resp3-notification-handlers
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
47bb58d
wip
ndyakov Aug 12, 2025
a1461e9
refactor manager
ndyakov Aug 14, 2025
478875d
refactor processor
ndyakov Aug 14, 2025
49e7df6
wip
ndyakov Aug 14, 2025
e3b7052
wip
ndyakov Aug 15, 2025
2f9f85a
wip
ndyakov Aug 15, 2025
a8ef7e9
wip
ndyakov Aug 15, 2025
6fb87ca
wip
ndyakov Aug 15, 2025
01844e6
wip
ndyakov Aug 15, 2025
e2d6be8
fix linter
ndyakov Aug 15, 2025
49a2e43
deadlock?
ndyakov Aug 15, 2025
3c40e22
deadlock?
ndyakov Aug 15, 2025
02c2447
wip
ndyakov Aug 15, 2025
30e5c82
wip review
ndyakov Aug 15, 2025
76bb698
still hunting this deadlock
ndyakov Aug 15, 2025
69569ea
still hunting this deadlock
ndyakov Aug 15, 2025
668f578
still hunting this deadlock
ndyakov Aug 15, 2025
c0d63fa
remove in separate goroutine
ndyakov Aug 15, 2025
b80f6d5
one more additional check for shutdown
ndyakov Aug 15, 2025
34edec2
what if those conns are tracked and closed on Close?
ndyakov Aug 15, 2025
a4758c2
add debug and make sure workers are not started after shutdown
ndyakov Aug 15, 2025
a4090d4
on demand workers
ndyakov Aug 15, 2025
0091c74
pubsub pool still outperforms track/untrack
ndyakov Aug 15, 2025
d3c3eb7
pubsub fixes
ndyakov Aug 16, 2025
ca73e4b
wip
ndyakov Aug 16, 2025
06ea120
wip
ndyakov Aug 16, 2025
374eb2d
wip
ndyakov Aug 16, 2025
08d746c
safe close of pools
ndyakov Aug 16, 2025
b35380a
safe creation on closed pool
ndyakov Aug 16, 2025
49f3f0d
fix error
ndyakov Aug 16, 2025
fedff39
potential data race?
ndyakov Aug 16, 2025
a938edb
clone the config
ndyakov Aug 16, 2025
8e73688
update example tests
ndyakov Aug 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions async_handoff_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
Dialer: func(ctx context.Context) (net.Conn, error) {
return &mockNetConn{addr: "original:6379"}, nil
},
PoolSize: 5,
PoolSize: int32(5),
PoolTimeout: time.Second,
})

Expand Down Expand Up @@ -144,8 +144,8 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
Dialer: func(ctx context.Context) (net.Conn, error) {
return &mockNetConn{addr: "original:6379"}, nil
},
PoolSize: 10,

PoolSize: int32(10),
PoolTimeout: time.Second,
})
defer testPool.Close()
Expand Down Expand Up @@ -216,8 +216,8 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
Dialer: func(ctx context.Context) (net.Conn, error) {
return &mockNetConn{addr: "original:6379"}, nil
},
PoolSize: 3,

PoolSize: int32(3),
PoolTimeout: time.Second,
})
defer testPool.Close()
Expand Down Expand Up @@ -279,8 +279,8 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
Dialer: func(ctx context.Context) (net.Conn, error) {
return &mockNetConn{addr: "original:6379"}, nil
},
PoolSize: 2,

PoolSize: int32(2),
PoolTimeout: time.Second,
})
defer testPool.Close()
Expand Down
5 changes: 2 additions & 3 deletions hitless/pool_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,14 @@ func (ph *PoolHook) GetScaleLevel() int {
return ph.scaleLevel
}


// IsHandoffPending returns true if the given connection has a pending handoff
func (ph *PoolHook) IsHandoffPending(conn *pool.Conn) bool {
_, pending := ph.pending.Load(conn.GetID())
return pending
}

// OnGet is called when a connection is retrieved from the pool
func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, _ bool) error {
func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, isNewConn bool) error {
// NOTE: There are two conditions to make sure we don't return a connection that should be handed off or is
// in a handoff state at the moment.

Expand Down Expand Up @@ -508,7 +507,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
oldConn := conn.GetNetConn()

// Replace the connection and execute initialization
err = conn.SetNetConnWithInitConn(ctx, newNetConn)
err = conn.SetNetConnAndInitConn(ctx, newNetConn)
if err != nil {
// Remove the connection from the pool since it's in a bad state
if pooler != nil {
Expand Down
7 changes: 4 additions & 3 deletions internal/pool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool_test

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -31,7 +32,7 @@ func BenchmarkPoolGetPut(b *testing.B) {
b.Run(bm.String(), func(b *testing.B) {
connPool := pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: bm.poolSize,
PoolSize: int32(bm.poolSize),
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
Expand Down Expand Up @@ -75,7 +76,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
b.Run(bm.String(), func(b *testing.B) {
connPool := pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: bm.poolSize,
PoolSize: int32(bm.poolSize),
PoolTimeout: time.Second,
DialTimeout: 1 * time.Second,
ConnMaxIdleTime: time.Hour,
Expand All @@ -89,7 +90,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
if err != nil {
b.Fatal(err)
}
connPool.Remove(ctx, cn, nil)
connPool.Remove(ctx, cn, errors.New("Bench test remove"))
}
})
})
Expand Down
8 changes: 4 additions & 4 deletions internal/pool/buffer_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var _ = Describe("Buffer Size Configuration", func() {
It("should use default buffer sizes when not specified", func() {
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolSize: int32(1),
PoolTimeout: 1000,
})

Expand All @@ -48,7 +48,7 @@ var _ = Describe("Buffer Size Configuration", func() {

connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolSize: int32(1),
PoolTimeout: 1000,
ReadBufferSize: customReadSize,
WriteBufferSize: customWriteSize,
Expand All @@ -69,7 +69,7 @@ var _ = Describe("Buffer Size Configuration", func() {
It("should handle zero buffer sizes by using defaults", func() {
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolSize: int32(1),
PoolTimeout: 1000,
ReadBufferSize: 0, // Should use default
WriteBufferSize: 0, // Should use default
Expand Down Expand Up @@ -105,7 +105,7 @@ var _ = Describe("Buffer Size Configuration", func() {
// without setting ReadBufferSize and WriteBufferSize
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolSize: int32(1),
PoolTimeout: 1000,
// ReadBufferSize and WriteBufferSize are not set (will be 0)
})
Expand Down
19 changes: 9 additions & 10 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Conn struct {
// Only used for the brief period during SetNetConn and HasBufferedData/PeekReplyTypeSafe
readerMu sync.RWMutex

Inited bool
Inited atomic.Bool
pooled bool
createdAt time.Time
expiresAt time.Time
Expand Down Expand Up @@ -200,6 +200,10 @@ func (cn *Conn) IsUsable() bool {
return cn.isUsable()
}

func (cn *Conn) IsInited() bool {
return cn.Inited.Load()
}

// SetUsable sets the usable flag for the connection (lock-free).
func (cn *Conn) SetUsable(usable bool) {
cn.setUsable(usable)
Expand Down Expand Up @@ -347,12 +351,7 @@ func (cn *Conn) SetInitConnFunc(fn func(context.Context, *Conn) error) {
// ExecuteInitConn runs the stored connection initialization function if available.
func (cn *Conn) ExecuteInitConn(ctx context.Context) error {
if cn.initConnFunc != nil {
if err := cn.initConnFunc(ctx, cn); err != nil {
return err
}
cn.Inited = true
cn.setUsable(true) // Use atomic operation
return nil
return cn.initConnFunc(ctx, cn)
}
return fmt.Errorf("redis: no initConnFunc set for connection %d", cn.GetID())
}
Expand All @@ -378,10 +377,10 @@ func (cn *Conn) GetNetConn() net.Conn {
return cn.getNetConn()
}

// SetNetConnWithInitConn replaces the underlying connection and executes the initialization.
func (cn *Conn) SetNetConnWithInitConn(ctx context.Context, netConn net.Conn) error {
// SetNetConnAndInitConn replaces the underlying connection and executes the initialization.
func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error {
// New connection is not initialized yet
cn.Inited = false
cn.Inited.Store(false)
// Replace the underlying connection
cn.SetNetConn(netConn)
return cn.ExecuteInitConn(ctx)
Expand Down
89 changes: 61 additions & 28 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,16 @@ type Options struct {
ReadBufferSize int
WriteBufferSize int

PoolFIFO bool
PoolSize int
DialTimeout time.Duration
PoolTimeout time.Duration
MinIdleConns int
MaxIdleConns int
MaxActiveConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
PoolFIFO bool
PoolSize int32
DialTimeout time.Duration
PoolTimeout time.Duration
MinIdleConns int32
MaxIdleConns int32
MaxActiveConns int32
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
PushNotificationsEnabled bool
}

type lastDialErrorWrap struct {
Expand All @@ -103,8 +104,9 @@ type ConnPool struct {
conns []*Conn
idleConns []*Conn

poolSize atomic.Int32
idleConnsLen int
poolSize atomic.Int32
idleConnsLen atomic.Int32
idleCheckInProgress atomic.Bool

stats Stats
waitDurationNs atomic.Int64
Expand Down Expand Up @@ -161,23 +163,29 @@ func (p *ConnPool) RemovePoolHook(hook PoolHook) {
}

func (p *ConnPool) checkMinIdleConns() {

if !p.idleCheckInProgress.CompareAndSwap(false, true) {
return
}
defer p.idleCheckInProgress.Store(false)

if p.cfg.MinIdleConns == 0 {
return
}

// Only create idle connections if we haven't reached the total pool size limit
// MinIdleConns should be a subset of PoolSize, not additional connections
for p.poolSize.Load() < int32(p.cfg.PoolSize) && p.idleConnsLen < p.cfg.MinIdleConns {
for p.poolSize.Load() < p.cfg.PoolSize && p.idleConnsLen.Load() < p.cfg.MinIdleConns {
select {
case p.queue <- struct{}{}:
p.poolSize.Add(1)
p.idleConnsLen++
p.idleConnsLen.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
p.connsMu.Lock()
p.poolSize.Add(-1)
p.idleConnsLen--
p.idleConnsLen.Add(-1)
p.connsMu.Unlock()

p.freeTurn()
Expand All @@ -189,7 +197,7 @@ func (p *ConnPool) checkMinIdleConns() {
if err != nil && err != ErrClosed {
p.connsMu.Lock()
p.poolSize.Add(-1)
p.idleConnsLen--
p.idleConnsLen.Add(-1)
p.connsMu.Unlock()
}
p.freeTurn()
Expand Down Expand Up @@ -389,6 +397,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
// Process connection using the hooks system
if p.hookManager != nil {
if err := p.hookManager.ProcessOnGet(ctx, cn, false); err != nil {
log.Printf("redis: connection pool: failed to process idle connection by hook: %v", err)
// Failed to process connection, discard it
_ = p.CloseConn(cn)
continue
Expand Down Expand Up @@ -490,7 +499,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
attempts++

if cn.IsUsable() {
p.idleConnsLen--
p.idleConnsLen.Add(-1)
break
}

Expand Down Expand Up @@ -543,28 +552,38 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {

// If hooks say to remove the connection, do so
if shouldRemove {
p.Remove(ctx, cn, nil)
p.Remove(ctx, cn, errors.New("hook requested removal"))
return
}

// If processor says not to pool the connection, remove it
if !shouldPool {
p.Remove(ctx, cn, nil)
p.Remove(ctx, cn, errors.New("hook requested no pooling"))
return
}

if !cn.pooled {
p.Remove(ctx, cn, nil)
p.Remove(ctx, cn, errors.New("connection not pooled"))
return
}

var shouldCloseConn bool

p.connsMu.Lock()

if p.cfg.MaxIdleConns == 0 || p.idleConnsLen < p.cfg.MaxIdleConns {
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
if p.cfg.MaxIdleConns == 0 || p.idleConnsLen.Load() < p.cfg.MaxIdleConns {
// unusable conns are expected to become usable at some point (background process is reconnecting them)
// put them at the opposite end of the queue
if !cn.IsUsable() {
if p.cfg.PoolFIFO {
p.idleConns = append(p.idleConns, cn)
} else {
p.idleConns = append([]*Conn{cn}, p.idleConns...)
}
} else {
p.idleConns = append(p.idleConns, cn)
}
p.idleConnsLen.Add(1)
} else {
p.removeConn(cn)
shouldCloseConn = true
Expand Down Expand Up @@ -626,9 +645,9 @@ func (p *ConnPool) Len() int {
// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
p.connsMu.Lock()
n := p.idleConnsLen
n := p.idleConnsLen.Load()
p.connsMu.Unlock()
return n
return int(n)
}

func (p *ConnPool) Stats() *Stats {
Expand Down Expand Up @@ -680,7 +699,7 @@ func (p *ConnPool) Close() error {
p.conns = nil
p.poolSize.Store(0)
p.idleConns = nil
p.idleConnsLen = 0
p.idleConnsLen.Store(0)
p.connsMu.Unlock()

return firstErr
Expand All @@ -696,12 +715,26 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
return false
}

cn.SetUsedAt(now)
// Check basic connection health
// Use GetNetConn() to safely access netConn and avoid data races
if err := connCheck(cn.getNetConn()); err != nil {
return false
// If there's unexpected data, it might be push notifications (RESP3)
// However, push notification processing is now handled by the client
// before WithReader to ensure proper context is available to handlers
if p.cfg.PushNotificationsEnabled && err == errUnexpectedRead {
// we know that there is something in the buffer, so peek at the next reply type without
// the potential to block
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
// For RESP3 connections with push notifications, we allow some buffered data
// The client will process these notifications before using the connection
internal.Logger.Printf(context.Background(), "push: connection has buffered data, likely push notifications - will be processed by client")
return true // Connection is healthy, client will handle notifications
}
return false // Unexpected data, not push notifications, connection is unhealthy
} else {
return false
}
}

cn.SetUsedAt(now)
return true
}
4 changes: 3 additions & 1 deletion internal/pool/pool_single.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pool

import "context"
import (
"context"
)

type SingleConnPool struct {
pool Pooler
Expand Down
Loading
Loading