Skip to content

Commit a8a7665

Browse files
authored
feat(pool): add check for badConnection
* fix: badConn check(#2053) * fix: internalpool test * fix: sentinel test * fix: conncheck ut * fix: remove maxBadConnRetries * fix: add connCheck.deadline check Signed-off-by: monkey92t <[email protected]>
1 parent f5fbb36 commit a8a7665

File tree

10 files changed

+204
-8
lines changed

10 files changed

+204
-8
lines changed

internal/pool/conn_check.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
2+
3+
package pool
4+
5+
import (
6+
"errors"
7+
"io"
8+
"net"
9+
"syscall"
10+
"time"
11+
)
12+
13+
var errUnexpectedRead = errors.New("unexpected read from socket")
14+
15+
func connCheck(conn net.Conn) error {
16+
// Reset previous timeout.
17+
_ = conn.SetDeadline(time.Time{})
18+
19+
sysConn, ok := conn.(syscall.Conn)
20+
if !ok {
21+
return nil
22+
}
23+
rawConn, err := sysConn.SyscallConn()
24+
if err != nil {
25+
return err
26+
}
27+
28+
var sysErr error
29+
err = rawConn.Read(func(fd uintptr) bool {
30+
var buf [1]byte
31+
n, err := syscall.Read(int(fd), buf[:])
32+
switch {
33+
case n == 0 && err == nil:
34+
sysErr = io.EOF
35+
case n > 0:
36+
sysErr = errUnexpectedRead
37+
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
38+
sysErr = nil
39+
default:
40+
sysErr = err
41+
}
42+
return true
43+
})
44+
if err != nil {
45+
return err
46+
}
47+
48+
return sysErr
49+
}

internal/pool/conn_check_dummy.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
2+
3+
package pool
4+
5+
import "net"
6+
7+
func connCheck(conn net.Conn) error {
8+
return nil
9+
}

internal/pool/conn_check_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
2+
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
3+
4+
package pool
5+
6+
import (
7+
"net"
8+
"net/http/httptest"
9+
"time"
10+
11+
. "github.com/onsi/ginkgo"
12+
. "github.com/onsi/gomega"
13+
)
14+
15+
var _ = Describe("tests conn_check with real conns", func() {
16+
var ts *httptest.Server
17+
var conn net.Conn
18+
var err error
19+
20+
BeforeEach(func() {
21+
ts = httptest.NewServer(nil)
22+
conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second)
23+
Expect(err).NotTo(HaveOccurred())
24+
})
25+
26+
AfterEach(func() {
27+
ts.Close()
28+
})
29+
30+
It("good conn check", func() {
31+
Expect(connCheck(conn)).NotTo(HaveOccurred())
32+
33+
Expect(conn.Close()).NotTo(HaveOccurred())
34+
Expect(connCheck(conn)).To(HaveOccurred())
35+
})
36+
37+
It("bad conn check", func() {
38+
Expect(conn.Close()).NotTo(HaveOccurred())
39+
Expect(connCheck(conn)).To(HaveOccurred())
40+
})
41+
42+
It("check conn deadline", func() {
43+
Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred())
44+
time.Sleep(time.Millisecond * 10)
45+
Expect(connCheck(conn)).NotTo(HaveOccurred())
46+
Expect(conn.Close()).NotTo(HaveOccurred())
47+
})
48+
})

internal/pool/export_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package pool
22

33
import (
4+
"net"
45
"time"
56
)
67

78
func (cn *Conn) SetCreatedAt(tm time.Time) {
89
cn.createdAt = tm
910
}
11+
12+
func (cn *Conn) NetConn() net.Conn {
13+
return cn.netConn
14+
}

internal/pool/main_test.go

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ package pool_test
22

33
import (
44
"context"
5+
"fmt"
56
"net"
67
"sync"
8+
"syscall"
79
"testing"
10+
"time"
811

912
. "github.com/onsi/ginkgo"
1013
. "github.com/onsi/gomega"
@@ -32,5 +35,87 @@ func perform(n int, cbs ...func(int)) {
3235
}
3336

3437
func dummyDialer(context.Context) (net.Conn, error) {
35-
return &net.TCPConn{}, nil
38+
// return &net.TCPConn{}, nil
39+
return newDummyConn(), nil
40+
}
41+
42+
func newDummyConn() net.Conn {
43+
return &dummyConn{
44+
rawConn: &dummyRawConn{},
45+
}
46+
}
47+
48+
var _ net.Conn = (*dummyConn)(nil)
49+
var _ syscall.Conn = (*dummyConn)(nil)
50+
51+
type dummyConn struct {
52+
rawConn *dummyRawConn
53+
}
54+
55+
func (d *dummyConn) SyscallConn() (syscall.RawConn, error) {
56+
return d.rawConn, nil
57+
}
58+
59+
var errDummy = fmt.Errorf("dummyConn err")
60+
61+
func (d *dummyConn) Read(b []byte) (n int, err error) {
62+
return 0, errDummy
63+
}
64+
65+
func (d *dummyConn) Write(b []byte) (n int, err error) {
66+
return 0, errDummy
67+
}
68+
69+
func (d *dummyConn) Close() error {
70+
d.rawConn.Close()
71+
return nil
72+
}
73+
74+
func (d *dummyConn) LocalAddr() net.Addr {
75+
return &net.TCPAddr{}
76+
}
77+
78+
func (d *dummyConn) RemoteAddr() net.Addr {
79+
return &net.TCPAddr{}
80+
}
81+
82+
func (d *dummyConn) SetDeadline(t time.Time) error {
83+
return nil
84+
}
85+
86+
func (d *dummyConn) SetReadDeadline(t time.Time) error {
87+
return nil
88+
}
89+
90+
func (d *dummyConn) SetWriteDeadline(t time.Time) error {
91+
return nil
92+
}
93+
94+
var _ syscall.RawConn = (*dummyRawConn)(nil)
95+
96+
type dummyRawConn struct {
97+
closed bool
98+
mux sync.Mutex
99+
}
100+
101+
func (d *dummyRawConn) Control(f func(fd uintptr)) error {
102+
return nil
103+
}
104+
105+
func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error {
106+
d.mux.Lock()
107+
defer d.mux.Unlock()
108+
if d.closed {
109+
return fmt.Errorf("dummyRawConn closed")
110+
}
111+
return nil
112+
}
113+
114+
func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error {
115+
return nil
116+
}
117+
func (d *dummyRawConn) Close() {
118+
d.mux.Lock()
119+
d.closed = true
120+
d.mux.Unlock()
36121
}

internal/pool/pool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ func (p *ConnPool) reapStaleConn() *Conn {
542542

543543
func (p *ConnPool) isStaleConn(cn *Conn) bool {
544544
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
545-
return false
545+
return connCheck(cn.netConn) != nil
546546
}
547547

548548
now := time.Now()
@@ -553,5 +553,5 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
553553
return true
554554
}
555555

556-
return false
556+
return connCheck(cn.netConn) != nil
557557
}

internal/pool/pool_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ var _ = Describe("conns reaper", func() {
323323
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
324324
case "aged":
325325
cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
326+
case "connCheck":
327+
_ = cn.Close()
326328
}
327329
conns = append(conns, cn)
328330
staleConns = append(staleConns, cn)
@@ -409,6 +411,7 @@ var _ = Describe("conns reaper", func() {
409411

410412
assert("idle")
411413
assert("aged")
414+
assert("connCheck")
412415
})
413416

414417
var _ = Describe("race", func() {

pool_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ var _ = Describe("pool", func() {
8787
client.Pool().Put(ctx, cn)
8888

8989
err = client.Ping(ctx).Err()
90-
Expect(err).To(MatchError("bad connection"))
90+
Expect(err).NotTo(HaveOccurred())
9191

9292
val, err := client.Ping(ctx).Result()
9393
Expect(err).NotTo(HaveOccurred())

sentinel_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
192192
err = master.Shutdown(ctx).Err()
193193
Expect(err).NotTo(HaveOccurred())
194194
Eventually(func() error {
195-
return sentinelMaster.Ping(ctx).Err()
195+
return master.Ping(ctx).Err()
196196
}, "15s", "100ms").Should(HaveOccurred())
197197

198198
// Check that client picked up new master.

tx_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,6 @@ var _ = Describe("Tx", func() {
142142
return err
143143
}
144144

145-
err = do()
146-
Expect(err).To(MatchError("bad connection"))
147-
148145
err = do()
149146
Expect(err).NotTo(HaveOccurred())
150147
})

0 commit comments

Comments
 (0)