Skip to content

Commit 35f3c79

Browse files
authored
net, config: optimize read and write packets (#382)
1 parent a4d2de9 commit 35f3c79

File tree

16 files changed

+245
-91
lines changed

16 files changed

+245
-91
lines changed

conf/proxy.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525
# 100 => accept as many as 100 connections.
2626
# max-connections = 0
2727

28+
# It's a tradeoff between memory and performance.
29+
# possible values:
30+
# 0 => default value
31+
# 1K to 16M
32+
# conn-buffer-size = 0
33+
2834
[api]
2935
# addr = "0.0.0.0:3080"
3036

lib/config/proxy.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
var (
1717
ErrUnsupportedProxyProtocolVersion = errors.New("unsupported proxy protocol version")
18+
ErrInvalidConfigValue = errors.New("invalid config value")
1819
)
1920

2021
type Config struct {
@@ -46,6 +47,7 @@ type KeepAlive struct {
4647

4748
type ProxyServerOnline struct {
4849
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty"`
50+
ConnBufferSize int `yaml:"conn-buffer-size,omitempty" toml:"conn-buffer-size,omitempty" json:"conn-buffer-size,omitempty"`
4951
FrontendKeepalive KeepAlive `yaml:"frontend-keepalive" toml:"frontend-keepalive" json:"frontend-keepalive"`
5052
// BackendHealthyKeepalive applies when the observer treats the backend as healthy.
5153
// The config values should be conservative to save CPU and tolerate network fluctuation.
@@ -182,6 +184,9 @@ func (cfg *Config) Check() error {
182184
return errors.Wrapf(ErrUnsupportedProxyProtocolVersion, "%s", cfg.Proxy.ProxyProtocol)
183185
}
184186

187+
if cfg.Proxy.ConnBufferSize > 0 && (cfg.Proxy.ConnBufferSize > 16*1024*1024 || cfg.Proxy.ConnBufferSize < 1024) {
188+
return errors.Wrapf(ErrInvalidConfigValue, "conn-buffer-size must be between 1K and 16M")
189+
}
185190
return nil
186191
}
187192

lib/config/proxy_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ var testProxyConfig = Config{
2626
FrontendKeepalive: KeepAlive{Enabled: true},
2727
ProxyProtocol: "v2",
2828
GracefulWaitBeforeShutdown: 10,
29+
ConnBufferSize: 32 * 1024,
2930
},
3031
},
3132
API: API{
@@ -113,6 +114,12 @@ func TestProxyCheck(t *testing.T) {
113114
},
114115
err: ErrUnsupportedProxyProtocolVersion,
115116
},
117+
{
118+
pre: func(t *testing.T, c *Config) {
119+
c.Proxy.ConnBufferSize = 100 * 1024 * 1024
120+
},
121+
err: ErrInvalidConfigValue,
122+
},
116123
}
117124
for _, tc := range testcases {
118125
cfg := testProxyConfig

pkg/proxy/backend/backend_conn_mgr.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,12 @@ const (
7171
)
7272

7373
type BCConfig struct {
74-
ProxyProtocol bool
75-
RequireBackendTLS bool
76-
CheckBackendInterval time.Duration
7774
HealthyKeepAlive config.KeepAlive
7875
UnhealthyKeepAlive config.KeepAlive
76+
CheckBackendInterval time.Duration
77+
ConnBufferSize int
78+
ProxyProtocol bool
79+
RequireBackendTLS bool
7980
}
8081

8182
func (cfg *BCConfig) check() {
@@ -219,7 +220,7 @@ func (mgr *BackendConnManager) getBackendIO(cctx ConnContext, auth *Authenticato
219220
// NOTE: should use DNS name as much as possible
220221
// Usually certs are signed with domain instead of IP addrs
221222
// And `RemoteAddr()` will return IP addr
222-
backendIO := pnet.NewPacketIO(cn, mgr.logger, pnet.WithRemoteAddr(addr, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))
223+
backendIO := pnet.NewPacketIO(cn, mgr.logger, mgr.config.ConnBufferSize, pnet.WithRemoteAddr(addr, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))
223224
mgr.backendIO.Store(backendIO)
224225
mgr.setKeepAlive(mgr.config.HealthyKeepAlive)
225226
return backendIO, nil
@@ -442,7 +443,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
442443
mgr.handshakeHandler.OnHandshake(mgr, rs.to, rs.err)
443444
return
444445
}
445-
newBackendIO := pnet.NewPacketIO(cn, mgr.logger, pnet.WithRemoteAddr(rs.to, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))
446+
newBackendIO := pnet.NewPacketIO(cn, mgr.logger, mgr.config.ConnBufferSize, pnet.WithRemoteAddr(rs.to, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))
446447

447448
if rs.err = mgr.authenticator.handshakeSecondTime(mgr.logger, mgr.clientIO, newBackendIO, mgr.backendTLS, sessionToken); rs.err == nil {
448449
rs.err = mgr.initSessionStates(newBackendIO, sessionStates)

pkg/proxy/backend/backend_conn_mgr_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (ts *backendMgrTester) firstHandshake4Proxy(clientIO, backendIO *pnet.Packe
125125
func (ts *backendMgrTester) handshake4Backend(packetIO *pnet.PacketIO) error {
126126
conn, err := ts.tc.backendListener.Accept()
127127
require.NoError(ts.t, err)
128-
ts.tc.backendIO = pnet.NewPacketIO(conn, ts.lg)
128+
ts.tc.backendIO = pnet.NewPacketIO(conn, ts.lg, pnet.DefaultConnBufferSize)
129129
return ts.mb.authenticate(ts.tc.backendIO)
130130
}
131131

@@ -404,7 +404,7 @@ func TestConnectFail(t *testing.T) {
404404
backend: func(_ *pnet.PacketIO) error {
405405
conn, err := ts.tc.backendListener.Accept()
406406
require.NoError(ts.t, err)
407-
ts.tc.backendIO = pnet.NewPacketIO(conn, ts.lg)
407+
ts.tc.backendIO = pnet.NewPacketIO(conn, ts.lg, pnet.DefaultConnBufferSize)
408408
ts.mb.authSucceed = false
409409
return ts.mb.authenticate(ts.tc.backendIO)
410410
},
@@ -448,7 +448,7 @@ func TestRedirectFail(t *testing.T) {
448448
require.NoError(t, err)
449449
conn, err := ts.tc.backendListener.Accept()
450450
require.NoError(t, err)
451-
tmpBackendIO := pnet.NewPacketIO(conn, ts.lg)
451+
tmpBackendIO := pnet.NewPacketIO(conn, ts.lg, pnet.DefaultConnBufferSize)
452452
// auth fails
453453
ts.mb.authSucceed = false
454454
err = ts.mb.authenticate(tmpBackendIO)
@@ -469,7 +469,7 @@ func TestRedirectFail(t *testing.T) {
469469
require.NoError(ts.t, err)
470470
conn, err := ts.tc.backendListener.Accept()
471471
require.NoError(ts.t, err)
472-
tmpBackendIO := pnet.NewPacketIO(conn, ts.lg)
472+
tmpBackendIO := pnet.NewPacketIO(conn, ts.lg, pnet.DefaultConnBufferSize)
473473
ts.mb.authSucceed = true
474474
err = ts.mb.authenticate(tmpBackendIO)
475475
require.NoError(t, err)

pkg/proxy/backend/common_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,23 @@ func (tc *tcpConnSuite) newConn(t *testing.T, enableRoute bool) func() {
5252
wg.Run(func() {
5353
conn, err := tc.backendListener.Accept()
5454
require.NoError(t, err)
55-
tc.backendIO = pnet.NewPacketIO(conn, lg)
55+
tc.backendIO = pnet.NewPacketIO(conn, lg, pnet.DefaultConnBufferSize)
5656
})
5757
}
5858
wg.Run(func() {
5959
if !enableRoute {
6060
backendConn, err := net.Dial("tcp", tc.backendListener.Addr().String())
6161
require.NoError(t, err)
62-
tc.proxyBIO = pnet.NewPacketIO(backendConn, lg)
62+
tc.proxyBIO = pnet.NewPacketIO(backendConn, lg, pnet.DefaultConnBufferSize)
6363
}
6464
clientConn, err := tc.proxyListener.Accept()
6565
require.NoError(t, err)
66-
tc.proxyCIO = pnet.NewPacketIO(clientConn, lg)
66+
tc.proxyCIO = pnet.NewPacketIO(clientConn, lg, pnet.DefaultConnBufferSize)
6767
})
6868
wg.Run(func() {
6969
conn, err := net.Dial("tcp", tc.proxyListener.Addr().String())
7070
require.NoError(t, err)
71-
tc.clientIO = pnet.NewPacketIO(conn, lg)
71+
tc.clientIO = pnet.NewPacketIO(conn, lg, pnet.DefaultConnBufferSize)
7272
})
7373
wg.Wait()
7474
return func() {
@@ -91,13 +91,13 @@ func (tc *tcpConnSuite) reconnectBackend(t *testing.T) {
9191
_ = tc.backendIO.Close()
9292
conn, err := tc.backendListener.Accept()
9393
require.NoError(t, err)
94-
tc.backendIO = pnet.NewPacketIO(conn, lg)
94+
tc.backendIO = pnet.NewPacketIO(conn, lg, pnet.DefaultConnBufferSize)
9595
})
9696
wg.Run(func() {
9797
_ = tc.proxyBIO.Close()
9898
backendConn, err := net.Dial("tcp", tc.backendListener.Addr().String())
9999
require.NoError(t, err)
100-
tc.proxyBIO = pnet.NewPacketIO(backendConn, lg)
100+
tc.proxyBIO = pnet.NewPacketIO(backendConn, lg, pnet.DefaultConnBufferSize)
101101
})
102102
wg.Wait()
103103
}

pkg/proxy/client/client_conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func NewClientConnection(logger *zap.Logger, conn net.Conn, frontendTLSConfig *t
3030
if bcConfig.ProxyProtocol {
3131
opts = append(opts, pnet.WithProxy)
3232
}
33-
pkt := pnet.NewPacketIO(conn, logger, opts...)
33+
pkt := pnet.NewPacketIO(conn, logger, bcConfig.ConnBufferSize, opts...)
3434
return &ClientConnection{
3535
logger: logger,
3636
frontendTLSConfig: frontendTLSConfig,

pkg/proxy/net/compress.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type compressedReadWriter struct {
6060
logger *zap.Logger
6161
rwStatus rwStatus
6262
zstdLevel zstd.EncoderLevel
63+
header []byte
6364
sequence uint8
6465
}
6566

@@ -70,6 +71,7 @@ func newCompressedReadWriter(rw packetReadWriter, algorithm CompressAlgorithm, z
7071
zstdLevel: zstd.EncoderLevelFromZstd(zstdLevel),
7172
logger: logger,
7273
rwStatus: rwNone,
74+
header: make([]byte, 7),
7375
}
7476
}
7577

@@ -100,7 +102,7 @@ func (crw *compressedReadWriter) Read(p []byte) (n int, err error) {
100102
}
101103
n, err = crw.readBuffer.Read(p)
102104
// Trade off between memory and efficiency.
103-
if n == len(p) && crw.readBuffer.Len() == 0 && crw.readBuffer.Cap() > defaultReaderSize {
105+
if n == len(p) && crw.readBuffer.Len() == 0 && crw.readBuffer.Cap() > DefaultConnBufferSize {
104106
crw.readBuffer = bytes.Buffer{}
105107
}
106108
return
@@ -110,18 +112,17 @@ func (crw *compressedReadWriter) Read(p []byte) (n int, err error) {
110112
// The format of the protocol: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_compression_packet.html
111113
func (crw *compressedReadWriter) readFromConn() error {
112114
var err error
113-
var header [7]byte
114-
if _, err = io.ReadFull(crw.packetReadWriter, header[:]); err != nil {
115+
if err = ReadFull(crw.packetReadWriter, crw.header); err != nil {
115116
return err
116117
}
117-
compressedSequence := header[3]
118+
compressedSequence := crw.header[3]
118119
if compressedSequence != crw.sequence {
119120
return ErrInvalidSequence.GenWithStack(
120121
"invalid compressed sequence, expected %d, actual %d", crw.sequence, compressedSequence)
121122
}
122123
crw.sequence++
123-
compressedLength := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
124-
uncompressedLength := int(uint32(header[4]) | uint32(header[5])<<8 | uint32(header[6])<<16)
124+
compressedLength := int(uint32(crw.header[0]) | uint32(crw.header[1])<<8 | uint32(crw.header[2])<<16)
125+
uncompressedLength := int(uint32(crw.header[4]) | uint32(crw.header[5])<<8 | uint32(crw.header[6])<<16)
125126

126127
if uncompressedLength == 0 {
127128
// If the data is uncompressed, the uncompressed length is 0 and compressed length is the data length
@@ -134,7 +135,7 @@ func (crw *compressedReadWriter) readFromConn() error {
134135
// If the data is compressed, the compressed length is the length of data after the compressed header and
135136
// the uncompressed length is the length of data after decompression.
136137
data := make([]byte, compressedLength)
137-
if _, err = io.ReadFull(crw.packetReadWriter, data); err != nil {
138+
if err = ReadFull(crw.packetReadWriter, data); err != nil {
138139
return err
139140
}
140141
if err = crw.uncompress(data, uncompressedLength); err != nil {
@@ -173,7 +174,7 @@ func (crw *compressedReadWriter) Flush() error {
173174
return nil
174175
}
175176
// Trade off between memory and efficiency.
176-
if crw.writeBuffer.Cap() > defaultWriterSize {
177+
if crw.writeBuffer.Cap() > DefaultConnBufferSize {
177178
crw.writeBuffer = bytes.Buffer{}
178179
} else {
179180
crw.writeBuffer.Reset()
@@ -193,16 +194,15 @@ func (crw *compressedReadWriter) Flush() error {
193194
compressedLength = len(data)
194195
}
195196

196-
var compressedHeader [7]byte
197-
compressedHeader[0] = byte(compressedLength)
198-
compressedHeader[1] = byte(compressedLength >> 8)
199-
compressedHeader[2] = byte(compressedLength >> 16)
200-
compressedHeader[3] = crw.sequence
201-
compressedHeader[4] = byte(uncompressedLength)
202-
compressedHeader[5] = byte(uncompressedLength >> 8)
203-
compressedHeader[6] = byte(uncompressedLength >> 16)
197+
crw.header[0] = byte(compressedLength)
198+
crw.header[1] = byte(compressedLength >> 8)
199+
crw.header[2] = byte(compressedLength >> 16)
200+
crw.header[3] = crw.sequence
201+
crw.header[4] = byte(uncompressedLength)
202+
crw.header[5] = byte(uncompressedLength >> 8)
203+
crw.header[6] = byte(uncompressedLength >> 16)
204204
crw.sequence++
205-
if _, err = crw.packetReadWriter.Write(compressedHeader[:]); err != nil {
205+
if _, err = crw.packetReadWriter.Write(crw.header[:]); err != nil {
206206
return errors.WithStack(err)
207207
}
208208
if _, err = crw.packetReadWriter.Write(data); err != nil {

0 commit comments

Comments
 (0)