Skip to content

Commit af46321

Browse files
authored
sqlreplay, api: add job parameters to the output of show traffic jobs (#749)
1 parent d3195db commit af46321

File tree

7 files changed

+74
-54
lines changed

7 files changed

+74
-54
lines changed

pkg/server/api/traffic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (h *Server) TrafficCapture(c *gin.Context) {
4040
}
4141
cfg.Duration = duration
4242
}
43-
cfg.EncryptMethod = c.PostForm("encrypt-method")
43+
cfg.EncryptionMethod = c.PostForm("encrypt-method")
4444

4545
compress := true
4646
if compressStr := c.PostForm("compress"); compressStr != "" {

pkg/server/api/traffic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestTraffic(t *testing.T) {
7474
require.NoError(t, err)
7575
require.Equal(t, "capture started", string(all))
7676
require.Equal(t, "capture", mgr.curJob)
77-
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", EncryptMethod: "aes256-ctr", Compress: false,
77+
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", EncryptionMethod: "aes256-ctr", Compress: false,
7878
StartTime: mgr.captureCfg.StartTime}, mgr.captureCfg)
7979
})
8080
// job is running error

pkg/sqlreplay/capture/capture.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ type Capture interface {
5353
}
5454

5555
type CaptureConfig struct {
56-
Output string
57-
EncryptMethod string
58-
KeyFile string
56+
Output string
57+
EncryptionMethod string
58+
KeyFile string
5959
// It's specified when executing with the statement `TRAFFIC CAPTURE` so that all TiProxy instances
6060
// use the same start time and the time acts as the job ID.
6161
StartTime time.Time
@@ -245,7 +245,7 @@ func (c *capture) flushBuffer(bufCh <-chan *bytes.Buffer) {
245245
var err error
246246
cmdLogger, err = store.NewWriter(c.lg.Named("writer"), c.storage, store.WriterCfg{
247247
Dir: c.cfg.Output,
248-
EncryptMethod: c.cfg.EncryptMethod,
248+
EncryptMethod: c.cfg.EncryptionMethod,
249249
KeyFile: c.cfg.KeyFile,
250250
Compress: c.cfg.Compress,
251251
})
@@ -370,7 +370,7 @@ func (c *capture) putCommand(command *cmd.Command) bool {
370370
}
371371

372372
func (c *capture) writeMeta(storage storage.ExternalStorage, duration time.Duration, cmds, filteredCmds uint64) {
373-
meta := store.NewMeta(duration, cmds, filteredCmds, c.cfg.EncryptMethod)
373+
meta := store.NewMeta(duration, cmds, filteredCmds, c.cfg.EncryptionMethod)
374374
if err := meta.Write(storage); err != nil {
375375
c.lg.Error("failed to write meta", zap.Error(err))
376376
}

pkg/sqlreplay/manager/job.go

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,12 @@ type job struct {
3838
}
3939

4040
type job4Marshal struct {
41-
Type string `json:"type"`
42-
Status string `json:"status"`
43-
StartTime string `json:"start_time"`
44-
EndTime string `json:"end_time,omitempty"`
45-
Duration string `json:"duration,omitempty"`
46-
Output string `json:"output,omitempty"`
47-
Input string `json:"input,omitempty"`
48-
Username string `json:"username,omitempty"`
49-
Speed float64 `json:"speed,omitempty"`
50-
Progress string `json:"progress"`
51-
Err string `json:"error,omitempty"`
41+
Type string `json:"type"`
42+
Status string `json:"status"`
43+
StartTime string `json:"start_time"`
44+
EndTime string `json:"end_time,omitempty"`
45+
Progress string `json:"progress"`
46+
Err string `json:"error,omitempty"`
5247
}
5348

5449
func (job *job) IsRunning() bool {
@@ -90,16 +85,29 @@ type captureJob struct {
9085
cfg capture.CaptureConfig
9186
}
9287

88+
type captureJob4Marshal struct {
89+
job4Marshal
90+
Output string `json:"output,omitempty"`
91+
Duration string `json:"duration,omitempty"`
92+
Compress bool `json:"compress,omitempty"`
93+
EncryptionMethod string `json:"encryption-method,omitempty"`
94+
}
95+
9396
func (job *captureJob) Type() JobType {
9497
return Capture
9598
}
9699

97100
func (job *captureJob) MarshalJSON() ([]byte, error) {
98101
job4Marshal := job.getJob4Marshal()
99102
job4Marshal.Type = "capture"
100-
job4Marshal.Output = ast.RedactURL(job.cfg.Output)
101-
job4Marshal.Duration = job.cfg.Duration.String()
102-
return json.Marshal(job4Marshal)
103+
c := captureJob4Marshal{
104+
job4Marshal: *job4Marshal,
105+
Output: ast.RedactURL(job.cfg.Output),
106+
Duration: job.cfg.Duration.String(),
107+
Compress: job.cfg.Compress,
108+
EncryptionMethod: job.cfg.EncryptionMethod,
109+
}
110+
return json.Marshal(c)
103111
}
104112

105113
func (job *captureJob) String() string {
@@ -117,20 +125,29 @@ type replayJob struct {
117125
cfg replay.ReplayConfig
118126
}
119127

128+
type replayJob4Marshal struct {
129+
job4Marshal
130+
Input string `json:"input,omitempty"`
131+
Username string `json:"username,omitempty"`
132+
Speed float64 `json:"speed,omitempty"`
133+
ReadOnly bool `json:"readonly,omitempty"`
134+
}
135+
120136
func (job *replayJob) Type() JobType {
121137
return Replay
122138
}
123139

124140
func (job *replayJob) MarshalJSON() ([]byte, error) {
125141
job4Marshal := job.getJob4Marshal()
126142
job4Marshal.Type = "replay"
127-
job4Marshal.Input = ast.RedactURL(job.cfg.Input)
128-
job4Marshal.Username = job.cfg.Username
129-
job4Marshal.Speed = job.cfg.Speed
130-
if job4Marshal.Speed == 0 {
131-
job4Marshal.Speed = 1
143+
r := replayJob4Marshal{
144+
job4Marshal: *job4Marshal,
145+
Input: ast.RedactURL(job.cfg.Input),
146+
Username: job.cfg.Username,
147+
Speed: job.cfg.Speed,
148+
ReadOnly: job.cfg.ReadOnly,
132149
}
133-
return json.Marshal(job4Marshal)
150+
return json.Marshal(r)
134151
}
135152

136153
func (job *replayJob) String() string {

pkg/sqlreplay/manager/job_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func TestMarshalJob(t *testing.T) {
127127
Duration: 2 * time.Hour,
128128
},
129129
},
130-
marshal: `{"type":"capture","status":"canceled","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","duration":"2h0m0s","output":"/tmp/traffic","progress":"50%","error":"mock error"}`,
130+
marshal: `{"type":"capture","status":"canceled","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","progress":"50%","error":"mock error","output":"/tmp/traffic","duration":"2h0m0s"}`,
131131
},
132132
{
133133
job: &captureJob{
@@ -138,11 +138,13 @@ func TestMarshalJob(t *testing.T) {
138138
done: true,
139139
},
140140
cfg: capture.CaptureConfig{
141-
Output: "s3://bucket/prefix?access-key=abcdefghi&secret-access-key=123&force-path-style=true",
142-
Duration: 2 * time.Hour,
141+
Output: "s3://bucket/prefix?access-key=abcdefghi&secret-access-key=123&force-path-style=true",
142+
Duration: 2 * time.Hour,
143+
Compress: true,
144+
EncryptionMethod: "aes256-ctr",
143145
},
144146
},
145-
marshal: `{"type":"capture","status":"done","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","duration":"2h0m0s","output":"s3://bucket/prefix?access-key=xxxxxx\u0026force-path-style=true\u0026secret-access-key=xxxxxx","progress":"50%"}`,
147+
marshal: `{"type":"capture","status":"done","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","progress":"50%","output":"s3://bucket/prefix?access-key=xxxxxx\u0026force-path-style=true\u0026secret-access-key=xxxxxx","duration":"2h0m0s","compress":true,"encryption-method":"aes256-ctr"}`,
146148
},
147149
{
148150
job: &replayJob{
@@ -155,7 +157,7 @@ func TestMarshalJob(t *testing.T) {
155157
Username: "root",
156158
},
157159
},
158-
marshal: `{"type":"replay","status":"running","start_time":"2020-01-01T00:00:00Z","input":"/tmp/traffic","username":"root","speed":1,"progress":"0%"}`,
160+
marshal: `{"type":"replay","status":"running","start_time":"2020-01-01T00:00:00Z","progress":"0%","input":"/tmp/traffic","username":"root"}`,
159161
},
160162
{
161163
job: &replayJob{
@@ -168,10 +170,12 @@ func TestMarshalJob(t *testing.T) {
168170
cfg: replay.ReplayConfig{
169171
Input: "/tmp/traffic",
170172
Username: "root",
173+
Password: "123456",
171174
Speed: 0.5,
175+
ReadOnly: true,
172176
},
173177
},
174-
marshal: `{"type":"replay","status":"done","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","input":"/tmp/traffic","username":"root","speed":0.5,"progress":"100%"}`,
178+
marshal: `{"type":"replay","status":"done","start_time":"2020-01-01T00:00:00Z","end_time":"2020-01-01T02:01:01Z","progress":"100%","input":"/tmp/traffic","username":"root","speed":0.5,"readonly":true}`,
175179
},
176180
{
177181
job: &replayJob{
@@ -184,7 +188,7 @@ func TestMarshalJob(t *testing.T) {
184188
Username: "root",
185189
},
186190
},
187-
marshal: `{"type":"replay","status":"running","start_time":"2020-01-01T00:00:00Z","input":"s3://bucket/prefix?access-key=xxxxxx\u0026force-path-style=true\u0026secret-access-key=xxxxxx","username":"root","speed":1,"progress":"0%"}`,
191+
marshal: `{"type":"replay","status":"running","start_time":"2020-01-01T00:00:00Z","progress":"0%","input":"s3://bucket/prefix?access-key=xxxxxx\u0026force-path-style=true\u0026secret-access-key=xxxxxx","username":"root"}`,
188192
},
189193
}
190194

pkg/sqlreplay/manager/manager.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,17 @@ func (jm *jobManager) StartCapture(cfg capture.CaptureConfig) error {
9595
if running != nil {
9696
return errors.Errorf("a job is running: %s", running.String())
9797
}
98-
if err := jm.capture.Start(cfg); err != nil {
99-
jm.lg.Warn("start capture failed", zap.Error(err))
100-
return errors.Wrapf(err, "start capture failed")
101-
}
10298
newJob := &captureJob{
10399
job: job{
104100
// cfg.StartTime may act as the job ID in a TiProxy cluster.
105101
startTime: cfg.StartTime,
106102
},
107103
cfg: cfg,
108104
}
105+
if err := jm.capture.Start(cfg); err != nil {
106+
jm.lg.Warn("start capture failed", zap.String("job", newJob.String()), zap.Error(err))
107+
return errors.Wrapf(err, "start capture failed")
108+
}
109109
jm.lg.Info("start capture", zap.String("job", newJob.String()))
110110
jm.addToHistory(newJob)
111111
return nil
@@ -116,6 +116,13 @@ func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
116116
if running != nil {
117117
return errors.Errorf("a job is running: %s", running.String())
118118
}
119+
newJob := &replayJob{
120+
job: job{
121+
// cfg.StartTime may act as the job ID in a TiProxy cluster.
122+
startTime: cfg.StartTime,
123+
},
124+
cfg: cfg,
125+
}
119126
// TODO: support update configs online
120127
err := jm.replay.Start(cfg, jm.certManager.SQLTLS(), jm.hsHandler, &backend.BCConfig{
121128
ProxyProtocol: jm.cfg.Proxy.ProxyProtocol != "",
@@ -125,16 +132,9 @@ func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
125132
ConnBufferSize: jm.cfg.Proxy.ConnBufferSize,
126133
})
127134
if err != nil {
128-
jm.lg.Warn("start replay failed", zap.Error(err))
135+
jm.lg.Warn("start replay failed", zap.String("job", newJob.String()), zap.Error(err))
129136
return errors.Wrapf(err, "start replay failed")
130137
}
131-
newJob := &replayJob{
132-
job: job{
133-
// cfg.StartTime may act as the job ID in a TiProxy cluster.
134-
startTime: cfg.StartTime,
135-
},
136-
cfg: cfg,
137-
}
138138
jm.lg.Info("start replay", zap.String("job", newJob.String()))
139139
jm.addToHistory(newJob)
140140
return nil

pkg/sqlreplay/manager/manager_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,29 +110,28 @@ func TestMarshalJobHistory(t *testing.T) {
110110
"status": "canceled",
111111
"start_time": "2020-01-01T00:00:00Z",
112112
"end_time": "2020-01-01T02:01:01Z",
113-
"duration": "2h0m0s",
114-
"output": "/tmp/traffic",
115113
"progress": "50%",
116-
"error": "mock error"
114+
"error": "mock error",
115+
"output": "/tmp/traffic",
116+
"duration": "2h0m0s"
117117
},
118118
{
119119
"type": "replay",
120120
"status": "running",
121121
"start_time": "2020-01-01T00:00:00Z",
122+
"progress": "0%",
122123
"input": "/tmp/traffic",
123-
"username": "root",
124-
"speed": 1,
125-
"progress": "0%"
124+
"username": "root"
126125
},
127126
{
128127
"type": "replay",
129128
"status": "done",
130129
"start_time": "2020-01-01T00:00:00Z",
131130
"end_time": "2020-01-01T02:01:01Z",
131+
"progress": "100%",
132132
"input": "/tmp/traffic",
133133
"username": "root",
134-
"speed": 0.5,
135-
"progress": "100%"
134+
"speed": 0.5
136135
}
137136
]`, mgr.Jobs())
138137
}

0 commit comments

Comments
 (0)