Skip to content

Commit 355a21a

Browse files
committed
change to share failed to delivery connection to other waiting
1 parent 8508f71 commit 355a21a

File tree

4 files changed

+646
-69
lines changed

4 files changed

+646
-69
lines changed

internal/pool/pool.go

Lines changed: 49 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -114,65 +114,6 @@ type lastDialErrorWrap struct {
114114
err error
115115
}
116116

117-
type wantConn struct {
118-
mu sync.Mutex // protects ctx, done and sending of the result
119-
ctx context.Context // context for dial, cleared after delivered or canceled
120-
cancelCtx context.CancelFunc
121-
done bool // true after delivered or canceled
122-
result chan wantConnResult // channel to deliver connection or error
123-
}
124-
125-
// getCtxForDial returns context for dial or nil if connection was delivered or canceled.
126-
func (w *wantConn) getCtxForDial() context.Context {
127-
w.mu.Lock()
128-
defer w.mu.Unlock()
129-
130-
return w.ctx
131-
}
132-
133-
func (w *wantConn) tryDeliver(cn *Conn, err error) bool {
134-
w.mu.Lock()
135-
defer w.mu.Unlock()
136-
if w.done {
137-
return false
138-
}
139-
140-
w.done = true
141-
w.ctx = nil
142-
143-
w.result <- wantConnResult{cn: cn, err: err}
144-
close(w.result)
145-
146-
return true
147-
}
148-
149-
func (w *wantConn) cancel(ctx context.Context, p *ConnPool) {
150-
w.mu.Lock()
151-
var cn *Conn
152-
if w.done {
153-
select {
154-
case result := <-w.result:
155-
cn = result.cn
156-
default:
157-
}
158-
} else {
159-
close(w.result)
160-
}
161-
162-
w.done = true
163-
w.ctx = nil
164-
w.mu.Unlock()
165-
166-
if cn != nil {
167-
p.Put(ctx, cn)
168-
}
169-
}
170-
171-
type wantConnResult struct {
172-
cn *Conn
173-
err error
174-
}
175-
176117
type ConnPool struct {
177118
cfg *Options
178119

@@ -181,6 +122,7 @@ type ConnPool struct {
181122

182123
queue chan struct{}
183124
dialsInProgress chan struct{}
125+
dialsQueue *wantConnQueue
184126

185127
connsMu sync.Mutex
186128
conns map[uint64]*Conn
@@ -209,6 +151,7 @@ func NewConnPool(opt *Options) *ConnPool {
209151
queue: make(chan struct{}, opt.PoolSize),
210152
conns: make(map[uint64]*Conn),
211153
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
154+
dialsQueue: newWantConnQueue(),
212155
idleConns: make([]*Conn, 0, opt.PoolSize),
213156
}
214157

@@ -288,6 +231,7 @@ func (p *ConnPool) checkMinIdleConns() {
288231
return
289232
}
290233
}
234+
291235
}
292236

293237
func (p *ConnPool) addIdleConn() error {
@@ -535,7 +479,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
535479

536480
atomic.AddUint32(&p.stats.Misses, 1)
537481

538-
newcn, err := p.asyncNewConn(ctx)
482+
newcn, err := p.queuedNewConn(ctx)
539483
if err != nil {
540484
return nil, err
541485
}
@@ -556,8 +500,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
556500
return newcn, nil
557501
}
558502

559-
func (p *ConnPool) asyncNewConn(ctx context.Context) (*Conn, error) {
560-
// First try to acquire permission to create a connection
503+
func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
561504
select {
562505
case p.dialsInProgress <- struct{}{}:
563506
// Got permission, proceed to create connection
@@ -580,7 +523,19 @@ func (p *ConnPool) asyncNewConn(ctx context.Context) (*Conn, error) {
580523
}
581524
}()
582525

526+
p.dialsQueue.enqueue(w)
527+
583528
go func(w *wantConn) {
529+
var freeTurnCalled bool
530+
defer func() {
531+
if err := recover(); err != nil {
532+
if !freeTurnCalled {
533+
p.freeTurn()
534+
}
535+
internal.Logger.Printf(context.Background(), "queuedNewConn panic: %+v", err)
536+
}
537+
}()
538+
584539
defer w.cancelCtx()
585540
defer func() { <-p.dialsInProgress }() // Release connection creation permission
586541

@@ -590,9 +545,11 @@ func (p *ConnPool) asyncNewConn(ctx context.Context) (*Conn, error) {
590545
if cnErr == nil && delivered {
591546
return
592547
} else if cnErr == nil && !delivered {
593-
p.Put(dialCtx, cn)
594-
} else { // freeTurn after error
548+
p.putIdleConn(dialCtx, cn)
549+
freeTurnCalled = true // putIdleConn 内部已调用 freeTurn
550+
} else {
595551
p.freeTurn()
552+
freeTurnCalled = true
596553
}
597554
}(w)
598555

@@ -606,6 +563,34 @@ func (p *ConnPool) asyncNewConn(ctx context.Context) (*Conn, error) {
606563
}
607564
}
608565

566+
func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {
567+
for {
568+
w, ok := p.dialsQueue.dequeue()
569+
if !ok {
570+
break
571+
}
572+
if w.tryDeliver(cn, nil) {
573+
return
574+
}
575+
}
576+
577+
cn.SetUsable(true)
578+
579+
p.connsMu.Lock()
580+
defer p.connsMu.Unlock()
581+
582+
if p.closed() {
583+
_ = cn.Close()
584+
return
585+
}
586+
587+
// poolSize is increased in newConn
588+
p.idleConns = append(p.idleConns, cn)
589+
p.idleConnsLen.Add(1)
590+
591+
p.freeTurn()
592+
}
593+
609594
func (p *ConnPool) waitTurn(ctx context.Context) error {
610595
select {
611596
case <-ctx.Done():

internal/pool/pool_test.go

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ var _ = Describe("ConnPool", func() {
6060
Expect(connPool.Close()).NotTo(HaveOccurred())
6161
close(closedChan)
6262

63-
// We wait for 1 second and believe that checkMinIdleConns has been executed.
63+
// We wait for 1 second and believe that checkIdleConns has been executed.
6464
time.Sleep(time.Second)
6565

6666
Expect(connPool.Stats()).To(Equal(&pool.Stats{
@@ -519,7 +519,7 @@ func TestDialerRetryConfiguration(t *testing.T) {
519519
})
520520
}
521521

522-
var _ = Describe("asyncNewConn", func() {
522+
var _ = Describe("queuedNewConn", func() {
523523
ctx := context.Background()
524524

525525
It("should successfully create connection when pool is exhausted", func() {
@@ -607,7 +607,7 @@ var _ = Describe("asyncNewConn", func() {
607607

608608
It("should handle context cancellation while waiting for connection result", func() {
609609
// This test focuses on proper error handling when context is cancelled
610-
// during asyncNewConn execution (not testing connection reuse)
610+
// during queuedNewConn execution (not testing connection reuse)
611611

612612
slowDialer := func(ctx context.Context) (net.Conn, error) {
613613
// Simulate slow dialing
@@ -633,7 +633,7 @@ var _ = Describe("asyncNewConn", func() {
633633
defer cancel()
634634

635635
// This request should timeout while waiting for connection creation result
636-
// Testing the error handling path in asyncNewConn select statement
636+
// Testing the error handling path in queuedNewConn select statement
637637
done := make(chan struct{})
638638
var err2 error
639639
go func() {
@@ -781,7 +781,7 @@ var _ = Describe("asyncNewConn", func() {
781781
go func() {
782782
defer GinkgoRecover()
783783
defer close(done1)
784-
// This will trigger asyncNewConn since pool is full
784+
// This will trigger queuedNewConn since pool is full
785785
conn, err := testPool.Get(ctx)
786786
if err == nil {
787787
// Put connection back to pool after creation
@@ -822,6 +822,93 @@ var _ = Describe("asyncNewConn", func() {
822822

823823
testPool.Put(ctx, conn3)
824824
})
825+
826+
It("recover queuedNewConn panic", func() {
827+
opt := &pool.Options{
828+
Dialer: func(ctx context.Context) (net.Conn, error) {
829+
panic("test panic in queuedNewConn")
830+
},
831+
PoolSize: int32(10),
832+
MaxConcurrentDials: 10,
833+
DialTimeout: 1 * time.Second,
834+
PoolTimeout: 1 * time.Second,
835+
}
836+
testPool := pool.NewConnPool(opt)
837+
defer testPool.Close()
838+
839+
// Trigger queuedNewConn - calling Get() on empty pool will trigger it
840+
// Since dialer will panic, it should be handled by recover
841+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
842+
defer cancel()
843+
844+
// Try to get connections multiple times, each will trigger panic but should be properly recovered
845+
for i := 0; i < 3; i++ {
846+
conn, err := testPool.Get(ctx)
847+
// Connection should be nil, error should exist (panic converted to error)
848+
Expect(conn).To(BeNil())
849+
Expect(err).To(HaveOccurred())
850+
}
851+
852+
// Verify state after panic recovery:
853+
// - turn should be properly released (QueueLen() == 0)
854+
// - connection counts should be correct (TotalConns == 0, IdleConns == 0)
855+
Eventually(func() bool {
856+
stats := testPool.Stats()
857+
queueLen := testPool.QueueLen()
858+
return stats.TotalConns == 0 && stats.IdleConns == 0 && queueLen == 0
859+
}, "3s", "50ms").Should(BeTrue())
860+
})
861+
862+
It("should handle connection creation success but delivery failure (putIdleConn path)", func() {
863+
// This test covers the most important untested branch in queuedNewConn:
864+
// cnErr == nil && !delivered -> putIdleConn()
865+
866+
// Use slow dialer to ensure request times out before connection is ready
867+
slowDialer := func(ctx context.Context) (net.Conn, error) {
868+
// Delay long enough for client request to timeout first
869+
time.Sleep(300 * time.Millisecond)
870+
return newDummyConn(), nil
871+
}
872+
873+
testPool := pool.NewConnPool(&pool.Options{
874+
Dialer: slowDialer,
875+
PoolSize: 1,
876+
MaxConcurrentDials: 2,
877+
DialTimeout: 500 * time.Millisecond, // Long enough for dialer to complete
878+
PoolTimeout: 100 * time.Millisecond, // Client requests will timeout quickly
879+
})
880+
defer testPool.Close()
881+
882+
// Record initial idle connection count
883+
initialIdleConns := testPool.Stats().IdleConns
884+
885+
// Make a request that will timeout
886+
// This request will start queuedNewConn, create connection, but fail to deliver due to timeout
887+
shortCtx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
888+
defer cancel()
889+
890+
conn, err := testPool.Get(shortCtx)
891+
892+
// Request should fail due to timeout
893+
Expect(err).To(HaveOccurred())
894+
Expect(conn).To(BeNil())
895+
896+
// However, background queuedNewConn should continue and complete connection creation
897+
// Since it cannot deliver (request timed out), it should call putIdleConn to add connection to idle pool
898+
Eventually(func() bool {
899+
stats := testPool.Stats()
900+
return stats.IdleConns > initialIdleConns
901+
}, "1s", "50ms").Should(BeTrue())
902+
903+
// Verify the connection can indeed be used by subsequent requests
904+
conn2, err2 := testPool.Get(context.Background())
905+
Expect(err2).NotTo(HaveOccurred())
906+
Expect(conn2).NotTo(BeNil())
907+
Expect(conn2.IsUsable()).To(BeTrue())
908+
909+
// Cleanup
910+
testPool.Put(context.Background(), conn2)
911+
})
825912
})
826913

827914
func init() {

0 commit comments

Comments
 (0)