Skip to content

Commit e67b908

Browse files
authored
fix: prevent acquiring broken connections from the pool (#711)
Signed-off-by: Rueian <[email protected]>
1 parent e8f79a4 commit e67b908

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type pool struct {
3838

3939
func (p *pool) Acquire() (v wire) {
4040
p.cond.L.Lock()
41+
retry:
4142
for len(p.list) == 0 && p.size == p.cap && !p.down {
4243
p.cond.Wait()
4344
}
@@ -51,6 +52,11 @@ func (p *pool) Acquire() (v wire) {
5152
v = p.list[i]
5253
p.list[i] = nil
5354
p.list = p.list[:i]
55+
if v.Error() != nil {
56+
p.size--
57+
v.Close()
58+
goto retry
59+
}
5460
}
5561
p.cond.L.Unlock()
5662
return v

pool_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,34 @@ func TestPool(t *testing.T) {
4949
}
5050
})
5151

52+
t.Run("Reuse without broken connections", func(t *testing.T) {
53+
pool, count := setup(100)
54+
c1 := pool.Acquire()
55+
c2 := pool.Acquire()
56+
pool.Store(c1)
57+
pool.Store(c2)
58+
pool.cond.L.Lock()
59+
for _, p := range pool.list {
60+
p.Close()
61+
}
62+
pool.cond.L.Unlock()
63+
c3 := pool.Acquire()
64+
if c3.Error() != nil {
65+
t.Fatalf("c3.Error() is not nil")
66+
}
67+
if atomic.LoadInt32(count) != 3 {
68+
t.Fatalf("pool does not clean borken connections")
69+
}
70+
pool.cond.L.Lock()
71+
defer pool.cond.L.Unlock()
72+
if pool.size != 1 {
73+
t.Fatalf("pool size is not 1")
74+
}
75+
if len(pool.list) != 0 {
76+
t.Fatalf("pool list is not empty")
77+
}
78+
})
79+
5280
t.Run("NotExceed", func(t *testing.T) {
5381
conn := make([]wire, 100)
5482
pool, count := setup(len(conn))

0 commit comments

Comments
 (0)