Skip to content

Commit ed64d5f

Browse files
authored
sqlreplay, api: add job start time to the report and API parameters (#724)
1 parent ad0e87c commit ed64d5f

File tree

16 files changed

+237
-84
lines changed

16 files changed

+237
-84
lines changed

pkg/server/api/server_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ type httpOpts struct {
2525
header map[string]string
2626
}
2727

28-
func createServer(t *testing.T) (*Server, func(t *testing.T, method string, path string, opts httpOpts, f func(*testing.T, *http.Response))) {
28+
type doHTTPFunc func(t *testing.T, method string, path string, opts httpOpts, f func(*testing.T, *http.Response))
29+
30+
func createServer(t *testing.T) (*Server, doHTTPFunc) {
2931
lg, _ := logger.CreateLoggerForTest(t)
3032
ready := atomic.NewBool(true)
3133
cfgmgr := mgrcfg.NewConfigManager()

pkg/server/api/traffic.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ func (h *Server) TrafficCapture(c *gin.Context) {
4545
}
4646
cfg.Compress = compress
4747
cfg.KeyFile = h.mgr.CfgMgr.GetConfig().Security.Encryption.KeyPath
48+
if startTimeStr := c.PostForm("start-time"); startTimeStr != "" {
49+
startTime, err := time.Parse(time.RFC3339, startTimeStr)
50+
if err != nil {
51+
c.String(http.StatusBadRequest, err.Error())
52+
return
53+
}
54+
cfg.StartTime = startTime
55+
} else {
56+
cfg.StartTime = time.Now()
57+
}
4858

4959
if err := h.mgr.ReplayJobMgr.StartCapture(cfg); err != nil {
5060
c.String(http.StatusInternalServerError, err.Error())
@@ -64,6 +74,16 @@ func (h *Server) TrafficReplay(c *gin.Context) {
6474
}
6575
cfg.Speed = speed
6676
}
77+
if startTimeStr := c.PostForm("start-time"); startTimeStr != "" {
78+
startTime, err := time.Parse(time.RFC3339, startTimeStr)
79+
if err != nil {
80+
c.String(http.StatusBadRequest, err.Error())
81+
return
82+
}
83+
cfg.StartTime = startTime
84+
} else {
85+
cfg.StartTime = time.Now()
86+
}
6787
cfg.Username = c.PostForm("username")
6888
cfg.Password = c.PostForm("password")
6989
cfg.ReadOnly = strings.EqualFold(c.PostForm("readonly"), "true")

pkg/server/api/traffic_test.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func TestTraffic(t *testing.T) {
2121
server, doHTTP := createServer(t)
2222
mgr := server.mgr.ReplayJobMgr.(*mockReplayJobManager)
2323

24+
// parse duration error
2425
doHTTP(t, http.MethodPost, "/api/traffic/capture", httpOpts{
2526
reader: cli.GetFormReader(map[string]string{"output": "/tmp", "duration": "10"}),
2627
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
@@ -30,6 +31,17 @@ func TestTraffic(t *testing.T) {
3031
require.NoError(t, err)
3132
require.Equal(t, "time: missing unit in duration \"10\"", string(all))
3233
})
34+
// parse start time error
35+
doHTTP(t, http.MethodPost, "/api/traffic/capture", httpOpts{
36+
reader: cli.GetFormReader(map[string]string{"output": "/tmp", "duration": "1h", "start-time": "2023-01-01"}),
37+
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
38+
}, func(t *testing.T, r *http.Response) {
39+
require.Equal(t, http.StatusBadRequest, r.StatusCode)
40+
all, err := io.ReadAll(r.Body)
41+
require.NoError(t, err)
42+
require.Contains(t, string(all), "cannot parse \"\" as \"T\"")
43+
})
44+
// capture succeeds
3345
doHTTP(t, http.MethodPost, "/api/traffic/capture", httpOpts{
3446
reader: cli.GetFormReader(map[string]string{"output": "/tmp", "duration": "1h"}),
3547
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
@@ -39,26 +51,33 @@ func TestTraffic(t *testing.T) {
3951
require.NoError(t, err)
4052
require.Equal(t, "capture started", string(all))
4153
require.Equal(t, "capture", mgr.curJob)
42-
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", Compress: true}, mgr.captureCfg)
54+
startTime := mgr.captureCfg.StartTime
55+
require.False(t, startTime.IsZero())
56+
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", Compress: true, StartTime: startTime}, mgr.captureCfg)
4357
})
58+
// cancel succeeds
4459
doHTTP(t, http.MethodPost, "/api/traffic/cancel", httpOpts{}, func(t *testing.T, r *http.Response) {
4560
require.Equal(t, http.StatusOK, r.StatusCode)
4661
all, err := io.ReadAll(r.Body)
4762
require.NoError(t, err)
4863
require.Equal(t, "stopped", string(all))
4964
require.Equal(t, "", mgr.curJob)
5065
})
66+
// capture succeeds with more options
5167
doHTTP(t, http.MethodPost, "/api/traffic/capture", httpOpts{
52-
reader: cli.GetFormReader(map[string]string{"output": "/tmp", "duration": "1h", "encrypt-method": "aes256-ctr", "compress": "false"}),
68+
reader: cli.GetFormReader(map[string]string{"output": "/tmp", "duration": "1h", "encrypt-method": "aes256-ctr",
69+
"compress": "false", "start-time": time.Now().Format(time.RFC3339)}),
5370
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
5471
}, func(t *testing.T, r *http.Response) {
5572
require.Equal(t, http.StatusOK, r.StatusCode)
5673
all, err := io.ReadAll(r.Body)
5774
require.NoError(t, err)
5875
require.Equal(t, "capture started", string(all))
5976
require.Equal(t, "capture", mgr.curJob)
60-
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", EncryptMethod: "aes256-ctr", Compress: false}, mgr.captureCfg)
77+
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", EncryptMethod: "aes256-ctr", Compress: false,
78+
StartTime: mgr.captureCfg.StartTime}, mgr.captureCfg)
6179
})
80+
// job is running error
6281
doHTTP(t, http.MethodPost, "/api/traffic/replay", httpOpts{
6382
reader: cli.GetFormReader(map[string]string{"input": "/tmp"}),
6483
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
@@ -68,9 +87,8 @@ func TestTraffic(t *testing.T) {
6887
require.NoError(t, err)
6988
require.Equal(t, "job is running", string(all))
7089
})
71-
doHTTP(t, http.MethodPost, "/api/traffic/cancel", httpOpts{}, func(t *testing.T, r *http.Response) {
72-
require.Equal(t, http.StatusOK, r.StatusCode)
73-
})
90+
cancelJob(t, doHTTP)
91+
// parse speed error
7492
doHTTP(t, http.MethodPost, "/api/traffic/replay", httpOpts{
7593
reader: cli.GetFormReader(map[string]string{"input": "/tmp", "speed": "abc"}),
7694
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
@@ -80,6 +98,7 @@ func TestTraffic(t *testing.T) {
8098
require.NoError(t, err)
8199
require.Equal(t, "strconv.ParseFloat: parsing \"abc\": invalid syntax", string(all))
82100
})
101+
// replay succeeds
83102
doHTTP(t, http.MethodPost, "/api/traffic/replay", httpOpts{
84103
reader: cli.GetFormReader(map[string]string{"input": "/tmp", "speed": "2.0", "username": "u1", "password": "p1"}),
85104
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
@@ -89,14 +108,21 @@ func TestTraffic(t *testing.T) {
89108
require.NoError(t, err)
90109
require.Equal(t, "replay started", string(all))
91110
require.Equal(t, "replay", mgr.curJob)
92-
require.Equal(t, replay.ReplayConfig{Input: "/tmp", Username: "u1", Password: "p1", Speed: 2.0}, mgr.replayCfg)
111+
startTime := mgr.replayCfg.StartTime
112+
require.False(t, startTime.IsZero())
113+
require.Equal(t, replay.ReplayConfig{Input: "/tmp", Username: "u1", Password: "p1", Speed: 2.0, StartTime: startTime}, mgr.replayCfg)
93114
})
115+
// show succeeds
94116
doHTTP(t, http.MethodGet, "/api/traffic/show", httpOpts{}, func(t *testing.T, r *http.Response) {
95117
require.Equal(t, http.StatusOK, r.StatusCode)
96118
all, err := io.ReadAll(r.Body)
97119
require.NoError(t, err)
98120
require.Equal(t, "replay", string(all))
99121
})
122+
cancelJob(t, doHTTP)
123+
}
124+
125+
func cancelJob(t *testing.T, doHTTP doHTTPFunc) {
100126
doHTTP(t, http.MethodPost, "/api/traffic/cancel", httpOpts{}, func(t *testing.T, r *http.Response) {
101127
require.Equal(t, http.StatusOK, r.StatusCode)
102128
})

pkg/sqlreplay/capture/capture.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,12 @@ type Capture interface {
5151
}
5252

5353
type CaptureConfig struct {
54-
Output string
55-
EncryptMethod string
56-
KeyFile string
54+
Output string
55+
EncryptMethod string
56+
KeyFile string
57+
// It's specified when executing with the statement `TRAFFIC CAPTURE` so that all TiProxy instances
58+
// use the same start time and the time acts as the job ID.
59+
StartTime time.Time
5760
Duration time.Duration
5861
Compress bool
5962
cmdLogger store.Writer
@@ -82,6 +85,15 @@ func (cfg *CaptureConfig) Validate() error {
8285
if cfg.Duration == 0 {
8386
return errors.New("duration is required")
8487
}
88+
// Maybe there's a time bias between TiDB and TiProxy, so add one minute.
89+
now := time.Now()
90+
if cfg.StartTime.IsZero() {
91+
return errors.New("start time is not specified")
92+
} else if now.Add(time.Minute).Before(cfg.StartTime) {
93+
return errors.New("start time should not be in the future")
94+
} else if cfg.StartTime.Add(cfg.Duration).Before(now) {
95+
return errors.New("start time should not be in the past")
96+
}
8597
if cfg.bufferCap == 0 {
8698
cfg.bufferCap = bufferCap
8799
}
@@ -133,7 +145,7 @@ func (c *capture) Start(cfg CaptureConfig) error {
133145
return errors.Errorf("traffic capture is running, start time: %s", c.startTime.String())
134146
}
135147
c.cfg = cfg
136-
c.startTime = time.Now()
148+
c.startTime = cfg.StartTime
137149
c.endTime = time.Time{}
138150
c.progress = 0
139151
c.capturedCmds = 0

pkg/sqlreplay/capture/capture_test.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func TestStartAndStop(t *testing.T) {
3232
Output: dir,
3333
Duration: 10 * time.Second,
3434
cmdLogger: writer,
35+
StartTime: time.Now(),
3536
}
3637

3738
// start capture and the traffic should be outputted
@@ -72,6 +73,7 @@ func TestConcurrency(t *testing.T) {
7273
cfg := CaptureConfig{
7374
Output: dir,
7475
Duration: 10 * time.Second,
76+
StartTime: time.Now(),
7577
bufferCap: 12 * 1024,
7678
flushThreshold: 8 * 1024,
7779
cmdLogger: writer,
@@ -125,19 +127,37 @@ func TestConcurrency(t *testing.T) {
125127
}
126128

127129
func TestCaptureCfgError(t *testing.T) {
130+
now := time.Now()
128131
dir := t.TempDir()
129132
path := filepath.Join(dir, "traffic.log")
130133
require.NoError(t, os.WriteFile(path, []byte{}, 0666))
131134
cfgs := []CaptureConfig{
132135
{
133-
Duration: 10 * time.Second,
136+
Duration: 10 * time.Second,
137+
StartTime: now,
134138
},
135139
{
136-
Output: dir,
140+
Output: dir,
141+
StartTime: now,
137142
},
138143
{
139144
Duration: 10 * time.Second,
140-
Output: path,
145+
Output: dir,
146+
},
147+
{
148+
Duration: 10 * time.Second,
149+
Output: path,
150+
StartTime: now.Add(time.Hour),
151+
},
152+
{
153+
Duration: 10 * time.Second,
154+
Output: path,
155+
StartTime: now.Add(-time.Hour),
156+
},
157+
{
158+
Duration: 10 * time.Second,
159+
Output: path,
160+
StartTime: now,
141161
},
142162
}
143163

@@ -147,8 +167,9 @@ func TestCaptureCfgError(t *testing.T) {
147167
}
148168

149169
cfg := CaptureConfig{
150-
Output: dir,
151-
Duration: 10 * time.Second,
170+
Output: dir,
171+
Duration: 10 * time.Second,
172+
StartTime: now,
152173
}
153174
require.NoError(t, cfg.Validate())
154175
require.Equal(t, bufferCap, cfg.bufferCap)
@@ -166,6 +187,7 @@ func TestProgress(t *testing.T) {
166187
Output: t.TempDir(),
167188
Duration: 10 * time.Second,
168189
cmdLogger: writer,
190+
StartTime: time.Now(),
169191
}
170192
setStartTime := func(t time.Time) {
171193
cpt.Lock()
@@ -212,6 +234,7 @@ func TestInitConn(t *testing.T) {
212234
Output: t.TempDir(),
213235
Duration: 10 * time.Second,
214236
cmdLogger: writer,
237+
StartTime: time.Now(),
215238
}
216239

217240
require.NoError(t, cpt.Start(cfg))
@@ -246,6 +269,7 @@ func TestQuit(t *testing.T) {
246269
Output: t.TempDir(),
247270
Duration: 10 * time.Second,
248271
cmdLogger: writer,
272+
StartTime: time.Now(),
249273
}
250274

251275
require.NoError(t, cpt.Start(cfg))
@@ -299,8 +323,9 @@ func TestFilterCmds(t *testing.T) {
299323

300324
dir := t.TempDir()
301325
cfg := CaptureConfig{
302-
Output: dir,
303-
Duration: 10 * time.Second,
326+
Output: dir,
327+
Duration: 10 * time.Second,
328+
StartTime: time.Now(),
304329
}
305330
for i, test := range tests {
306331
cpt := NewCapture(zap.NewNop())

pkg/sqlreplay/manager/job.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ func (job *job) SetProgress(progress float64, endTime time.Time, done bool, err
6565

6666
func (job *job) getJob4Marshal() *job4Marshal {
6767
jm := &job4Marshal{
68-
StartTime: job.startTime.String(),
68+
StartTime: job.startTime.Format(time.RFC3339),
6969
Progress: fmt.Sprintf("%d%%", int(job.progress*100)),
7070
}
7171
if !job.endTime.IsZero() {
72-
jm.EndTime = job.endTime.String()
72+
jm.EndTime = job.endTime.Format(time.RFC3339)
7373
}
7474
if job.err != nil {
7575
jm.Status = "canceled"

pkg/sqlreplay/manager/job_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func TestMarshalJob(t *testing.T) {
127127
Duration: 2 * time.Hour,
128128
},
129129
},
130-
marshal: `{"type":"capture","status":"canceled","start_time":"2020-01-01 00:00:00 +0000 UTC","end_time":"2020-01-01 02:01:01 +0000 UTC","duration":"2h0m0s","output":"/tmp/traffic","progress":"50%","error":"mock error"}`,
130+
marshal: `{"type":"capture","status":"canceled","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","duration":"2h0m0s","output":"/tmp/traffic","progress":"50%","error":"mock error"}`,
131131
},
132132
{
133133
job: &replayJob{
@@ -140,7 +140,7 @@ func TestMarshalJob(t *testing.T) {
140140
Username: "root",
141141
},
142142
},
143-
marshal: `{"type":"replay","status":"running","start_time":"2020-01-01 00:00:00 +0000 UTC","input":"/tmp/traffic","username":"root","speed":1,"progress":"0%"}`,
143+
marshal: `{"type":"replay","status":"running","start_time":"2020-01-01T00:00:00Z","input":"/tmp/traffic","username":"root","speed":1,"progress":"0%"}`,
144144
},
145145
{
146146
job: &replayJob{
@@ -156,7 +156,7 @@ func TestMarshalJob(t *testing.T) {
156156
Speed: 0.5,
157157
},
158158
},
159-
marshal: `{"type":"replay","status":"done","start_time":"2020-01-01 00:00:00 +0000 UTC","end_time":"2020-01-01 02:01:01 +0000 UTC","input":"/tmp/traffic","username":"root","speed":0.5,"progress":"100%"}`,
159+
marshal: `{"type":"replay","status":"done","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","input":"/tmp/traffic","username":"root","speed":0.5,"progress":"100%"}`,
160160
},
161161
}
162162

pkg/sqlreplay/manager/manager.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package manager
66
import (
77
"crypto/tls"
88
"encoding/json"
9-
"time"
109

1110
"github.com/pingcap/tiproxy/lib/config"
1211
"github.com/pingcap/tiproxy/lib/util/errors"
@@ -98,7 +97,8 @@ func (jm *jobManager) StartCapture(cfg capture.CaptureConfig) error {
9897
}
9998
newJob := &captureJob{
10099
job: job{
101-
startTime: time.Now(),
100+
// cfg.StartTime may act as the job ID in a TiProxy cluster.
101+
startTime: cfg.StartTime,
102102
},
103103
cfg: cfg,
104104
}
@@ -126,7 +126,8 @@ func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
126126
}
127127
newJob := &replayJob{
128128
job: job{
129-
startTime: time.Now(),
129+
// cfg.StartTime may act as the job ID in a TiProxy cluster.
130+
startTime: cfg.StartTime,
130131
},
131132
cfg: cfg,
132133
}

pkg/sqlreplay/manager/manager_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ func TestMarshalJobHistory(t *testing.T) {
106106
{
107107
"type": "capture",
108108
"status": "canceled",
109-
"start_time": "2020-01-01 00:00:00 +0000 UTC",
110-
"end_time": "2020-01-01 02:01:01 +0000 UTC",
109+
"start_time": "2020-01-01T00:00:00Z",
110+
"end_time": "2020-01-01T02:01:01Z",
111111
"duration": "2h0m0s",
112112
"output": "/tmp/traffic",
113113
"progress": "50%",
@@ -116,7 +116,7 @@ func TestMarshalJobHistory(t *testing.T) {
116116
{
117117
"type": "replay",
118118
"status": "running",
119-
"start_time": "2020-01-01 00:00:00 +0000 UTC",
119+
"start_time": "2020-01-01T00:00:00Z",
120120
"input": "/tmp/traffic",
121121
"username": "root",
122122
"speed": 1,
@@ -125,8 +125,8 @@ func TestMarshalJobHistory(t *testing.T) {
125125
{
126126
"type": "replay",
127127
"status": "done",
128-
"start_time": "2020-01-01 00:00:00 +0000 UTC",
129-
"end_time": "2020-01-01 02:01:01 +0000 UTC",
128+
"start_time": "2020-01-01T00:00:00Z",
129+
"end_time": "2020-01-01T02:01:01Z",
130130
"input": "/tmp/traffic",
131131
"username": "root",
132132
"speed": 0.5,

0 commit comments

Comments
 (0)