
Commit d40947a

replay: slow down decoding when there are too many pending commands (#701)
1 parent 31aa2e0 commit d40947a
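
The core change is back-pressure between the traffic decoder and the replaying connections: each connection now reports how many decoded-but-unexecuted commands it holds (ReplayStats.PendingCmds in pkg/sqlreplay/conn/conn.go below), and the decoder slows down when that number grows. The decoder side is not among the files shown on this page; the sketch below only illustrates the idea, and maxPendingCmds, backoff, next, and dispatch are hypothetical names, not TiProxy APIs.

package sketch

import (
    "context"
    "time"

    "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
    "github.com/pingcap/tiproxy/pkg/sqlreplay/conn"
)

// decodeLoop illustrates the back-pressure named in the commit title: stop decoding
// while too many commands are decoded but not yet executed, then continue.
func decodeLoop(ctx context.Context, stats *conn.ReplayStats,
    next func() (*cmd.Command, bool), dispatch func(*cmd.Command)) {
    const maxPendingCmds = 10000          // hypothetical threshold
    const backoff = 10 * time.Millisecond // hypothetical wait step
    for ctx.Err() == nil {
        // Slow down decoding when the executors fall behind.
        for stats.PendingCmds.Load() > maxPendingCmds && ctx.Err() == nil {
            time.Sleep(backoff)
        }
        command, ok := next() // read and decode the next captured command
        if !ok {
            return
        }
        dispatch(command) // hand the command to its connection's pending list
    }
}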

File tree

20 files changed: +460 -177 lines changed


pkg/metrics/metrics.go

Lines changed: 3 additions & 0 deletions

@@ -32,6 +32,7 @@ const (
     LabelMonitor = "monitor"
     LabelBackend = "backend"
     LabelTraffic = "traffic"
+    LabelReplay = "replay"
 )

 // MetricsManager manages metrics.
@@ -120,6 +121,8 @@ func init() {
         OutboundBytesCounter,
         OutboundPacketsCounter,
         CrossLocationBytesCounter,
+        ReplayPendingCmdsGauge,
+        ReplayWaitTime,
     }
 }

pkg/metrics/replay.go

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+// Copyright 2024 PingCAP, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package metrics
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+    ReplayPendingCmdsGauge = prometheus.NewGauge(
+        prometheus.GaugeOpts{
+            Namespace: ModuleProxy,
+            Subsystem: LabelReplay,
+            Name:      "pending_cmds",
+            Help:      "Counter of pending commands.",
+        })
+
+    ReplayWaitTime = prometheus.NewGauge(
+        prometheus.GaugeOpts{
+            Namespace: ModuleProxy,
+            Subsystem: LabelReplay,
+            Name:      "wait_time",
+            Help:      "Wait time of replaying commands.",
+        })
+)
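
How these gauges are fed is decided in the replay job rather than in this file; a minimal sketch of one plausible wiring, assuming a periodic reporter that copies the shared ReplayStats into the gauge (the reporter and its interval are assumptions, not shown on this commit page):

package sketch

import (
    "context"
    "time"

    "github.com/pingcap/tiproxy/pkg/metrics"
    "github.com/pingcap/tiproxy/pkg/sqlreplay/conn"
)

// reportReplayMetrics periodically mirrors the shared ReplayStats into the new gauge.
func reportReplayMetrics(ctx context.Context, stats *conn.ReplayStats) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            metrics.ReplayPendingCmdsGauge.Set(float64(stats.PendingCmds.Load()))
        }
    }
}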

pkg/proxy/backend/mock_proxy_test.go

Lines changed: 2 additions & 2 deletions

@@ -145,8 +145,8 @@ func (mc *mockCapture) Capture(packet []byte, startTime time.Time, connID uint64
     }
 }

-func (mc *mockCapture) Progress() (float64, time.Time, error) {
-    return 0, time.Time{}, nil
+func (mc *mockCapture) Progress() (float64, time.Time, bool, error) {
+    return 0, time.Time{}, false, nil
 }

 func (mc *mockCapture) Close() {

pkg/sqlreplay/capture/capture.go

Lines changed: 8 additions & 7 deletions

@@ -44,7 +44,7 @@ type Capture interface {
     // Capture captures traffic
     Capture(packet []byte, startTime time.Time, connID uint64, initSession func() (string, error))
     // Progress returns the progress of the capture job
-    Progress() (float64, time.Time, error)
+    Progress() (float64, time.Time, bool, error)
     // Close closes the capture
     Close()
 }
@@ -235,9 +235,10 @@ func (c *capture) flushBuffer(bufCh <-chan *bytes.Buffer) {
     c.Lock()
     startTime := c.startTime
     capturedCmds := c.capturedCmds
+    filteredCmds := c.filteredCmds
     c.Unlock()
     // Write meta outside of the lock to avoid affecting QPS.
-    c.writeMeta(time.Since(startTime), capturedCmds)
+    c.writeMeta(time.Since(startTime), capturedCmds, filteredCmds)
 }

 func (c *capture) InitConn(startTime time.Time, connID uint64, db string) {
@@ -334,24 +335,24 @@ func (c *capture) putCommand(command *cmd.Command) bool {
     }
 }

-func (c *capture) writeMeta(duration time.Duration, cmds uint64) {
-    meta := store.Meta{Duration: duration, Cmds: cmds}
+func (c *capture) writeMeta(duration time.Duration, cmds, filteredCmds uint64) {
+    meta := store.NewMeta(duration, cmds, filteredCmds)
     if err := meta.Write(c.cfg.Output); err != nil {
         c.lg.Error("failed to write meta", zap.Error(err))
     }
 }

-func (c *capture) Progress() (float64, time.Time, error) {
+func (c *capture) Progress() (float64, time.Time, bool, error) {
     c.Lock()
     defer c.Unlock()
     if c.status == statusIdle || c.cfg.Duration == 0 {
-        return c.progress, c.endTime, c.err
+        return c.progress, c.endTime, true, c.err
     }
     progress := float64(time.Since(c.startTime)) / float64(c.cfg.Duration)
     if progress > 1 {
         progress = 1
     }
-    return progress, c.endTime, c.err
+    return progress, c.endTime, false, c.err
 }

 // stopNoLock must be called after holding a lock.
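
Progress now also reports whether the job has finished, so callers can tell a job that merely reached the end of its configured duration apart from one that has actually stopped. A hypothetical caller (reportProgress is illustrative, not TiProxy code):

package sketch

import (
    "fmt"

    "github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
)

// reportProgress shows how the extra bool returned by Progress might be consumed.
func reportProgress(cpt capture.Capture) {
    progress, endTime, done, err := cpt.Progress()
    switch {
    case err != nil:
        fmt.Printf("capture stopped with error: %v\n", err)
    case done:
        fmt.Printf("capture finished at %s\n", endTime)
    default:
        fmt.Printf("capture %.0f%% complete\n", progress*100)
    }
}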

pkg/sqlreplay/capture/capture_test.go

Lines changed: 5 additions & 3 deletions

@@ -170,23 +170,25 @@ func TestProgress(t *testing.T) {

     now := time.Now()
     require.NoError(t, cpt.Start(cfg))
-    progress, _, err := cpt.Progress()
+    progress, _, done, err := cpt.Progress()
     require.NoError(t, err)
     require.Less(t, progress, 0.3)
+    require.False(t, done)

     setStartTime(now.Add(-5 * time.Second))
-    progress, _, err = cpt.Progress()
+    progress, _, _, err = cpt.Progress()
     require.NoError(t, err)
     require.GreaterOrEqual(t, progress, 0.5)

     packet := append([]byte{pnet.ComQuery.Byte()}, []byte("select 1")...)
     cpt.Capture(packet, time.Now(), 100, mockInitSession)
     cpt.Stop(errors.Errorf("mock error"))
     cpt.wg.Wait()
-    progress, _, err = cpt.Progress()
+    progress, _, done, err = cpt.Progress()
     require.ErrorContains(t, err, "mock error")
     require.GreaterOrEqual(t, progress, 0.5)
     require.Less(t, progress, 1.0)
+    require.True(t, done)

     m := store.Meta{}
     require.NoError(t, m.Read(cfg.Output))

pkg/sqlreplay/cmd/cmd.go

Lines changed: 1 addition & 0 deletions

@@ -28,6 +28,7 @@ const (
 )

 type LineReader interface {
+    String() string
     ReadLine() ([]byte, string, int, error)
     Read([]byte) (string, int, error)
     Close()

pkg/sqlreplay/cmd/mock_test.go

Lines changed: 4 additions & 0 deletions

@@ -39,3 +39,7 @@ func (mr *mockReader) Read(data []byte) (string, int, error) {

 func (mr *mockReader) Close() {
 }
+
+func (mr *mockReader) String() string {
+    return "mockReader"
+}

pkg/sqlreplay/conn/conn.go

Lines changed: 86 additions & 31 deletions

@@ -7,49 +7,66 @@ import (
     "context"
     "crypto/tls"
     "encoding/binary"
-    "time"
+    "sync"
+    "sync/atomic"

-    "github.com/pingcap/tiproxy/lib/util/errors"
+    glist "github.com/bahlo/generic-list-go"
     "github.com/pingcap/tiproxy/pkg/manager/id"
     "github.com/pingcap/tiproxy/pkg/proxy/backend"
     pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
     "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
     "go.uber.org/zap"
 )

-const (
-    maxPendingCommands = 100 // pending commands for each connection
-)
+// ReplayStats record the statistics during replay. All connections share one ReplayStats and update it concurrently.
+type ReplayStats struct {
+    // ReplayedCmds is the number of executed commands.
+    ReplayedCmds atomic.Uint64
+    // PendingCmds is the number of decoded but not executed commands.
+    PendingCmds atomic.Int64
+}
+
+func (s *ReplayStats) Reset() {
+    s.ReplayedCmds.Store(0)
+    s.PendingCmds.Store(0)
+}

 type Conn interface {
     Run(ctx context.Context)
     ExecuteCmd(command *cmd.Command)
+    Stop()
 }

 type ConnCreator func(connID uint64) Conn

 var _ Conn = (*conn)(nil)

 type conn struct {
-    cmdCh chan *cmd.Command
-    exceptionCh chan<- Exception
-    closeCh chan<- uint64
-    lg *zap.Logger
-    backendConn BackendConn
-    connID uint64 // capture ID, not replay ID
+    cmdLock sync.Mutex
+    cmdCh chan struct{}
+    cmdList *glist.List[*cmd.Command]
+    exceptionCh chan<- Exception
+    closeCh chan<- uint64
+    lg *zap.Logger
+    backendConn BackendConn
+    connID uint64 // capture ID, not replay ID
+    replayStats *ReplayStats
+    lastPendingCmds int // last pending cmds reported to the stats
 }

 func NewConn(lg *zap.Logger, username, password string, backendTLSConfig *tls.Config, hsHandler backend.HandshakeHandler,
-    idMgr *id.IDManager, connID uint64, bcConfig *backend.BCConfig, exceptionCh chan<- Exception, closeCh chan<- uint64) *conn {
+    idMgr *id.IDManager, connID uint64, bcConfig *backend.BCConfig, exceptionCh chan<- Exception, closeCh chan<- uint64, replayStats *ReplayStats) *conn {
     backendConnID := idMgr.NewID()
     lg = lg.With(zap.Uint64("captureID", connID), zap.Uint64("replayID", backendConnID))
     return &conn{
         lg: lg,
         connID: connID,
-        cmdCh: make(chan *cmd.Command, maxPendingCommands),
+        cmdList: glist.New[*cmd.Command](),
+        cmdCh: make(chan struct{}, 1),
         exceptionCh: exceptionCh,
         closeCh: closeCh,
         backendConn: NewBackendConn(lg.Named("be"), backendConnID, hsHandler, bcConfig, backendTLSConfig, username, password),
+        replayStats: replayStats,
     }
 }

@@ -59,22 +76,42 @@ func (c *conn) Run(ctx context.Context) {
         c.exceptionCh <- NewOtherException(err, c.connID)
         return
     }
-    for {
+    // context is canceled when the replay is interrupted.
+    // cmdCh is closed when the replay is finished.
+    finished := false
+    for !finished {
         select {
         case <-ctx.Done():
-            // ctx is canceled when the replay is finished
             return
-        case command := <-c.cmdCh:
-            if err := c.backendConn.ExecuteCmd(ctx, command.Payload); err != nil {
+        case _, ok := <-c.cmdCh:
+            if !ok {
+                finished = true
+            }
+        }
+        for ctx.Err() == nil {
+            c.cmdLock.Lock()
+            pendingCmds := c.cmdList.Len()
+            command := c.cmdList.Back()
+            if command != nil {
+                c.cmdList.Remove(command)
+            }
+            c.updatePendingCmds(pendingCmds)
+            c.cmdLock.Unlock()
+            if command == nil {
+                break
+            }
+            if err := c.backendConn.ExecuteCmd(ctx, command.Value.Payload); err != nil {
                 if pnet.IsDisconnectError(err) {
                     c.exceptionCh <- NewOtherException(err, c.connID)
+                    c.lg.Debug("backend connection disconnected", zap.Error(err))
                     return
                 }
-                if c.updateCmdForExecuteStmt(command) {
-                    c.exceptionCh <- NewFailException(err, command)
+                if c.updateCmdForExecuteStmt(command.Value) {
+                    c.exceptionCh <- NewFailException(err, command.Value)
                 }
             }
-            if command.Type == pnet.ComQuit {
+            c.replayStats.ReplayedCmds.Add(1)
+            if command.Value.Type == pnet.ComQuit {
                 return
             }
         }
@@ -99,23 +136,41 @@ func (c *conn) updateCmdForExecuteStmt(command *cmd.Command) bool {
     return true
 }

-// ExecuteCmd executes a command asynchronously.
+// ExecuteCmd executes a command asynchronously by adding it to the list.
+// Adding commands should never block because it may cause cycle wait, so we don't use channels.
+// Conn A: wait for the lock held by conn B, and then its list becomes full and blocks the replay
+// Conn B: wait for next command, but the replay is blocked, so the lock won't be released
 func (c *conn) ExecuteCmd(command *cmd.Command) {
+    c.cmdLock.Lock()
+    c.cmdList.PushFront(command)
+    pendingCmds := c.cmdList.Len()
+    c.updatePendingCmds(pendingCmds)
+    c.cmdLock.Unlock()
     select {
-    case c.cmdCh <- command:
-    case <-time.After(3 * time.Second):
-        // If the replay is slower, wait until it catches up, otherwise too many transactions are broken.
-        // But if it's blocked due to a bug, discard the command to avoid block the whole replay.
-        // If the discarded command is a COMMIT, let the next COMMIT finish the transaction.
-        select {
-        case c.exceptionCh <- NewOtherException(errors.New("too many pending commands, discard command"), c.connID):
-        default:
-            c.lg.Warn("too many pending errors, discard error")
-        }
+    case c.cmdCh <- struct{}{}:
+    default:
+    }
+}
+
+func (c *conn) Stop() {
+    close(c.cmdCh)
+}
+
+func (c *conn) updatePendingCmds(pendingCmds int) {
+    diff := pendingCmds - c.lastPendingCmds
+    c.lastPendingCmds = pendingCmds
+    if diff != 0 {
+        c.replayStats.PendingCmds.Add(int64(diff))
     }
 }

 func (c *conn) close() {
+    c.cmdLock.Lock()
+    if c.cmdList.Len() > 0 {
+        c.lg.Debug("backend connection closed while there are still pending commands", zap.Int("pending_cmds", c.cmdList.Len()))
+    }
+    c.updatePendingCmds(0)
+    c.cmdLock.Unlock()
     c.backendConn.Close()
     c.closeCh <- c.connID
 }
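
The per-connection queue moves from a bounded channel to a mutex-protected list plus a one-slot signal channel, which is what makes ExecuteCmd non-blocking and breaks the cycle wait described in the comment above. A self-contained sketch of that pattern in isolation (the queue type below is illustrative, not the conn implementation):

package sketch

import "sync"

// queue demonstrates the pattern: producers never block, the consumer drains in batches.
type queue[T any] struct {
    mu    sync.Mutex
    items []T
    sig   chan struct{} // one-slot wake-up channel; closed to stop the consumer
}

func newQueue[T any]() *queue[T] {
    return &queue[T]{sig: make(chan struct{}, 1)}
}

// push appends an item and wakes the consumer without ever blocking.
func (q *queue[T]) push(v T) {
    q.mu.Lock()
    q.items = append(q.items, v)
    q.mu.Unlock()
    select {
    case q.sig <- struct{}{}:
    default: // already signalled; the consumer will see the item anyway
    }
}

// run handles items until the signal channel is closed and the queue is drained.
func (q *queue[T]) run(handle func(T)) {
    drain := func() {
        for {
            q.mu.Lock()
            if len(q.items) == 0 {
                q.mu.Unlock()
                return
            }
            v := q.items[0]
            q.items = q.items[1:]
            q.mu.Unlock()
            handle(v)
        }
    }
    for range q.sig { // blocks between batches; exits when the channel is closed
        drain()
    }
    drain() // final drain after close, mirroring the "finished" path in Run
}

Compared with the old bounded channel, the cost is that pending commands are no longer capped per connection, which is exactly why the decoder now slows down when the shared PendingCmds count grows.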

pkg/sqlreplay/conn/conn_test.go

Lines changed: 23 additions & 5 deletions

@@ -39,7 +39,7 @@ func TestConnectError(t *testing.T) {
     var wg waitgroup.WaitGroup
     for i, test := range tests {
         exceptionCh, closeCh := make(chan Exception, 1), make(chan uint64, 1)
-        conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh)
+        conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh, &ReplayStats{})
         backendConn := &mockBackendConn{connErr: test.connErr, execErr: test.execErr}
         conn.backendConn = backendConn
         wg.RunWithRecover(func() {
@@ -60,23 +60,41 @@ func TestExecuteCmd(t *testing.T) {
     lg, _ := logger.CreateLoggerForTest(t)
     var wg waitgroup.WaitGroup
     exceptionCh, closeCh := make(chan Exception, 1), make(chan uint64, 1)
-    conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh)
+    stats := &ReplayStats{}
+    conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh, stats)
     backendConn := &mockBackendConn{}
     conn.backendConn = backendConn
     childCtx, cancel := context.WithCancel(context.Background())
     wg.RunWithRecover(func() {
         conn.Run(childCtx)
     }, nil, lg)
-    for i := 0; i < 100; i++ {
+    cmds := 1000
+    for i := 0; i < cmds; i++ {
         conn.ExecuteCmd(&cmd.Command{ConnID: 1, Type: pnet.ComFieldList})
     }
     require.Eventually(t, func() bool {
-        return backendConn.cmds.Load() == 100
+        return backendConn.cmds.Load() == int32(cmds)
     }, 3*time.Second, time.Millisecond)
+    require.EqualValues(t, cmds, stats.ReplayedCmds.Load())
+    require.EqualValues(t, 0, stats.PendingCmds.Load())
     cancel()
     wg.Wait()
 }

+func TestStopExecution(t *testing.T) {
+    lg, _ := logger.CreateLoggerForTest(t)
+    var wg waitgroup.WaitGroup
+    exceptionCh, closeCh := make(chan Exception, 1), make(chan uint64, 1)
+    conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh, &ReplayStats{})
+    conn.backendConn = &mockBackendConn{}
+    wg.RunWithRecover(func() {
+        conn.Run(context.Background())
+    }, nil, lg)
+    conn.ExecuteCmd(&cmd.Command{ConnID: 1, Type: pnet.ComFieldList})
+    conn.Stop()
+    wg.Wait()
+}
+
 func TestExecuteError(t *testing.T) {
     tests := []struct {
         prepare func(*mockBackendConn) []byte
@@ -106,7 +124,7 @@ func TestExecuteError(t *testing.T) {
     lg, _ := logger.CreateLoggerForTest(t)
     var wg waitgroup.WaitGroup
     exceptionCh, closeCh := make(chan Exception, 1), make(chan uint64, 1)
-    conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh)
+    conn := NewConn(lg, "u1", "", nil, nil, id.NewIDManager(), 1, &backend.BCConfig{}, exceptionCh, closeCh, &ReplayStats{})
     backendConn := &mockBackendConn{execErr: errors.New("mock error")}
     conn.backendConn = backendConn
     childCtx, cancel := context.WithCancel(context.Background())
