Skip to content

Commit 57a854f

Browse files
authored
manager, replay: show the history of traffic jobs (#686)
1 parent 41f35d7 commit 57a854f

File tree

10 files changed

+262
-60
lines changed

10 files changed

+262
-60
lines changed

pkg/proxy/backend/mock_proxy_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ func (mc *mockCapture) Capture(packet []byte, startTime time.Time, connID uint64
145145
}
146146
}
147147

148-
func (mc *mockCapture) Progress() (float64, error) {
149-
return 0, nil
148+
func (mc *mockCapture) Progress() (float64, time.Time, error) {
149+
return 0, time.Time{}, nil
150150
}
151151

152152
func (mc *mockCapture) Close() {

pkg/sqlreplay/capture/capture.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type Capture interface {
4444
// Capture captures traffic
4545
Capture(packet []byte, startTime time.Time, connID uint64, initSession func() (string, error))
4646
// Progress returns the progress of the capture job
47-
Progress() (float64, error)
47+
Progress() (float64, time.Time, error)
4848
// Close closes the capture
4949
Close()
5050
}
@@ -340,17 +340,17 @@ func (c *capture) writeMeta(duration time.Duration, cmds uint64) {
340340
}
341341
}
342342

343-
func (c *capture) Progress() (float64, error) {
343+
func (c *capture) Progress() (float64, time.Time, error) {
344344
c.Lock()
345345
defer c.Unlock()
346346
if c.status == statusIdle || c.cfg.Duration == 0 {
347-
return c.progress, c.err
347+
return c.progress, c.endTime, c.err
348348
}
349349
progress := float64(time.Since(c.startTime)) / float64(c.cfg.Duration)
350350
if progress > 1 {
351351
progress = 1
352352
}
353-
return progress, c.err
353+
return progress, c.endTime, c.err
354354
}
355355

356356
// stopNoLock must be called after holding a lock.

pkg/sqlreplay/capture/capture_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,20 +170,20 @@ func TestProgress(t *testing.T) {
170170

171171
now := time.Now()
172172
require.NoError(t, cpt.Start(cfg))
173-
progress, err := cpt.Progress()
173+
progress, _, err := cpt.Progress()
174174
require.NoError(t, err)
175175
require.Less(t, progress, 0.3)
176176

177177
setStartTime(now.Add(-5 * time.Second))
178-
progress, err = cpt.Progress()
178+
progress, _, err = cpt.Progress()
179179
require.NoError(t, err)
180180
require.GreaterOrEqual(t, progress, 0.5)
181181

182182
packet := append([]byte{pnet.ComQuery.Byte()}, []byte("select 1")...)
183183
cpt.Capture(packet, time.Now(), 100, mockInitSession)
184184
cpt.Stop(errors.Errorf("mock error"))
185185
cpt.wg.Wait()
186-
progress, err = cpt.Progress()
186+
progress, _, err = cpt.Progress()
187187
require.ErrorContains(t, err, "mock error")
188188
require.GreaterOrEqual(t, progress, 0.5)
189189
require.Less(t, progress, 1.0)

pkg/sqlreplay/manager/job.go

Lines changed: 83 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ package manager
55

66
import (
77
"encoding/json"
8+
"fmt"
89
"time"
910

11+
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
12+
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
1013
"github.com/siddontang/go/hack"
1114
)
1215

@@ -20,55 +23,117 @@ const (
2023
type Job interface {
2124
Type() jobType
2225
String() string
23-
SetProgress(progress float64, err error)
26+
MarshalJSON() ([]byte, error)
27+
SetProgress(progress float64, endTime time.Time, err error)
2428
IsRunning() bool
2529
}
2630

2731
type job struct {
28-
Status string
29-
StartTime time.Time
30-
Duration time.Duration
31-
Progress float64
32-
// TODO: error can not be marshaled.
33-
Error error
32+
startTime time.Time
33+
endTime time.Time
34+
progress float64
35+
err error
36+
}
37+
38+
type job4Marshal struct {
39+
Type string `json:"type"`
40+
Status string `json:"status"`
41+
StartTime string `json:"start_time"`
42+
EndTime string `json:"end_time,omitempty"`
43+
Duration string `json:"duration,omitempty"`
44+
Output string `json:"output,omitempty"`
45+
Input string `json:"input,omitempty"`
46+
Username string `json:"username,omitempty"`
47+
Speed float64 `json:"speed,omitempty"`
48+
Progress string `json:"progress"`
49+
Err string `json:"error,omitempty"`
3450
}
3551

3652
func (job *job) IsRunning() bool {
37-
return job.Error == nil && job.Progress < 1
53+
return job.err == nil && job.progress < 1
3854
}
3955

40-
// TODO: refine the output
41-
func (job *job) String() string {
42-
b, err := json.Marshal(job)
43-
if err != nil {
44-
return err.Error()
56+
func (job *job) SetProgress(progress float64, endTime time.Time, err error) {
57+
if progress > job.progress {
58+
job.progress = progress
4559
}
46-
return hack.String(b)
60+
job.endTime = endTime
61+
job.err = err
4762
}
4863

49-
func (job *job) SetProgress(progress float64, err error) {
50-
if progress > job.Progress {
51-
job.Progress = progress
64+
func (job *job) getJob4Marshal() *job4Marshal {
65+
jm := &job4Marshal{
66+
StartTime: job.startTime.String(),
67+
Progress: fmt.Sprintf("%d%%", int(job.progress*100)),
68+
}
69+
if !job.endTime.IsZero() {
70+
jm.EndTime = job.endTime.String()
5271
}
53-
job.Error = err
72+
if job.err != nil {
73+
jm.Status = "canceled"
74+
jm.Err = job.err.Error()
75+
} else if job.progress >= 1.0 {
76+
jm.Status = "done"
77+
} else {
78+
jm.Status = "running"
79+
}
80+
return jm
5481
}
5582

5683
var _ Job = (*captureJob)(nil)
5784

5885
type captureJob struct {
5986
job
87+
cfg capture.CaptureConfig
6088
}
6189

6290
func (job *captureJob) Type() jobType {
6391
return Capture
6492
}
6593

94+
func (job *captureJob) MarshalJSON() ([]byte, error) {
95+
job4Marshal := job.getJob4Marshal()
96+
job4Marshal.Type = "capture"
97+
job4Marshal.Output = job.cfg.Output
98+
job4Marshal.Duration = job.cfg.Duration.String()
99+
return json.Marshal(job4Marshal)
100+
}
101+
102+
func (job *captureJob) String() string {
103+
b, err := json.Marshal(job)
104+
if err != nil {
105+
return ""
106+
}
107+
return hack.String(b)
108+
}
109+
66110
var _ Job = (*replayJob)(nil)
67111

68112
type replayJob struct {
69113
job
114+
cfg replay.ReplayConfig
70115
}
71116

72117
func (job *replayJob) Type() jobType {
73118
return Replay
74119
}
120+
121+
func (job *replayJob) MarshalJSON() ([]byte, error) {
122+
job4Marshal := job.getJob4Marshal()
123+
job4Marshal.Type = "replay"
124+
job4Marshal.Input = job.cfg.Input
125+
job4Marshal.Username = job.cfg.Username
126+
job4Marshal.Speed = job.cfg.Speed
127+
if job4Marshal.Speed == 0 {
128+
job4Marshal.Speed = 1
129+
}
130+
return json.Marshal(job4Marshal)
131+
}
132+
133+
func (job *replayJob) String() string {
134+
b, err := json.Marshal(job)
135+
if err != nil {
136+
return ""
137+
}
138+
return hack.String(b)
139+
}

pkg/sqlreplay/manager/job_test.go

Lines changed: 74 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"time"
99

1010
"github.com/pingcap/tiproxy/lib/util/errors"
11+
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
12+
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
1113
"github.com/stretchr/testify/require"
1214
)
1315

@@ -20,8 +22,7 @@ func TestIsRunning(t *testing.T) {
2022
{
2123
job: &captureJob{
2224
job: job{
23-
StartTime: time.Now(),
24-
Duration: 10 * time.Second,
25+
startTime: time.Now(),
2526
},
2627
},
2728
tp: Capture,
@@ -30,8 +31,7 @@ func TestIsRunning(t *testing.T) {
3031
{
3132
job: &replayJob{
3233
job: job{
33-
StartTime: time.Now(),
34-
Duration: 10 * time.Second,
34+
startTime: time.Now(),
3535
},
3636
},
3737
tp: Replay,
@@ -40,10 +40,9 @@ func TestIsRunning(t *testing.T) {
4040
{
4141
job: &captureJob{
4242
job: job{
43-
StartTime: time.Now().Add(-5 * time.Second),
44-
Duration: 10 * time.Second,
45-
Progress: 0.5,
46-
Error: errors.New("stopped manually"),
43+
startTime: time.Now().Add(-5 * time.Second),
44+
progress: 0.5,
45+
err: errors.New("stopped manually"),
4746
},
4847
},
4948
tp: Capture,
@@ -52,9 +51,8 @@ func TestIsRunning(t *testing.T) {
5251
{
5352
job: &replayJob{
5453
job: job{
55-
StartTime: time.Now().Add(-20 * time.Second),
56-
Duration: 10 * time.Second,
57-
Progress: 1.0,
54+
startTime: time.Now().Add(-20 * time.Second),
55+
progress: 1.0,
5856
},
5957
},
6058
tp: Replay,
@@ -93,12 +91,73 @@ func TestSetProgress(t *testing.T) {
9391
for i, test := range tests {
9492
job := &captureJob{
9593
job: job{
96-
StartTime: time.Now(),
97-
Duration: 10 * time.Second,
94+
startTime: time.Now(),
9895
},
9996
}
100-
job.SetProgress(test.progress, test.err)
101-
require.Equal(t, test.expectedProgress, job.Progress, "case %d", i)
97+
now := time.Now()
98+
job.SetProgress(test.progress, now, test.err)
99+
require.Equal(t, now, job.endTime, "case %d", i)
100+
require.Equal(t, test.expectedProgress, job.progress, "case %d", i)
102101
require.Equal(t, test.running, job.IsRunning(), "case %d", i)
103102
}
104103
}
104+
105+
func TestMarshalJob(t *testing.T) {
106+
startTime, err := time.Parse("2006-01-02 15:04:05", "2020-01-01 00:00:00")
107+
require.NoError(t, err)
108+
endTime, err := time.Parse("2006-01-02 15:04:05", "2020-01-01 02:01:01")
109+
require.NoError(t, err)
110+
111+
tests := []struct {
112+
job Job
113+
marshal string
114+
}{
115+
{
116+
job: &captureJob{
117+
job: job{
118+
startTime: startTime,
119+
endTime: endTime,
120+
progress: 0.5,
121+
err: errors.New("mock error"),
122+
},
123+
cfg: capture.CaptureConfig{
124+
Output: "/tmp/traffic",
125+
Duration: 2 * time.Hour,
126+
},
127+
},
128+
marshal: `{"type":"capture","status":"canceled","start_time":"2020-01-01 00:00:00 +0000 UTC","end_time":"2020-01-01 02:01:01 +0000 UTC","duration":"2h0m0s","output":"/tmp/traffic","progress":"50%","error":"mock error"}`,
129+
},
130+
{
131+
job: &replayJob{
132+
job: job{
133+
startTime: startTime,
134+
progress: 0,
135+
},
136+
cfg: replay.ReplayConfig{
137+
Input: "/tmp/traffic",
138+
Username: "root",
139+
},
140+
},
141+
marshal: `{"type":"replay","status":"running","start_time":"2020-01-01 00:00:00 +0000 UTC","input":"/tmp/traffic","username":"root","speed":1,"progress":"0%"}`,
142+
},
143+
{
144+
job: &replayJob{
145+
job: job{
146+
startTime: startTime,
147+
endTime: endTime,
148+
progress: 1,
149+
},
150+
cfg: replay.ReplayConfig{
151+
Input: "/tmp/traffic",
152+
Username: "root",
153+
Speed: 0.5,
154+
},
155+
},
156+
marshal: `{"type":"replay","status":"done","start_time":"2020-01-01 00:00:00 +0000 UTC","end_time":"2020-01-01 02:01:01 +0000 UTC","input":"/tmp/traffic","username":"root","speed":0.5,"progress":"100%"}`,
157+
},
158+
}
159+
160+
for i, test := range tests {
161+
require.Equal(t, test.marshal, test.job.String(), "case %d", i)
162+
}
163+
}

pkg/sqlreplay/manager/manager.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ func (jm *jobManager) updateProgress() {
6161
if job.IsRunning() {
6262
switch job.Type() {
6363
case Capture:
64-
progress, err := jm.capture.Progress()
65-
job.SetProgress(progress, err)
64+
progress, endTime, err := jm.capture.Progress()
65+
job.SetProgress(progress, endTime, err)
6666
case Replay:
67-
progress, err := jm.replay.Progress()
68-
job.SetProgress(progress, err)
67+
progress, endTime, err := jm.replay.Progress()
68+
job.SetProgress(progress, endTime, err)
6969
}
7070
}
7171
}
@@ -93,9 +93,9 @@ func (jm *jobManager) StartCapture(cfg capture.CaptureConfig) error {
9393
}
9494
newJob := &captureJob{
9595
job: job{
96-
StartTime: time.Now(),
97-
Duration: cfg.Duration,
96+
startTime: time.Now(),
9897
},
98+
cfg: cfg,
9999
}
100100
jm.lg.Info("start capture", zap.String("job", newJob.String()))
101101
jm.jobHistory = append(jm.jobHistory, newJob)
@@ -121,8 +121,9 @@ func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
121121
}
122122
newJob := &replayJob{
123123
job: job{
124-
StartTime: time.Now(),
124+
startTime: time.Now(),
125125
},
126+
cfg: cfg,
126127
}
127128
jm.lg.Info("start replay", zap.String("job", newJob.String()))
128129
jm.jobHistory = append(jm.jobHistory, newJob)
@@ -135,7 +136,7 @@ func (jm *jobManager) GetCapture() capture.Capture {
135136

136137
func (jm *jobManager) Jobs() string {
137138
jm.updateProgress()
138-
b, err := json.Marshal(jm.jobHistory)
139+
b, err := json.MarshalIndent(jm.jobHistory, "", " ")
139140
if err != nil {
140141
return err.Error()
141142
}

0 commit comments

Comments
 (0)