Skip to content

Commit ea69458

Browse files
committed
async create conn
1 parent d2ad801 commit ea69458

File tree

4 files changed

+291
-19
lines changed

4 files changed

+291
-19
lines changed

internal/pool/pool.go

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ type Options struct {
7272
ConnMaxIdleTime time.Duration
7373
ConnMaxLifetime time.Duration
7474

75+
MaxConcurrentDials int
76+
7577
ReadBufferSize int
7678
WriteBufferSize int
7779
}
@@ -80,13 +82,43 @@ type lastDialErrorWrap struct {
8082
err error
8183
}
8284

85+
type wantConn struct {
86+
mu sync.Mutex // protects ctx, done and sending of the result
87+
ctx context.Context // context for dial, cleared after delivered or canceled
88+
cancelCtx context.CancelFunc
89+
done bool // true after delivered or canceled
90+
result chan wantConnResult // channel to deliver connection or error
91+
}
92+
93+
func (w *wantConn) tryDeliver(cn *Conn, err error) bool {
94+
w.mu.Lock()
95+
defer w.mu.Unlock()
96+
if w.done {
97+
return false
98+
}
99+
100+
w.done = true
101+
w.ctx = nil
102+
103+
w.result <- wantConnResult{cn: cn, err: err}
104+
close(w.result)
105+
106+
return true
107+
}
108+
109+
type wantConnResult struct {
110+
cn *Conn
111+
err error
112+
}
113+
83114
type ConnPool struct {
84115
cfg *Options
85116

86117
dialErrorsNum uint32 // atomic
87118
lastDialError atomic.Value
88119

89-
queue chan struct{}
120+
queue chan struct{}
121+
dialsInProgress chan struct{}
90122

91123
connsMu sync.Mutex
92124
conns []*Conn
@@ -107,7 +139,9 @@ func NewConnPool(opt *Options) *ConnPool {
107139
p := &ConnPool{
108140
cfg: opt,
109141

110-
queue: make(chan struct{}, opt.PoolSize),
142+
queue: make(chan struct{}, opt.PoolSize),
143+
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
144+
111145
conns: make([]*Conn, 0, opt.PoolSize),
112146
idleConns: make([]*Conn, 0, opt.PoolSize),
113147
}
@@ -316,13 +350,46 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
316350

317351
atomic.AddUint32(&p.stats.Misses, 1)
318352

319-
newcn, err := p.newConn(ctx, true)
320-
if err != nil {
353+
return p.asyncNewConn(ctx)
354+
}
355+
356+
func (p *ConnPool) asyncNewConn(ctx context.Context) (*Conn, error) {
357+
// First try to acquire permission to create a connection
358+
select {
359+
case p.dialsInProgress <- struct{}{}:
360+
// Got permission, proceed to create connection
361+
case <-ctx.Done():
321362
p.freeTurn()
322-
return nil, err
363+
return nil, ctx.Err()
364+
}
365+
366+
dialCtx, cancel := context.WithTimeout(ctx, p.cfg.DialTimeout)
367+
368+
w := &wantConn{
369+
ctx: dialCtx,
370+
cancelCtx: cancel,
371+
result: make(chan wantConnResult, 1),
323372
}
324373

325-
return newcn, nil
374+
go func(w *wantConn) {
375+
defer w.cancelCtx()
376+
defer func() { <-p.dialsInProgress }() // Release connection creation permission
377+
378+
cn, err := p.newConn(w.ctx, true)
379+
delivered := w.tryDeliver(cn, err)
380+
if err == nil && !delivered {
381+
p.Put(w.ctx, cn)
382+
} else {
383+
p.freeTurn()
384+
}
385+
}(w)
386+
387+
select {
388+
case <-ctx.Done():
389+
return nil, ctx.Err()
390+
case result := <-w.result:
391+
return result.cn, result.err
392+
}
326393
}
327394

328395
func (p *ConnPool) waitTurn(ctx context.Context) error {

internal/pool/pool_test.go

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool_test
22

33
import (
44
"context"
5+
"fmt"
56
"net"
67
"sync"
78
"testing"
@@ -377,8 +378,8 @@ var _ = Describe("race", func() {
377378
state := p.Stats()
378379
return state.TotalConns == 0 && state.IdleConns == 0 && p.QueueLen() == 0
379380
}, "3s", "50ms").Should(BeTrue())
380-
})
381-
381+
})
382+
382383
It("wait", func() {
383384
opt := &pool.Options{
384385
Dialer: func(ctx context.Context) (net.Conn, error) {
@@ -435,3 +436,80 @@ var _ = Describe("race", func() {
435436
Expect(stats.Timeouts).To(Equal(uint32(1)))
436437
})
437438
})
439+
440+
var _ = Describe("asyncNewConn", func() {
441+
ctx := context.Background()
442+
443+
It("should successfully create connection when pool is exhausted", func() {
444+
testPool := pool.NewConnPool(&pool.Options{
445+
Dialer: dummyDialer,
446+
PoolSize: 1,
447+
MaxConcurrentDials: 2,
448+
DialTimeout: 1 * time.Second,
449+
PoolTimeout: 1 * time.Second,
450+
})
451+
defer testPool.Close()
452+
453+
// Fill the pool
454+
conn1, err := testPool.Get(ctx)
455+
Expect(err).NotTo(HaveOccurred())
456+
Expect(conn1).NotTo(BeNil())
457+
458+
// This should trigger asyncNewConn since pool is exhausted
459+
conn2, err := testPool.Get(ctx)
460+
Expect(err).NotTo(HaveOccurred())
461+
Expect(conn2).NotTo(BeNil())
462+
463+
// Basic validation - we got two different connections
464+
Expect(conn1).NotTo(Equal(conn2))
465+
466+
// Close connections without Put to avoid queue issues
467+
_ = conn1.Close()
468+
_ = conn2.Close()
469+
})
470+
471+
It("should handle context cancellation", func() {
472+
testPool := pool.NewConnPool(&pool.Options{
473+
Dialer: dummyDialer,
474+
PoolSize: 1,
475+
MaxConcurrentDials: 1,
476+
DialTimeout: 1 * time.Second,
477+
PoolTimeout: 1 * time.Second,
478+
})
479+
defer testPool.Close()
480+
481+
// Get the only connection
482+
conn1, err := testPool.Get(ctx)
483+
Expect(err).NotTo(HaveOccurred())
484+
485+
// Create cancelled context
486+
cancelledCtx, cancel := context.WithCancel(ctx)
487+
cancel()
488+
489+
// This should fail immediately with context cancelled
490+
_, err = testPool.Get(cancelledCtx)
491+
Expect(err).To(Equal(context.Canceled))
492+
493+
_ = conn1.Close()
494+
})
495+
496+
It("should handle dial failures gracefully", func() {
497+
alwaysFailDialer := func(ctx context.Context) (net.Conn, error) {
498+
return nil, fmt.Errorf("dial failed")
499+
}
500+
501+
testPool := pool.NewConnPool(&pool.Options{
502+
Dialer: alwaysFailDialer,
503+
PoolSize: 1,
504+
MaxConcurrentDials: 1,
505+
DialTimeout: 1 * time.Second,
506+
PoolTimeout: 1 * time.Second,
507+
})
508+
defer testPool.Close()
509+
510+
// This call should fail
511+
_, err := testPool.Get(ctx)
512+
Expect(err).To(HaveOccurred())
513+
Expect(err.Error()).To(ContainSubstring("dial failed"))
514+
})
515+
})

options.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ type Options struct {
127127
// default: 3 seconds
128128
WriteTimeout time.Duration
129129

130+
// MaxConcurrentDials is the maximum number of concurrent connection creation goroutines.
131+
// If 0, defaults to PoolSize/4+1. If negative, unlimited goroutines (not recommended).
132+
MaxConcurrentDials int
133+
130134
// ContextTimeoutEnabled controls whether the client respects context timeouts and deadlines.
131135
// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
132136
ContextTimeoutEnabled bool
@@ -261,6 +265,16 @@ func (opt *Options) init() {
261265
if opt.PoolSize == 0 {
262266
opt.PoolSize = 10 * runtime.GOMAXPROCS(0)
263267
}
268+
if opt.MaxConcurrentDials == 0 {
269+
// Default to PoolSize/4+1, at least 1, at most PoolSize
270+
opt.MaxConcurrentDials = opt.PoolSize/4 + 1
271+
if opt.MaxConcurrentDials > opt.PoolSize {
272+
opt.MaxConcurrentDials = opt.PoolSize
273+
}
274+
} else if opt.MaxConcurrentDials < 0 {
275+
// Negative value means unlimited, set to PoolSize
276+
opt.MaxConcurrentDials = opt.PoolSize
277+
}
264278
if opt.ReadBufferSize == 0 {
265279
opt.ReadBufferSize = proto.DefaultBufferSize
266280
}
@@ -565,6 +579,7 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
565579
o.MinIdleConns = q.int("min_idle_conns")
566580
o.MaxIdleConns = q.int("max_idle_conns")
567581
o.MaxActiveConns = q.int("max_active_conns")
582+
o.MaxConcurrentDials = q.int("max_concurrent_dials")
568583
if q.has("conn_max_idle_time") {
569584
o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
570585
} else {
@@ -609,16 +624,17 @@ func newConnPool(
609624
Dialer: func(ctx context.Context) (net.Conn, error) {
610625
return dialer(ctx, opt.Network, opt.Addr)
611626
},
612-
PoolFIFO: opt.PoolFIFO,
613-
PoolSize: opt.PoolSize,
614-
PoolTimeout: opt.PoolTimeout,
615-
DialTimeout: opt.DialTimeout,
616-
MinIdleConns: opt.MinIdleConns,
617-
MaxIdleConns: opt.MaxIdleConns,
618-
MaxActiveConns: opt.MaxActiveConns,
619-
ConnMaxIdleTime: opt.ConnMaxIdleTime,
620-
ConnMaxLifetime: opt.ConnMaxLifetime,
621-
ReadBufferSize: opt.ReadBufferSize,
622-
WriteBufferSize: opt.WriteBufferSize,
627+
PoolFIFO: opt.PoolFIFO,
628+
PoolSize: opt.PoolSize,
629+
MaxConcurrentDials: opt.MaxConcurrentDials,
630+
PoolTimeout: opt.PoolTimeout,
631+
DialTimeout: opt.DialTimeout,
632+
MinIdleConns: opt.MinIdleConns,
633+
MaxIdleConns: opt.MaxIdleConns,
634+
MaxActiveConns: opt.MaxActiveConns,
635+
ConnMaxIdleTime: opt.ConnMaxIdleTime,
636+
ConnMaxLifetime: opt.ConnMaxLifetime,
637+
ReadBufferSize: opt.ReadBufferSize,
638+
WriteBufferSize: opt.WriteBufferSize,
623639
})
624640
}

0 commit comments

Comments
 (0)