Skip to content

Commit 5ca4fc1

Browse files
authored
*: reduce memory allocation for plenty of idle connections (#474)
1 parent 25a51b9 commit 5ca4fc1

File tree

10 files changed

+57
-43
lines changed

10 files changed

+57
-43
lines changed

pkg/manager/router/router_score.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,12 +325,14 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]*BackendHea
325325
}
326326

327327
func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) {
328+
ticker := time.NewTicker(rebalanceInterval)
328329
for {
329-
router.rebalance(rebalanceConnsPerLoop)
330330
select {
331331
case <-ctx.Done():
332+
ticker.Stop()
332333
return
333-
case <-time.After(rebalanceInterval):
334+
case <-ticker.C:
335+
router.rebalance(rebalanceConnsPerLoop)
334336
}
335337
}
336338
}

pkg/proxy/backend/authenticator.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"context"
99
"crypto/tls"
1010
"encoding/binary"
11-
"fmt"
1211
"net"
1312
"strings"
1413

@@ -36,20 +35,24 @@ const SupportedServerCapabilities = pnet.ClientLongPassword | pnet.ClientFoundRo
3635

3736
// Authenticator handshakes with the client and the backend.
3837
type Authenticator struct {
38+
salt [20]byte
3939
dbname string // default database name
4040
user string
4141
attrs map[string]string
42-
salt []byte
4342
capability pnet.Capability
4443
zstdLevel int
4544
collation uint8
4645
proxyProtocol bool
4746
requireBackendTLS bool
4847
}
4948

50-
func (auth *Authenticator) String() string {
51-
return fmt.Sprintf("user:%s, dbname:%s, capability:%d, collation:%d",
52-
auth.user, auth.dbname, auth.capability, auth.collation)
49+
func NewAuthenticator(config *BCConfig) *Authenticator {
50+
auth := &Authenticator{
51+
proxyProtocol: config.ProxyProtocol,
52+
requireBackendTLS: config.RequireBackendTLS,
53+
}
54+
GenerateSalt(&auth.salt)
55+
return auth
5356
}
5457

5558
func (auth *Authenticator) writeProxyProtocol(clientIO, backendIO *pnet.PacketIO) error {

pkg/proxy/backend/backend_conn_mgr.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,12 @@ type BackendConnManager struct {
132132
backendIO atomic.Pointer[pnet.PacketIO]
133133
backendTLS *tls.Config
134134
handshakeHandler HandshakeHandler
135-
ctxmap sync.Map
136-
connectionID uint64
137-
quitSource ErrorSource
135+
ctxmap struct {
136+
sync.Mutex
137+
m map[any]any
138+
}
139+
connectionID uint64
140+
quitSource ErrorSource
138141
}
139142

140143
// NewBackendConnManager creates a BackendConnManager.
@@ -146,16 +149,13 @@ func NewBackendConnManager(logger *zap.Logger, handshakeHandler HandshakeHandler
146149
connectionID: connectionID,
147150
cmdProcessor: NewCmdProcessor(logger.Named("cp")),
148151
handshakeHandler: handshakeHandler,
149-
authenticator: &Authenticator{
150-
proxyProtocol: config.ProxyProtocol,
151-
requireBackendTLS: config.RequireBackendTLS,
152-
salt: GenerateSalt(20),
153-
},
152+
authenticator: NewAuthenticator(config),
154153
// There are 2 types of signals, which may be sent concurrently.
155154
signalReceived: make(chan signalType, signalTypeNums),
156155
redirectResCh: make(chan *redirectResult, 1),
157156
quitSource: SrcNone,
158157
}
158+
mgr.ctxmap.m = make(map[any]any)
159159
mgr.SetValue(ConnContextKeyConnID, connectionID)
160160
return mgr
161161
}
@@ -618,14 +618,15 @@ func (mgr *BackendConnManager) QuitSource() ErrorSource {
618618
}
619619

620620
func (mgr *BackendConnManager) SetValue(key, val any) {
621-
mgr.ctxmap.Store(key, val)
621+
mgr.ctxmap.Lock()
622+
mgr.ctxmap.m[key] = val
623+
mgr.ctxmap.Unlock()
622624
}
623625

624626
func (mgr *BackendConnManager) Value(key any) any {
625-
v, ok := mgr.ctxmap.Load(key)
626-
if !ok {
627-
return nil
628-
}
627+
mgr.ctxmap.Lock()
628+
v := mgr.ctxmap.m[key]
629+
mgr.ctxmap.Unlock()
629630
return v
630631
}
631632

pkg/proxy/backend/backend_conn_mgr_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"net"
1010
"strings"
11+
"sync"
1112
"sync/atomic"
1213
"testing"
1314
"time"
@@ -1217,3 +1218,22 @@ func TestCloseWhileGracefulClose(t *testing.T) {
12171218

12181219
ts.runTests(runners)
12191220
}
1221+
1222+
func BenchmarkSyncMap(b *testing.B) {
1223+
for i := 0; i < b.N; i++ {
1224+
var m sync.Map
1225+
m.Store("1", "1")
1226+
m.Load("1")
1227+
}
1228+
}
1229+
1230+
func BenchmarkLockedMap(b *testing.B) {
1231+
for i := 0; i < b.N; i++ {
1232+
m := make(map[string]string)
1233+
var lock sync.Mutex
1234+
lock.Lock()
1235+
m["1"] = "1"
1236+
_ = m["1"]
1237+
lock.Unlock()
1238+
}
1239+
}

pkg/proxy/backend/mock_backend_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ import (
1313
)
1414

1515
type backendConfig struct {
16+
salt [20]byte
1617
tlsConfig *tls.Config
1718
authPlugin string
1819
sessionStates string
19-
salt []byte
2020
columns int
2121
loops int
2222
params int

pkg/proxy/backend/testsuite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const (
3434
var (
3535
mockUsername = "test_user"
3636
mockDBName = "test_db"
37-
mockSalt = []byte("01234567890123456789")
37+
mockSalt = [20]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
3838
mockAuthData = []byte("123456")
3939
mockToken = strings.Repeat("t", 512)
4040
mockCmdStr = "str"

pkg/proxy/backend/util.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@ func Uint32N(a uint64) uint64
1010

1111
// Buf generates a random string using ASCII characters but avoid separator character.
1212
// Ref https://github.com/mysql/mysql-server/blob/5.7/mysys_ssl/crypt_genhash_impl.cc#L435.
13-
func GenerateSalt(size int) []byte {
14-
buf := make([]byte, size)
13+
func GenerateSalt(buf *[20]byte) {
1514
for i := range buf {
1615
buf[i] = byte(Uint32N(127))
1716
for buf[i] == 0 || buf[i] == byte('$') {
1817
buf[i] = byte(Uint32N(127))
1918
}
2019
}
21-
return buf
2220
}

pkg/proxy/net/packetio.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,14 @@ type PacketIO struct {
195195
rawConn net.Conn
196196
readWriter packetReadWriter
197197
limitReader io.LimitedReader // reuse memory to reduce allocation
198-
header []byte // reuse memory to reduce allocation
199198
logger *zap.Logger
200199
remoteAddr net.Addr
201200
wrap error
201+
header [4]byte // reuse memory to reduce allocation
202202
}
203203

204204
func NewPacketIO(conn net.Conn, lg *zap.Logger, bufferSize int, opts ...PacketIOption) *PacketIO {
205205
p := &PacketIO{
206-
header: make([]byte, 4),
207206
rawConn: conn,
208207
logger: lg,
209208
readWriter: newBasicReadWriter(conn, bufferSize),
@@ -243,7 +242,7 @@ func (p *PacketIO) GetSequence() uint8 {
243242
}
244243

245244
func (p *PacketIO) readOnePacket() ([]byte, bool, error) {
246-
if err := ReadFull(p.readWriter, p.header); err != nil {
245+
if err := ReadFull(p.readWriter, p.header[:]); err != nil {
247246
return nil, false, errors.Wrap(ErrReadConn, err)
248247
}
249248
p.readWriter.SetSequence(p.header[3] + 1)
@@ -292,7 +291,7 @@ func (p *PacketIO) writeOnePacket(data []byte) (int, bool, error) {
292291
p.header[3] = sequence
293292
p.readWriter.SetSequence(sequence + 1)
294293

295-
if _, err := p.readWriter.Write(p.header); err != nil {
294+
if _, err := p.readWriter.Write(p.header[:]); err != nil {
296295
return 0, more, errors.Wrap(ErrWriteConn, err)
297296
}
298297

pkg/proxy/net/packetio_mysql.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,8 @@ var (
1717

1818
// WriteInitialHandshake writes an initial handshake as a server.
1919
// It's used for tenant-aware routing and testing.
20-
func (p *PacketIO) WriteInitialHandshake(capability Capability, salt []byte, authPlugin string, serverVersion string, connID uint64) error {
20+
func (p *PacketIO) WriteInitialHandshake(capability Capability, salt [20]byte, authPlugin string, serverVersion string, connID uint64) error {
2121
saltLen := len(salt)
22-
if saltLen < 8 {
23-
return ErrSaltNotLongEnough
24-
} else if saltLen > 20 {
25-
saltLen = 20
26-
}
27-
2822
data := make([]byte, 0, 128)
2923

3024
// min version 10
@@ -61,14 +55,14 @@ func (p *PacketIO) WriteInitialHandshake(capability Capability, salt []byte, aut
6155
}
6256

6357
// WriteSwitchRequest writes a switch request to the client. It's only for testing.
64-
func (p *PacketIO) WriteSwitchRequest(authPlugin string, salt []byte) error {
58+
func (p *PacketIO) WriteSwitchRequest(authPlugin string, salt [20]byte) error {
6559
length := 1 + len(authPlugin) + 1 + len(salt) + 1
6660
data := make([]byte, 0, length)
6761
// check https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_connection_phase_packets_protocol_auth_switch_request.html
6862
data = append(data, byte(AuthSwitchHeader))
6963
data = append(data, authPlugin...)
7064
data = append(data, 0x00)
71-
data = append(data, salt...)
65+
data = append(data, salt[:]...)
7266
data = append(data, 0x00)
7367
return p.WritePacket(data, true)
7468
}

pkg/proxy/net/packetio_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func TestPacketIO(t *testing.T) {
7676
require.NoError(t, err)
7777
},
7878
func(t *testing.T, srv *PacketIO) {
79-
var salt [40]byte
79+
var salt [20]byte
8080
var msg []byte
8181
var err error
8282

@@ -95,10 +95,7 @@ func TestPacketIO(t *testing.T) {
9595
}
9696

9797
// send handshake
98-
require.NoError(t, srv.WriteInitialHandshake(0, salt[:], AuthNativePassword, ServerVersion, 100))
99-
// salt should not be long enough
100-
require.ErrorIs(t, srv.WriteInitialHandshake(0, make([]byte, 4), AuthNativePassword, ServerVersion, 100), ErrSaltNotLongEnough)
101-
98+
require.NoError(t, srv.WriteInitialHandshake(0, salt, AuthNativePassword, ServerVersion, 100))
10299
// expect correct and wrong capability flags
103100
_, isSSL, err := srv.ReadSSLRequestOrHandshakeResp()
104101
require.NoError(t, err)

0 commit comments

Comments
 (0)