Skip to content

Commit 00d7eac

Browse files
authored
observer, net: read packet from SQL port in health check (#578)
1 parent aa36952 commit 00d7eac

File tree

4 files changed

+65
-20
lines changed

4 files changed

+65
-20
lines changed

pkg/balance/observer/health_check.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh
8686
if err = conn.SetReadDeadline(time.Now().Add(dhc.cfg.DialTimeout)); err != nil {
8787
return err
8888
}
89+
if err = pnet.CheckSqlPort(conn); err != nil {
90+
return err
91+
}
8992
if ignoredErr := conn.Close(); ignoredErr != nil && !pnet.IsDisconnectError(ignoredErr) {
9093
dhc.logger.Warn("close connection in health check failed", zap.Error(ignoredErr))
9194
}

pkg/balance/observer/health_check_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"net"
1010
"net/http"
1111
"strings"
12+
"sync/atomic"
1213
"testing"
1314
"time"
1415

16+
"github.com/go-mysql-org/go-mysql/packet"
1517
"github.com/pingcap/tiproxy/lib/util/logger"
1618
"github.com/pingcap/tiproxy/lib/util/waitgroup"
1719
"github.com/pingcap/tiproxy/pkg/testkit"
@@ -75,6 +77,12 @@ func TestHealthCheck(t *testing.T) {
7577
health = hc.Check(context.Background(), backend.sqlAddr, info)
7678
require.True(t, health.Healthy)
7779

80+
backend.setSqlResp(false)
81+
health = hc.Check(context.Background(), backend.sqlAddr, info)
82+
require.False(t, health.Healthy)
83+
backend.setSqlResp(true)
84+
health = hc.Check(context.Background(), backend.sqlAddr, info)
85+
require.True(t, health.Healthy)
7886
backend.close()
7987
}
8088

@@ -88,6 +96,7 @@ type backendServer struct {
8896
wg waitgroup.WaitGroup
8997
ip string
9098
statusPort uint
99+
sqlResp atomic.Bool
91100
}
92101

93102
func newBackendServer(t *testing.T) (*backendServer, *BackendInfo) {
@@ -97,6 +106,7 @@ func newBackendServer(t *testing.T) (*backendServer, *BackendInfo) {
97106
backend.startHTTPServer()
98107
backend.setHTTPResp(true)
99108
backend.setHTTPRespBody("")
109+
backend.setSqlResp(true)
100110
backend.startSQLServer()
101111
return backend, &BackendInfo{
102112
IP: backend.ip,
@@ -133,6 +143,10 @@ func (srv *backendServer) stopHTTPServer() {
133143
require.NoError(srv.t, err)
134144
}
135145

146+
func (srv *backendServer) setSqlResp(sqlResp bool) {
147+
srv.sqlResp.Store(sqlResp)
148+
}
149+
136150
func (srv *backendServer) startSQLServer() {
137151
srv.sqlListener, srv.sqlAddr = testkit.StartListener(srv.t, srv.sqlAddr)
138152
srv.wg.Run(func() {
@@ -142,6 +156,11 @@ func (srv *backendServer) startSQLServer() {
142156
// listener is closed
143157
break
144158
}
159+
if srv.sqlResp.Load() {
160+
data := []byte{0, 0, 0, 0, 0}
161+
c := packet.NewConn(conn)
162+
require.NoError(srv.t, c.WritePacket(data))
163+
}
145164
_ = conn.Close()
146165
}
147166
})

pkg/proxy/net/mysql.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -358,32 +358,17 @@ func ParseChangeUser(data []byte, capability Capability) (*ChangeUserReq, error)
358358
return req, err
359359
}
360360

361-
// ReadServerVersion only reads server version.
362-
func ReadServerVersion(conn net.Conn) (string, error) {
361+
// CheckSqlPort checks whether the SQL port is available.
362+
func CheckSqlPort(conn net.Conn) error {
363363
c := packet.NewConn(conn)
364364
data, err := c.ReadPacket()
365365
if err != nil {
366-
return "", err
366+
return err
367367
}
368368
if data[0] == ErrHeader.Byte() {
369-
return "", errors.New("read initial handshake error")
369+
return errors.New("read initial handshake error")
370370
}
371-
pos := 1
372-
version := data[pos : pos+bytes.IndexByte(data[pos:], 0x00)]
373-
return string(version), nil
374-
}
375-
376-
// WriteServerVersion only writes server version. It's only used for testing.
377-
func WriteServerVersion(conn net.Conn, serverVersion string) error {
378-
data := make([]byte, 0, 128)
379-
data = append(data, []byte{0, 0, 0, 0}...)
380-
// min version 10
381-
data = append(data, 10)
382-
// server version[NUL]
383-
data = append(data, serverVersion...)
384-
data = append(data, 0)
385-
c := packet.NewConn(conn)
386-
return c.WritePacket(data)
371+
return nil
387372
}
388373

389374
// ParseOKPacket parses an OK packet and only returns server status.

pkg/proxy/net/mysql_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
package net
55

66
import (
7+
"net"
78
"testing"
89

910
"github.com/go-mysql-org/go-mysql/mysql"
11+
"github.com/go-mysql-org/go-mysql/packet"
1012
"github.com/pingcap/tiproxy/lib/util/errors"
1113
"github.com/pingcap/tiproxy/lib/util/logger"
14+
"github.com/pingcap/tiproxy/pkg/testkit"
1215
"github.com/stretchr/testify/require"
1316
)
1417

@@ -72,3 +75,38 @@ func TestMySQLError(t *testing.T) {
7275
require.True(t, errors.Is(errors.Wrap(ErrHandshakeTLS, myerr), ErrHandshakeTLS))
7376
require.True(t, errors.Is(errors.Wrap(myerr, ErrHandshakeTLS), ErrHandshakeTLS))
7477
}
78+
79+
func TestCheckSqlPort(t *testing.T) {
80+
// normal
81+
testkit.TestTCPConn(t,
82+
func(t *testing.T, c net.Conn) {
83+
err := CheckSqlPort(c)
84+
require.NoError(t, err)
85+
},
86+
func(t *testing.T, c net.Conn) {
87+
data := []byte{0, 0, 0, 0, 0}
88+
conn := packet.NewConn(c)
89+
require.NoError(t, conn.WritePacket(data))
90+
}, 1)
91+
92+
// no write
93+
testkit.TestTCPConn(t,
94+
func(t *testing.T, c net.Conn) {
95+
err := CheckSqlPort(c)
96+
require.Error(t, err)
97+
},
98+
func(t *testing.T, c net.Conn) {
99+
}, 1)
100+
101+
// write error code
102+
testkit.TestTCPConn(t,
103+
func(t *testing.T, c net.Conn) {
104+
err := CheckSqlPort(c)
105+
require.Error(t, err)
106+
},
107+
func(t *testing.T, c net.Conn) {
108+
data := []byte{0, 0, 0, 0, 0xff}
109+
conn := packet.NewConn(c)
110+
require.NoError(t, conn.WritePacket(data))
111+
}, 1)
112+
}

0 commit comments

Comments
 (0)