Skip to content

Commit 4471d26

Browse files
authored
id, replay: add id allocator for SQL server and replayer (#688)
1 parent 57a854f commit 4471d26

File tree

11 files changed

+103
-36
lines changed

11 files changed

+103
-36
lines changed

pkg/manager/id/manager.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package id
5+
6+
import "sync/atomic"
7+
8+
// IDManager is used to generate unique ID concurrently.
9+
// SQLServer and Replay allocate backend connection ID conrrently, but the ID cannot overlap.
10+
type IDManager struct {
11+
id atomic.Uint64
12+
}
13+
14+
func NewIDManager() *IDManager {
15+
return &IDManager{}
16+
}
17+
18+
func (m *IDManager) NewID() uint64 {
19+
return m.id.Add(1)
20+
}

pkg/manager/id/manager_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package id
5+
6+
import (
7+
"sync"
8+
"testing"
9+
10+
"github.com/pingcap/tiproxy/lib/util/waitgroup"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestIDValue(t *testing.T) {
15+
var lock sync.Mutex
16+
var wg waitgroup.WaitGroup
17+
mgr := NewIDManager()
18+
m := make(map[uint64]struct{}, 1000)
19+
for i := 0; i < 10; i++ {
20+
wg.Run(func() {
21+
for j := 0; j < 100; j++ {
22+
id := mgr.NewID()
23+
lock.Lock()
24+
m[id] = struct{}{}
25+
lock.Unlock()
26+
}
27+
})
28+
}
29+
wg.Wait()
30+
31+
require.Equal(t, 1000, len(m))
32+
for i := 1; i <= 1000; i++ {
33+
require.Contains(t, m, uint64(i), "missing id: %d", i)
34+
}
35+
}

pkg/proxy/proxy.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/pingcap/tiproxy/lib/util/errors"
1515
"github.com/pingcap/tiproxy/lib/util/waitgroup"
1616
"github.com/pingcap/tiproxy/pkg/manager/cert"
17+
"github.com/pingcap/tiproxy/pkg/manager/id"
1718
"github.com/pingcap/tiproxy/pkg/metrics"
1819
"github.com/pingcap/tiproxy/pkg/proxy/backend"
1920
"github.com/pingcap/tiproxy/pkg/proxy/client"
@@ -28,7 +29,6 @@ type serverState struct {
2829
healthyKeepAlive config.KeepAlive
2930
unhealthyKeepAlive config.KeepAlive
3031
clients map[uint64]*client.ClientConnection
31-
connID uint64
3232
maxConnections uint64
3333
connBufferSize int
3434
requireBackendTLS bool
@@ -43,6 +43,7 @@ type SQLServer struct {
4343
addrs []string
4444
logger *zap.Logger
4545
certMgr *cert.CertManager
46+
idMgr *id.IDManager
4647
hsHandler backend.HandshakeHandler
4748
cpt capture.Capture
4849
wg waitgroup.WaitGroup
@@ -52,15 +53,15 @@ type SQLServer struct {
5253
}
5354

5455
// NewSQLServer creates a new SQLServer.
55-
func NewSQLServer(logger *zap.Logger, cfg *config.Config, certMgr *cert.CertManager, cpt capture.Capture, hsHandler backend.HandshakeHandler) (*SQLServer, error) {
56+
func NewSQLServer(logger *zap.Logger, cfg *config.Config, certMgr *cert.CertManager, idMgr *id.IDManager, cpt capture.Capture, hsHandler backend.HandshakeHandler) (*SQLServer, error) {
5657
var err error
5758
s := &SQLServer{
5859
logger: logger,
5960
certMgr: certMgr,
61+
idMgr: idMgr,
6062
hsHandler: hsHandler,
6163
cpt: cpt,
6264
mu: serverState{
63-
connID: 0,
6465
clients: make(map[uint64]*client.ClientConnection),
6566
},
6667
}
@@ -150,8 +151,7 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) {
150151
return false, nil, 0, nil
151152
}
152153

153-
s.mu.connID++
154-
connID := s.mu.connID
154+
connID := s.idMgr.NewID()
155155
logger := s.logger.With(zap.Uint64("connID", connID), zap.String("client_addr", conn.RemoteAddr().String()),
156156
zap.String("addr", addr))
157157
clientConn := client.NewClientConnection(logger.Named("conn"), conn, s.certMgr.ServerSQLTLS(), s.certMgr.SQLTLS(),

pkg/proxy/proxy_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/pingcap/tiproxy/lib/util/waitgroup"
2020
"github.com/pingcap/tiproxy/pkg/balance/router"
2121
"github.com/pingcap/tiproxy/pkg/manager/cert"
22+
"github.com/pingcap/tiproxy/pkg/manager/id"
2223
"github.com/pingcap/tiproxy/pkg/metrics"
2324
"github.com/pingcap/tiproxy/pkg/proxy/backend"
2425
"github.com/pingcap/tiproxy/pkg/proxy/client"
@@ -31,7 +32,7 @@ func TestCreateConn(t *testing.T) {
3132
cfg := &config.Config{}
3233
certManager := cert.NewCertManager()
3334
require.NoError(t, certManager.Init(cfg, lg, nil))
34-
server, err := NewSQLServer(lg, cfg, certManager, nil, &mockHsHandler{})
35+
server, err := NewSQLServer(lg, cfg, certManager, id.NewIDManager(), nil, &mockHsHandler{})
3536
require.NoError(t, err)
3637
server.Run(context.Background(), nil)
3738
defer func() {
@@ -79,7 +80,7 @@ func TestGracefulCloseConn(t *testing.T) {
7980
},
8081
},
8182
}
82-
server, err := NewSQLServer(lg, cfg, nil, nil, hsHandler)
83+
server, err := NewSQLServer(lg, cfg, nil, id.NewIDManager(), nil, hsHandler)
8384
require.NoError(t, err)
8485
finish := make(chan struct{})
8586
go func() {
@@ -109,7 +110,7 @@ func TestGracefulCloseConn(t *testing.T) {
109110
}
110111

111112
// Graceful shutdown will be blocked if there are alive connections.
112-
server, err = NewSQLServer(lg, cfg, nil, nil, hsHandler)
113+
server, err = NewSQLServer(lg, cfg, nil, id.NewIDManager(), nil, hsHandler)
113114
require.NoError(t, err)
114115
clientConn := createClientConn()
115116
go func() {
@@ -135,7 +136,7 @@ func TestGracefulCloseConn(t *testing.T) {
135136

136137
// Graceful shutdown will shut down after GracefulCloseConnTimeout.
137138
cfg.Proxy.GracefulCloseConnTimeout = 1
138-
server, err = NewSQLServer(lg, cfg, nil, nil, hsHandler)
139+
server, err = NewSQLServer(lg, cfg, nil, id.NewIDManager(), nil, hsHandler)
139140
require.NoError(t, err)
140141
createClientConn()
141142
go func() {
@@ -163,7 +164,7 @@ func TestGracefulShutDown(t *testing.T) {
163164
},
164165
},
165166
}
166-
server, err := NewSQLServer(lg, cfg, certManager, nil, &mockHsHandler{})
167+
server, err := NewSQLServer(lg, cfg, certManager, id.NewIDManager(), nil, &mockHsHandler{})
167168
require.NoError(t, err)
168169
server.Run(context.Background(), nil)
169170

@@ -201,7 +202,7 @@ func TestMultiAddr(t *testing.T) {
201202
Proxy: config.ProxyServer{
202203
Addr: "0.0.0.0:0,0.0.0.0:0",
203204
},
204-
}, certManager, nil, &mockHsHandler{})
205+
}, certManager, id.NewIDManager(), nil, &mockHsHandler{})
205206
require.NoError(t, err)
206207
server.Run(context.Background(), nil)
207208

@@ -221,7 +222,7 @@ func TestWatchCfg(t *testing.T) {
221222
lg, _ := logger.CreateLoggerForTest(t)
222223
hsHandler := backend.NewDefaultHandshakeHandler(nil)
223224
cfgch := make(chan *config.Config)
224-
server, err := NewSQLServer(lg, &config.Config{}, nil, nil, hsHandler)
225+
server, err := NewSQLServer(lg, &config.Config{}, nil, id.NewIDManager(), nil, hsHandler)
225226
require.NoError(t, err)
226227
server.Run(context.Background(), cfgch)
227228
cfg := &config.Config{
@@ -256,7 +257,7 @@ func TestRecoverPanic(t *testing.T) {
256257
certManager := cert.NewCertManager()
257258
err := certManager.Init(&config.Config{}, lg, nil)
258259
require.NoError(t, err)
259-
server, err := NewSQLServer(lg, &config.Config{}, certManager, nil, &mockHsHandler{
260+
server, err := NewSQLServer(lg, &config.Config{}, certManager, id.NewIDManager(), nil, &mockHsHandler{
260261
handshakeResp: func(ctx backend.ConnContext, _ *pnet.HandshakeResp) error {
261262
if ctx.Value(backend.ConnContextKeyConnID).(uint64) == 1 {
262263
panic("HandleHandshakeResp panic")

pkg/server/server.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/pingcap/tiproxy/pkg/balance/metricsreader"
1515
"github.com/pingcap/tiproxy/pkg/manager/cert"
1616
mgrcfg "github.com/pingcap/tiproxy/pkg/manager/config"
17+
"github.com/pingcap/tiproxy/pkg/manager/id"
1718
"github.com/pingcap/tiproxy/pkg/manager/infosync"
1819
"github.com/pingcap/tiproxy/pkg/manager/logger"
1920
mgrns "github.com/pingcap/tiproxy/pkg/manager/namespace"
@@ -161,16 +162,16 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error)
161162
} else {
162163
hsHandler = backend.NewDefaultHandshakeHandler(srv.namespaceManager)
163164
}
165+
idMgr := id.NewIDManager()
164166

165167
// setup capture and replay job manager
166168
{
167-
srv.replay = mgrrp.NewJobManager(lg.Named("replay"), srv.configManager.GetConfig(), srv.certManager, hsHandler)
169+
srv.replay = mgrrp.NewJobManager(lg.Named("replay"), srv.configManager.GetConfig(), srv.certManager, idMgr, hsHandler)
168170
}
169171

170172
// setup proxy server
171173
{
172-
173-
srv.proxy, err = proxy.NewSQLServer(lg.Named("proxy"), cfg, srv.certManager, srv.replay.GetCapture(), hsHandler)
174+
srv.proxy, err = proxy.NewSQLServer(lg.Named("proxy"), cfg, srv.certManager, idMgr, srv.replay.GetCapture(), hsHandler)
174175
if err != nil {
175176
return
176177
}

pkg/sqlreplay/conn/conn.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"encoding/binary"
1010

1111
"github.com/pingcap/tiproxy/lib/util/errors"
12+
"github.com/pingcap/tiproxy/pkg/manager/id"
1213
"github.com/pingcap/tiproxy/pkg/proxy/backend"
1314
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
1415
"github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
@@ -34,18 +35,20 @@ type conn struct {
3435
closeCh chan<- uint64
3536
lg *zap.Logger
3637
backendConn BackendConn
37-
connID uint64 // frontend connection id
38+
connID uint64 // capture ID, not replay ID
3839
}
3940

40-
func NewConn(lg *zap.Logger, username, password string, backendTLSConfig *tls.Config, hsHandler backend.HandshakeHandler, connID uint64,
41-
bcConfig *backend.BCConfig, exceptionCh chan<- Exception, closeCh chan<- uint64) *conn {
41+
func NewConn(lg *zap.Logger, username, password string, backendTLSConfig *tls.Config, hsHandler backend.HandshakeHandler,
42+
idMgr *id.IDManager, connID uint64, bcConfig *backend.BCConfig, exceptionCh chan<- Exception, closeCh chan<- uint64) *conn {
43+
backendConnID := idMgr.NewID()
44+
lg = lg.With(zap.Uint64("captureID", connID), zap.Uint64("replayID", backendConnID))
4245
return &conn{
43-
lg: lg.With(zap.Uint64("connID", connID)),
46+
lg: lg,
4447
connID: connID,
4548
cmdCh: make(chan *cmd.Command, maxPendingCommands),
4649
exceptionCh: exceptionCh,
4750
closeCh: closeCh,
48-
backendConn: NewBackendConn(lg.Named("be"), connID, hsHandler, bcConfig, backendTLSConfig, username, password),
51+
backendConn: NewBackendConn(lg.Named("be"), backendConnID, hsHandler, bcConfig, backendTLSConfig, username, password),
4952
}
5053
}
5154

pkg/sqlreplay/conn/conn_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/pingcap/tiproxy/lib/util/errors"
1313
"github.com/pingcap/tiproxy/lib/util/logger"
1414
"github.com/pingcap/tiproxy/lib/util/waitgroup"
15+
"github.com/pingcap/tiproxy/pkg/manager/id"
1516
"github.com/pingcap/tiproxy/pkg/proxy/backend"
1617
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
1718
"github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
@@ -38,7 +39,7 @@ func TestConnectError(t *testing.T) {
3839
var wg waitgroup.WaitGroup
3940
for i, test := range tests {
4041
exceptionCh, closeCh := make(chan Exception, 1), make(chan uint64, 1)
41-
conn := NewConn(lg, "u1", "", nil, nil, 1, &backend.BCConfig{}, exceptionCh, closeCh)
42+
conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh)
4243
backendConn := &mockBackendConn{connErr: test.connErr, execErr: test.execErr}
4344
conn.backendConn = backendConn
4445
wg.RunWithRecover(func() {
@@ -59,7 +60,7 @@ func TestExecuteCmd(t *testing.T) {
5960
lg, _ := logger.CreateLoggerForTest(t)
6061
var wg waitgroup.WaitGroup
6162
exceptionCh, closeCh := make(chan Exception, 1), make(chan uint64, 1)
62-
conn := NewConn(lg, "u1", "", nil, nil, 1, &backend.BCConfig{}, exceptionCh, closeCh)
63+
conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh)
6364
backendConn := &mockBackendConn{}
6465
conn.backendConn = backendConn
6566
childCtx, cancel := context.WithCancel(context.Background())
@@ -105,7 +106,7 @@ func TestExecuteError(t *testing.T) {
105106
lg, _ := logger.CreateLoggerForTest(t)
106107
var wg waitgroup.WaitGroup
107108
exceptionCh, closeCh := make(chan Exception, 1), make(chan uint64, 1)
108-
conn := NewConn(lg, "u1", "", nil, nil, 1, &backend.BCConfig{}, exceptionCh, closeCh)
109+
conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh)
109110
backendConn := &mockBackendConn{execErr: errors.New("mock error")}
110111
conn.backendConn = backendConn
111112
childCtx, cancel := context.WithCancel(context.Background())

pkg/sqlreplay/manager/manager.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/pingcap/tiproxy/lib/config"
1212
"github.com/pingcap/tiproxy/lib/util/errors"
13+
"github.com/pingcap/tiproxy/pkg/manager/id"
1314
"github.com/pingcap/tiproxy/pkg/proxy/backend"
1415
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
1516
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
@@ -42,11 +43,11 @@ type jobManager struct {
4243
lg *zap.Logger
4344
}
4445

45-
func NewJobManager(lg *zap.Logger, cfg *config.Config, certMgr CertManager, hsHandler backend.HandshakeHandler) *jobManager {
46+
func NewJobManager(lg *zap.Logger, cfg *config.Config, certMgr CertManager, idMgr *id.IDManager, hsHandler backend.HandshakeHandler) *jobManager {
4647
return &jobManager{
4748
lg: lg,
4849
capture: capture.NewCapture(lg.Named("capture")),
49-
replay: replay.NewReplay(lg.Named("replay")),
50+
replay: replay.NewReplay(lg.Named("replay"), idMgr),
5051
hsHandler: hsHandler,
5152
cfg: cfg,
5253
certManager: certMgr,

pkg/sqlreplay/manager/manager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ import (
99

1010
"github.com/pingcap/tiproxy/lib/config"
1111
"github.com/pingcap/tiproxy/lib/util/errors"
12+
"github.com/pingcap/tiproxy/pkg/manager/id"
1213
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
1314
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
1415
"github.com/stretchr/testify/require"
1516
"go.uber.org/zap"
1617
)
1718

1819
func TestStartAndStop(t *testing.T) {
19-
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, nil)
20+
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil)
2021
defer mgr.Close()
2122
cpt, rep := &mockCapture{}, &mockReplay{}
2223
mgr.capture, mgr.replay = cpt, rep
@@ -60,7 +61,7 @@ func TestMarshalJobHistory(t *testing.T) {
6061
require.NoError(t, err)
6162
endTime, err := time.Parse("2006-01-02 15:04:05", "2020-01-01 02:01:01")
6263
require.NoError(t, err)
63-
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, nil)
64+
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil)
6465
mgr.jobHistory = []Job{
6566
&captureJob{
6667
job: job{

pkg/sqlreplay/replay/replay.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/pingcap/tiproxy/lib/util/errors"
1616
"github.com/pingcap/tiproxy/lib/util/waitgroup"
17+
"github.com/pingcap/tiproxy/pkg/manager/id"
1718
"github.com/pingcap/tiproxy/pkg/proxy/backend"
1819
"github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
1920
"github.com/pingcap/tiproxy/pkg/sqlreplay/conn"
@@ -78,6 +79,7 @@ type replay struct {
7879
cfg ReplayConfig
7980
meta store.Meta
8081
conns map[uint64]conn.Conn
82+
idMgr *id.IDManager
8183
exceptionCh chan conn.Exception
8284
closeCh chan uint64
8385
wg waitgroup.WaitGroup
@@ -95,9 +97,10 @@ type replay struct {
9597
lg *zap.Logger
9698
}
9799

98-
func NewReplay(lg *zap.Logger) *replay {
100+
func NewReplay(lg *zap.Logger, idMgr *id.IDManager) *replay {
99101
return &replay{
100-
lg: lg,
102+
lg: lg,
103+
idMgr: idMgr,
101104
}
102105
}
103106

@@ -120,14 +123,14 @@ func (r *replay) Start(cfg ReplayConfig, backendTLSConfig *tls.Config, hsHandler
120123
r.connCreator = cfg.connCreator
121124
if r.connCreator == nil {
122125
r.connCreator = func(connID uint64) conn.Conn {
123-
return conn.NewConn(r.lg.Named("conn"), r.cfg.Username, r.cfg.Password, backendTLSConfig, hsHandler, connID, bcConfig, r.exceptionCh, r.closeCh)
126+
return conn.NewConn(r.lg.Named("conn"), r.cfg.Username, r.cfg.Password, backendTLSConfig, hsHandler, r.idMgr,
127+
connID, bcConfig, r.exceptionCh, r.closeCh)
124128
}
125129
}
126130
r.report = cfg.report
127131
if r.report == nil {
128132
backendConnCreator := func() conn.BackendConn {
129-
// TODO: allocate connection ID.
130-
return conn.NewBackendConn(r.lg.Named("be"), 1, hsHandler, bcConfig, backendTLSConfig, r.cfg.Username, r.cfg.Password)
133+
return conn.NewBackendConn(r.lg.Named("be"), r.idMgr.NewID(), hsHandler, bcConfig, backendTLSConfig, r.cfg.Username, r.cfg.Password)
131134
}
132135
r.report = report.NewReport(r.lg.Named("report"), r.exceptionCh, backendConnCreator)
133136
}

0 commit comments

Comments
 (0)