@@ -3,6 +3,7 @@ package pool
3
3
import (
4
4
"context"
5
5
"errors"
6
+ "fmt"
6
7
"log"
7
8
"net"
8
9
"sync"
@@ -51,6 +52,8 @@ type Stats struct {
51
52
TotalConns uint32 // number of total connections in the pool
52
53
IdleConns uint32 // number of idle connections in the pool
53
54
StaleConns uint32 // number of stale connections removed from the pool
55
+
56
+ PubSubStats PubSubStats
54
57
}
55
58
56
59
type Pooler interface {
@@ -104,7 +107,7 @@ type ConnPool struct {
104
107
queue chan struct {}
105
108
106
109
connsMu sync.Mutex
107
- conns [ ]* Conn
110
+ conns map [ uint64 ]* Conn
108
111
idleConns []* Conn
109
112
110
113
poolSize atomic.Int32
@@ -127,13 +130,10 @@ func NewConnPool(opt *Options) *ConnPool {
127
130
cfg : opt ,
128
131
129
132
queue : make (chan struct {}, opt .PoolSize ),
130
- conns : make ([ ]* Conn , 0 , opt . PoolSize ),
133
+ conns : make (map [ uint64 ]* Conn ),
131
134
idleConns : make ([]* Conn , 0 , opt .PoolSize ),
132
135
}
133
136
134
- // Initialize hooks system
135
- p .initializeHooks ()
136
-
137
137
// Only create MinIdleConns if explicitly requested (> 0)
138
138
// This avoids creating connections during pool initialization for tests
139
139
if opt .MinIdleConns > 0 {
@@ -156,17 +156,18 @@ func (p *ConnPool) AddPoolHook(hook PoolHook) {
156
156
p .initializeHooks ()
157
157
}
158
158
p .hookManager .AddHook (hook )
159
+ p .hookManager = nil
159
160
}
160
161
161
162
// RemovePoolHook removes a pool hook from the pool.
162
163
func (p * ConnPool ) RemovePoolHook (hook PoolHook ) {
163
164
if p .hookManager != nil {
164
165
p .hookManager .RemoveHook (hook )
165
166
}
167
+ p .hookManager = nil
166
168
}
167
169
168
170
func (p * ConnPool ) checkMinIdleConns () {
169
-
170
171
if ! p .idleCheckInProgress .CompareAndSwap (false , true ) {
171
172
return
172
173
}
@@ -186,10 +187,8 @@ func (p *ConnPool) checkMinIdleConns() {
186
187
go func () {
187
188
defer func () {
188
189
if err := recover (); err != nil {
189
- p .connsMu .Lock ()
190
190
p .poolSize .Add (- 1 )
191
191
p .idleConnsLen .Add (- 1 )
192
- p .connsMu .Unlock ()
193
192
194
193
p .freeTurn ()
195
194
internal .Logger .Printf (context .Background (), "addIdleConn panic: %+v" , err )
@@ -198,10 +197,8 @@ func (p *ConnPool) checkMinIdleConns() {
198
197
199
198
err := p .addIdleConn ()
200
199
if err != nil && err != ErrClosed {
201
- p .connsMu .Lock ()
202
200
p .poolSize .Add (- 1 )
203
201
p .idleConnsLen .Add (- 1 )
204
- p .connsMu .Unlock ()
205
202
}
206
203
p .freeTurn ()
207
204
}()
@@ -232,7 +229,7 @@ func (p *ConnPool) addIdleConn() error {
232
229
return ErrClosed
233
230
}
234
231
235
- p .conns = append ( p . conns , cn )
232
+ p .conns [ cn . GetID ()] = cn
236
233
p .idleConns = append (p .idleConns , cn )
237
234
return nil
238
235
}
@@ -250,12 +247,9 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
250
247
return nil , ErrClosed
251
248
}
252
249
253
- p .connsMu .Lock ()
254
250
if p .cfg .MaxActiveConns > 0 && p .poolSize .Load () >= int32 (p .cfg .MaxActiveConns ) {
255
- p .connsMu .Unlock ()
256
251
return nil , ErrPoolExhausted
257
252
}
258
- p .connsMu .Unlock ()
259
253
260
254
cn , err := p .dialConn (ctx , pooled )
261
255
if err != nil {
@@ -265,15 +259,14 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
265
259
// This is essential for normal pool operations
266
260
cn .SetUsable (true )
267
261
268
- p .connsMu .Lock ()
269
- defer p .connsMu .Unlock ()
270
-
271
262
if p .cfg .MaxActiveConns > 0 && p .poolSize .Load () > int32 (p .cfg .MaxActiveConns ) {
272
263
_ = cn .Close ()
273
264
return nil , ErrPoolExhausted
274
265
}
275
266
276
- p .conns = append (p .conns , cn )
267
+ p .connsMu .Lock ()
268
+ p .conns [cn .GetID ()] = cn
269
+ defer p .connsMu .Unlock ()
277
270
if pooled {
278
271
// If pool is full remove the cn on next Put.
279
272
currentPoolSize := p .poolSize .Load ()
@@ -307,6 +300,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
307
300
308
301
cn := NewConnWithBufferSize (netConn , p .cfg .ReadBufferSize , p .cfg .WriteBufferSize )
309
302
cn .pooled = pooled
303
+ fmt .Printf ("New conn %d, pooled: %v\n " , cn .GetID (), cn .pooled )
310
304
if p .cfg .ConnMaxLifetime > 0 {
311
305
cn .expiresAt = time .Now ().Add (p .cfg .ConnMaxLifetime )
312
306
} else {
@@ -372,7 +366,6 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
372
366
now := time .Now ()
373
367
attempts := 0
374
368
for {
375
-
376
369
if attempts >= getAttempts {
377
370
log .Printf ("redis: connection pool: failed to get an connection accepted by hook after %d attempts" , attempts )
378
371
break
@@ -477,6 +470,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
477
470
if p .closed () {
478
471
return nil , ErrClosed
479
472
}
473
+
480
474
n := len (p .idleConns )
481
475
if n == 0 {
482
476
return nil , nil
@@ -570,28 +564,30 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
570
564
571
565
var shouldCloseConn bool
572
566
573
- p .connsMu .Lock ()
574
-
575
567
if p .cfg .MaxIdleConns == 0 || p .idleConnsLen .Load () < p .cfg .MaxIdleConns {
576
568
// unusable conns are expected to become usable at some point (background process is reconnecting them)
577
569
// put them at the opposite end of the queue
578
570
if ! cn .IsUsable () {
579
571
if p .cfg .PoolFIFO {
572
+ p .connsMu .Lock ()
580
573
p .idleConns = append (p .idleConns , cn )
574
+ p .connsMu .Unlock ()
581
575
} else {
576
+ p .connsMu .Lock ()
582
577
p .idleConns = append ([]* Conn {cn }, p .idleConns ... )
578
+ p .connsMu .Unlock ()
583
579
}
584
580
} else {
581
+ p .connsMu .Lock ()
585
582
p .idleConns = append (p .idleConns , cn )
583
+ p .connsMu .Unlock ()
586
584
}
587
585
p .idleConnsLen .Add (1 )
588
586
} else {
589
- p .removeConn (cn )
587
+ p .removeConnWithLock (cn )
590
588
shouldCloseConn = true
591
589
}
592
590
593
- p .connsMu .Unlock ()
594
-
595
591
p .freeTurn ()
596
592
597
593
if shouldCloseConn {
@@ -600,6 +596,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
600
596
}
601
597
602
598
func (p * ConnPool ) Remove (_ context.Context , cn * Conn , reason error ) {
599
+ internal .Logger .Printf (context .Background (), "Removing connection %d from pool: %v" , cn .GetID (), reason )
603
600
p .removeConnWithLock (cn )
604
601
p .freeTurn ()
605
602
_ = p .closeConn (cn )
@@ -617,17 +614,7 @@ func (p *ConnPool) removeConnWithLock(cn *Conn) {
617
614
}
618
615
619
616
func (p * ConnPool ) removeConn (cn * Conn ) {
620
- for i , c := range p .conns {
621
- if c == cn {
622
- p .conns = append (p .conns [:i ], p .conns [i + 1 :]... )
623
- if cn .pooled {
624
- p .poolSize .Add (- 1 )
625
- // Immediately check for minimum idle connections when a pooled connection is removed
626
- p .checkMinIdleConns ()
627
- }
628
- break
629
- }
630
- }
617
+ delete (p .conns , cn .GetID ())
631
618
atomic .AddUint32 (& p .stats .StaleConns , 1 )
632
619
}
633
620
@@ -743,18 +730,12 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
743
730
func (p * ConnPool ) TrackConn (cn * Conn ) {
744
731
p .connsMu .Lock ()
745
732
p .poolSize .Add (1 )
746
- p .conns = append ( p . conns , cn )
733
+ p .conns [ cn . GetID ()] = cn
747
734
p .connsMu .Unlock ()
748
735
}
749
736
750
737
func (p * ConnPool ) UntrackConn (cn * Conn ) {
751
738
p .connsMu .Lock ()
752
- for i , c := range p .conns {
753
- if c == cn {
754
- p .conns = append (p .conns [:i ], p .conns [i + 1 :]... )
755
- p .poolSize .Add (- 1 )
756
- break
757
- }
758
- }
739
+ delete (p .conns , cn .GetID ())
759
740
p .connsMu .Unlock ()
760
741
}
0 commit comments