Skip to content

Commit 052bed0

Browse files
authored
sqlreplay, api: support compress option to traffic capture (#720)
1 parent 451e530 commit 052bed0

File tree

7 files changed

+87
-36
lines changed

7 files changed

+87
-36
lines changed

lib/cli/traffic.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,14 @@ func GetTrafficCaptureCmd(ctx *Context) *cobra.Command {
3030
output := captureCmd.PersistentFlags().String("output", "", "output directory for traffic files")
3131
duration := captureCmd.PersistentFlags().String("duration", "", "the duration of traffic capture")
3232
encrypt := captureCmd.PersistentFlags().String("encrypt-method", "", "the encryption method used for encrypting traffic files")
33+
compress := captureCmd.PersistentFlags().Bool("compress", true, "whether compress the traffic files")
3334
captureCmd.RunE = func(cmd *cobra.Command, args []string) error {
34-
reader := GetFormReader(map[string]string{"output": *output, "duration": *duration, "encrypt-method": *encrypt})
35+
reader := GetFormReader(map[string]string{
36+
"output": *output,
37+
"duration": *duration,
38+
"encrypt-method": *encrypt,
39+
"compress": strconv.FormatBool(*compress),
40+
})
3541
resp, err := doRequest(cmd.Context(), ctx, http.MethodPost, "/api/traffic/capture", reader)
3642
if err != nil {
3743
return err

pkg/server/api/traffic.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/gin-gonic/gin"
1313
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
1414
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
15+
"go.uber.org/zap"
1516
)
1617

1718
func (h *Server) registerTraffic(group *gin.RouterGroup) {
@@ -33,6 +34,16 @@ func (h *Server) TrafficCapture(c *gin.Context) {
3334
cfg.Duration = duration
3435
}
3536
cfg.EncryptMethod = c.PostForm("encrypt-method")
37+
38+
compress := true
39+
if compressStr := c.PostForm("compress"); compressStr != "" {
40+
var err error
41+
if compress, err = strconv.ParseBool(compressStr); err != nil {
42+
h.lg.Warn("parsing argument 'compress' error, using true", zap.String("compress", c.PostForm("compress")), zap.Error(err))
43+
compress = true
44+
}
45+
}
46+
cfg.Compress = compress
3647
cfg.KeyFile = h.mgr.CfgMgr.GetConfig().Security.Encryption.KeyPath
3748

3849
if err := h.mgr.ReplayJobMgr.StartCapture(cfg); err != nil {

pkg/server/api/traffic_test.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,25 @@ func TestTraffic(t *testing.T) {
3939
require.NoError(t, err)
4040
require.Equal(t, "capture started", string(all))
4141
require.Equal(t, "capture", mgr.curJob)
42-
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp"}, mgr.captureCfg)
42+
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", Compress: true}, mgr.captureCfg)
43+
})
44+
doHTTP(t, http.MethodPost, "/api/traffic/cancel", httpOpts{}, func(t *testing.T, r *http.Response) {
45+
require.Equal(t, http.StatusOK, r.StatusCode)
46+
all, err := io.ReadAll(r.Body)
47+
require.NoError(t, err)
48+
require.Equal(t, "stopped", string(all))
49+
require.Equal(t, "", mgr.curJob)
50+
})
51+
doHTTP(t, http.MethodPost, "/api/traffic/capture", httpOpts{
52+
reader: cli.GetFormReader(map[string]string{"output": "/tmp", "duration": "1h", "encrypt-method": "aes256-ctr", "compress": "false"}),
53+
header: map[string]string{"Content-Type": "application/x-www-form-urlencoded"},
54+
}, func(t *testing.T, r *http.Response) {
55+
require.Equal(t, http.StatusOK, r.StatusCode)
56+
all, err := io.ReadAll(r.Body)
57+
require.NoError(t, err)
58+
require.Equal(t, "capture started", string(all))
59+
require.Equal(t, "capture", mgr.curJob)
60+
require.Equal(t, capture.CaptureConfig{Duration: time.Hour, Output: "/tmp", EncryptMethod: "aes256-ctr", Compress: false}, mgr.captureCfg)
4361
})
4462
doHTTP(t, http.MethodPost, "/api/traffic/replay", httpOpts{
4563
reader: cli.GetFormReader(map[string]string{"input": "/tmp"}),
@@ -52,10 +70,6 @@ func TestTraffic(t *testing.T) {
5270
})
5371
doHTTP(t, http.MethodPost, "/api/traffic/cancel", httpOpts{}, func(t *testing.T, r *http.Response) {
5472
require.Equal(t, http.StatusOK, r.StatusCode)
55-
all, err := io.ReadAll(r.Body)
56-
require.NoError(t, err)
57-
require.Equal(t, "stopped", string(all))
58-
require.Equal(t, "", mgr.curJob)
5973
})
6074
doHTTP(t, http.MethodPost, "/api/traffic/replay", httpOpts{
6175
reader: cli.GetFormReader(map[string]string{"input": "/tmp", "speed": "abc"}),

pkg/sqlreplay/capture/capture.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type CaptureConfig struct {
5555
EncryptMethod string
5656
KeyFile string
5757
Duration time.Duration
58+
Compress bool
5859
cmdLogger store.Writer
5960
bufferCap int
6061
flushThreshold int
@@ -226,6 +227,7 @@ func (c *capture) flushBuffer(bufCh <-chan *bytes.Buffer) {
226227
Dir: c.cfg.Output,
227228
EncryptMethod: c.cfg.EncryptMethod,
228229
KeyFile: c.cfg.KeyFile,
230+
Compress: c.cfg.Compress,
229231
})
230232
if err != nil {
231233
c.lg.Error("failed to create capture writer", zap.Error(err))

pkg/sqlreplay/store/line.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type WriterCfg struct {
2222
EncryptMethod string
2323
KeyFile string
2424
FileSize int
25+
Compress bool
2526
}
2627

2728
func NewWriter(cfg WriterCfg) (Writer, error) {

pkg/sqlreplay/store/rotate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func newRotateWriter(cfg WriterCfg) *rotateWriter {
3838
Filename: filepath.Join(cfg.Dir, fileName),
3939
MaxSize: cfg.FileSize,
4040
LocalTime: true,
41-
Compress: true,
41+
Compress: cfg.Compress,
4242
},
4343
}
4444
}

pkg/sqlreplay/store/rotate_test.go

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func TestFileRotation(t *testing.T) {
2222
writer := newRotateWriter(WriterCfg{
2323
Dir: tmpDir,
2424
FileSize: 1,
25+
Compress: true,
2526
})
2627
defer writer.Close()
2728

@@ -186,39 +187,55 @@ func TestIterateFiles(t *testing.T) {
186187

187188
func TestReadGZip(t *testing.T) {
188189
tmpDir := t.TempDir()
189-
writer := newRotateWriter(WriterCfg{
190-
Dir: tmpDir,
191-
FileSize: 1,
192-
})
193-
defer writer.Close()
190+
for _, compress := range []bool{true, false} {
191+
require.NoError(t, os.RemoveAll(tmpDir))
192+
require.NoError(t, os.MkdirAll(tmpDir, 0777))
194193

195-
data := make([]byte, 100*1024)
196-
for i := 0; i < 11; i++ {
197-
require.NoError(t, writer.Write(data))
198-
}
199-
// files are rotated and compressed at backendground asynchronously
200-
require.Eventually(t, func() bool {
201-
files := listFiles(t, tmpDir)
202-
for _, f := range files {
203-
if strings.HasPrefix(f, "traffic") && strings.HasSuffix(f, ".gz") {
204-
return true
194+
writer := newRotateWriter(WriterCfg{
195+
Dir: tmpDir,
196+
FileSize: 1,
197+
Compress: compress,
198+
})
199+
data := make([]byte, 100*1024)
200+
for i := 0; i < 11; i++ {
201+
require.NoError(t, writer.Write(data))
202+
}
203+
// files are rotated and compressed at backendground asynchronously
204+
if compress {
205+
require.Eventually(t, func() bool {
206+
files := listFiles(t, tmpDir)
207+
for _, f := range files {
208+
if strings.HasPrefix(f, "traffic") && strings.HasSuffix(f, ".gz") {
209+
return true
210+
}
211+
}
212+
t.Logf("traffic files: %v", files)
213+
return false
214+
}, 5*time.Second, 10*time.Millisecond)
215+
} else {
216+
time.Sleep(100 * time.Millisecond)
217+
files := listFiles(t, tmpDir)
218+
for _, f := range files {
219+
if strings.HasPrefix(f, "traffic") {
220+
require.False(t, strings.HasSuffix(f, ".gz"))
221+
}
205222
}
206223
}
207-
t.Logf("traffic files: %v", files)
208-
return false
209-
}, 5*time.Second, 10*time.Millisecond)
224+
require.NoError(t, writer.Close())
210225

211-
lg, _ := logger.CreateLoggerForTest(t)
212-
l := newRotateReader(lg, tmpDir)
213-
for i := 0; i < 11; i++ {
214-
data = make([]byte, 100*1024)
215-
_, err := io.ReadFull(l, data)
216-
require.NoError(t, err)
217-
for j := 0; j < 100*1024; j++ {
218-
require.Equal(t, byte(0), data[j])
226+
lg, _ := logger.CreateLoggerForTest(t)
227+
l := newRotateReader(lg, tmpDir)
228+
for i := 0; i < 11; i++ {
229+
data = make([]byte, 100*1024)
230+
_, err := io.ReadFull(l, data)
231+
require.NoError(t, err)
232+
for j := 0; j < 100*1024; j++ {
233+
require.Equal(t, byte(0), data[j])
234+
}
219235
}
236+
data = make([]byte, 1)
237+
_, err := l.Read(data)
238+
require.True(t, errors.Is(err, io.EOF))
239+
l.Close()
220240
}
221-
data = make([]byte, 1)
222-
_, err := l.Read(data)
223-
require.True(t, errors.Is(err, io.EOF))
224241
}

0 commit comments

Comments
 (0)