Skip to content

Commit bd02c37

Browse files
committed
pubsub fixes
1 parent 0091c74 commit bd02c37

File tree

4 files changed

+27
-13
lines changed

4 files changed

+27
-13
lines changed

async_handoff_integration_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,27 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
7171
t.Fatalf("Failed to get connection: %v", err)
7272
}
7373

74-
// Set initialization function
74+
// Set initialization function with a small delay to ensure handoff is pending
7575
initConnCalled := false
7676
initConnFunc := func(ctx context.Context, cn *pool.Conn) error {
77+
time.Sleep(50 * time.Millisecond) // Add delay to keep handoff pending
7778
initConnCalled = true
7879
return nil
7980
}
8081
conn.SetInitConnFunc(initConnFunc)
8182

8283
// Mark connection for handoff
83-
conn.MarkForHandoff("new-endpoint:6379", 12345)
84+
err = conn.MarkForHandoff("new-endpoint:6379", 12345)
85+
if err != nil {
86+
t.Fatalf("Failed to mark connection for handoff: %v", err)
87+
}
8488

8589
// Return connection to pool - this should queue handoff
8690
testPool.Put(ctx, conn)
8791

92+
// Give the on-demand worker a moment to start processing
93+
time.Sleep(10 * time.Millisecond)
94+
8895
// Verify handoff was queued
8996
if !processor.IsHandoffPending(conn) {
9097
t.Error("Handoff should be queued in pending map")
@@ -303,14 +310,19 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
303310
t.Fatalf("Failed to mark connection for handoff: %v", err)
304311
}
305312

306-
// Set a mock initialization function
313+
// Set a mock initialization function with delay to ensure handoff is pending
307314
conn.SetInitConnFunc(func(ctx context.Context, cn *pool.Conn) error {
315+
time.Sleep(50 * time.Millisecond) // Add delay to keep handoff pending
308316
return nil
309317
})
310318

311319
testPool.Put(ctx, conn)
312320

313-
// Verify handoff was queued
321+
// Give the on-demand worker a moment to start and begin processing
322+
// The handoff should be pending because the slowDialer takes 100ms
323+
time.Sleep(10 * time.Millisecond)
324+
325+
// Verify handoff was queued and is being processed
314326
if !processor.IsHandoffPending(conn) {
315327
t.Error("Handoff should be queued in pending map")
316328
}

internal/pool/pool.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pool
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"log"
87
"net"
98
"sync"
@@ -156,7 +155,6 @@ func (p *ConnPool) AddPoolHook(hook PoolHook) {
156155
p.initializeHooks()
157156
}
158157
p.hookManager.AddHook(hook)
159-
p.hookManager = nil
160158
}
161159

162160
// RemovePoolHook removes a pool hook from the pool.
@@ -300,7 +298,6 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
300298

301299
cn := NewConnWithBufferSize(netConn, p.cfg.ReadBufferSize, p.cfg.WriteBufferSize)
302300
cn.pooled = pooled
303-
fmt.Printf("New conn %d, pooled: %v\n", cn.GetID(), cn.pooled)
304301
if p.cfg.ConnMaxLifetime > 0 {
305302
cn.expiresAt = time.Now().Add(p.cfg.ConnMaxLifetime)
306303
} else {
@@ -596,7 +593,6 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
596593
}
597594

598595
func (p *ConnPool) Remove(_ context.Context, cn *Conn, reason error) {
599-
internal.Logger.Printf(context.Background(), "Removing connection %d from pool: %v", cn.GetID(), reason)
600596
p.removeConnWithLock(cn)
601597
p.freeTurn()
602598
_ = p.closeConn(cn)

osscluster.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1845,7 +1845,6 @@ func (c *ClusterClient) pubSub() *PubSub {
18451845
var node *clusterNode
18461846
pubsub := &PubSub{
18471847
opt: c.opt.clientOptions(),
1848-
18491848
newConn: func(ctx context.Context, addr string, channels []string) (*pool.Conn, error) {
18501849
if node != nil {
18511850
panic("node != nil")
@@ -1861,14 +1860,19 @@ func (c *ClusterClient) pubSub() *PubSub {
18611860
if err != nil {
18621861
return nil, err
18631862
}
1864-
1865-
cn, err := node.Client.newConn(context.TODO())
1863+
cn, err := node.Client.pubSubPool.NewConn(ctx, node.Client.opt.Network, node.Client.opt.Addr, channels)
18661864
if err != nil {
18671865
node = nil
1868-
18691866
return nil, err
18701867
}
1871-
1868+
// will return nil if already initialized
1869+
err = node.Client.initConn(ctx, cn)
1870+
if err != nil {
1871+
_ = cn.Close()
1872+
node = nil
1873+
return nil, err
1874+
}
1875+
node.Client.pubSubPool.TrackConn(cn)
18721876
return cn, nil
18731877
},
18741878
closeConn: func(cn *pool.Conn) error {

sentinel.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
475475
rdb.pushProcessor = initializePushProcessor(opt)
476476

477477
rdb.connPool = newConnPool(opt, rdb.dialHook)
478+
rdb.pubSubPool = newPubSubPool(opt, rdb.dialHook)
478479

479480
rdb.onClose = rdb.wrappedOnClose(failover.Close)
480481

@@ -551,6 +552,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
551552
process: c.baseClient.process,
552553
})
553554
c.connPool = newConnPool(opt, c.dialHook)
555+
c.pubSubPool = newPubSubPool(opt, c.dialHook)
554556

555557
return c
556558
}

0 commit comments

Comments
 (0)