Skip to content

Commit 31aa2e0

Browse files
authored
*: fix several bugs of traffic replay (#698)
1 parent 3b60b7e commit 31aa2e0

27 files changed

+358
-108
lines changed

.golangci.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ issues:
3030
linters:
3131
- gosec
3232
text: "G204:"
33+
- path: ".*.go"
34+
linters:
35+
- gosec
36+
text: "G115:"
3337

3438
linters:
3539
enable:

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ cmd_%:
4747
go build $(BUILDFLAGS) -o $(OUTPUT) $(SOURCE)
4848

4949
golangci-lint:
50-
GOBIN=$(GOBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.54.1
50+
GOBIN=$(GOBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0
5151

5252
go-header:
5353
GOBIN=$(GOBIN) go install github.com/denis-tingaikin/go-header/cmd/go-header@latest

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
1414
github.com/j-keck/arping v1.0.3
1515
github.com/klauspost/compress v1.16.6
16+
github.com/pelletier/go-toml/v2 v2.0.5
1617
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
1718
github.com/pingcap/sysutil v1.0.0
1819
github.com/pingcap/tidb v1.1.0-beta.0.20230103132820-3ccff46aa3bc
@@ -73,7 +74,6 @@ require (
7374
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
7475
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
7576
github.com/modern-go/reflect2 v1.0.2 // indirect
76-
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
7777
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 // indirect
7878
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 // indirect
7979
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c // indirect

pkg/manager/vip/network_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2024 PingCAP, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
// go:build linux
4+
//go:build linux
55

66
package vip
77

pkg/proxy/net/capability.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (f Capability) String() string {
9696
if cnt > 0 {
9797
str.WriteByte('|')
9898
}
99-
fmt.Fprintf(str, c.Str)
99+
fmt.Fprint(str, c.Str)
100100
cnt++
101101
}
102102
}

pkg/proxy/net/mysql.go

Lines changed: 76 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -621,13 +621,20 @@ func ParsePrepareStmtResp(resp []byte) (stmtID uint32, paramNum int) {
621621
return
622622
}
623623

624-
func MakeExecuteStmtRequest(stmtID uint32, args []any) ([]byte, error) {
624+
func MakeExecuteStmtRequest(stmtID uint32, args []any, newParamBound bool) ([]byte, error) {
625625
paramNum := len(args)
626626
paramTypes := make([]byte, paramNum*2)
627627
paramValues := make([][]byte, paramNum)
628628
nullBitmap := make([]byte, (paramNum+7)>>3)
629-
dataLen := 1 + 4 + 1 + 4 + len(nullBitmap) + 1 + len(paramTypes)
629+
dataLen := 1 + 4 + 1 + 4
630+
if paramNum > 0 {
631+
dataLen += len(nullBitmap) + 1
632+
}
630633
var newParamBoundFlag byte = 0
634+
if newParamBound {
635+
newParamBoundFlag = 1
636+
dataLen += len(paramTypes)
637+
}
631638

632639
for i := range args {
633640
if args[i] == nil {
@@ -636,7 +643,6 @@ func MakeExecuteStmtRequest(stmtID uint32, args []any) ([]byte, error) {
636643
continue
637644
}
638645

639-
newParamBoundFlag = 1
640646
switch v := args[i].(type) {
641647
case int8:
642648
paramTypes[i<<1] = fieldTypeTiny
@@ -719,7 +725,7 @@ func MakeExecuteStmtRequest(stmtID uint32, args []any) ([]byte, error) {
719725
pos += len(nullBitmap)
720726
request[pos] = newParamBoundFlag
721727
pos++
722-
if newParamBoundFlag == 1 {
728+
if newParamBound {
723729
copy(request[pos:], paramTypes)
724730
pos += len(paramTypes)
725731
}
@@ -734,55 +740,62 @@ func MakeExecuteStmtRequest(stmtID uint32, args []any) ([]byte, error) {
734740
// ParseExecuteStmtRequest parses ComStmtExecute request.
735741
// NOTICE: the type of returned args may be wrong because it doesn't have the knowledge of real param types.
736742
// E.g. []byte is returned as string, and int is returned as int32.
737-
func ParseExecuteStmtRequest(data []byte, paramNum int) (stmtID uint32, args []any, err error) {
738-
if len(data) < 1+4+1+4+1 {
739-
return 0, nil, errors.WithStack(gomysql.ErrMalformPacket)
743+
func ParseExecuteStmtRequest(data []byte, paramNum int, paramTypes []byte) (stmtID uint32, args []any, newParamTypes []byte, err error) {
744+
if len(data) < 1+4+1+4 {
745+
return 0, nil, nil, errors.WithStack(gomysql.ErrMalformPacket)
740746
}
741747

742748
pos := 1
743749
stmtID = binary.LittleEndian.Uint32(data[pos : pos+4])
750+
// paramNum is contained in the ComStmtPrepare but paramTypes is contained in the first ComStmtExecute (with newParamBoundFlag==1).
751+
// If the prepared statement is parsed from the session states, the paramTypes may be empty but the paramNum is not in the session states.
752+
// Just return empty args in this case, which is fine currently.
753+
if paramNum == 0 {
754+
return stmtID, nil, nil, nil
755+
}
744756
// cursor flag and iteration count
745757
pos += 4 + 1 + 4
746758
if len(data) < pos+((paramNum+7)>>3)+1 {
747-
return 0, nil, errors.WithStack(gomysql.ErrMalformPacket)
759+
return 0, nil, nil, errors.WithStack(gomysql.ErrMalformPacket)
748760
}
749761
nullBitmap := data[pos : pos+((paramNum+7)>>3)]
750762
pos += len(nullBitmap)
751763
newParamBoundFlag := data[pos]
752764
pos += 1
753765
args = make([]any, paramNum)
754-
if newParamBoundFlag == 0 {
755-
return stmtID, args, nil
756-
}
757766

758-
if len(data) < pos+paramNum*2 {
759-
return 0, nil, errors.WithStack(gomysql.ErrMalformPacket)
767+
if newParamBoundFlag > 0 {
768+
if len(data) < pos+paramNum<<1 {
769+
return 0, nil, nil, errors.WithStack(gomysql.ErrMalformPacket)
770+
}
771+
paramTypes = data[pos : pos+paramNum<<1]
772+
pos += paramNum << 1
760773
}
761-
paramTypes := data[pos : pos+paramNum*2]
762-
pos += paramNum * 2
763774

764775
for i := 0; i < paramNum; i++ {
765776
if nullBitmap[i/8]&(1<<(uint(i)%8)) > 0 {
766777
args[i] = nil
767778
continue
768779
}
769780
switch paramTypes[i<<1] {
781+
case fieldTypeNULL:
782+
args[i] = nil
770783
case fieldTypeTiny:
771784
if paramTypes[(i<<1)+1] == 0x80 {
772785
args[i] = uint8(data[pos])
773786
} else {
774787
args[i] = int8(data[pos])
775788
}
776789
pos += 1
777-
case fieldTypeShort:
790+
case fieldTypeShort, fieldTypeYear:
778791
v := binary.LittleEndian.Uint16(data[pos : pos+2])
779792
if paramTypes[(i<<1)+1] == 0x80 {
780793
args[i] = v
781794
} else {
782795
args[i] = int16(v)
783796
}
784797
pos += 2
785-
case fieldTypeLong:
798+
case fieldTypeLong, fieldTypeInt24:
786799
v := binary.LittleEndian.Uint32(data[pos : pos+4])
787800
if paramTypes[(i<<1)+1] == 0x80 {
788801
args[i] = v
@@ -804,19 +817,58 @@ func ParseExecuteStmtRequest(data []byte, paramNum int) (stmtID uint32, args []a
804817
case fieldTypeDouble:
805818
args[i] = math.Float64frombits(binary.LittleEndian.Uint64(data[pos : pos+8]))
806819
pos += 8
807-
case fieldTypeString:
808-
v, _, off, err := ParseLengthEncodedBytes(data[pos:])
820+
case fieldTypeDate, fieldTypeTimestamp, fieldTypeDateTime:
821+
length := data[pos]
822+
pos++
823+
switch length {
824+
case 0:
825+
args[i] = "0000-00-00 00:00:00"
826+
case 4:
827+
pos, args[i] = BinaryDate(pos, data)
828+
case 7:
829+
pos, args[i] = BinaryDateTime(pos, data)
830+
case 11:
831+
pos, args[i] = BinaryTimestamp(pos, data)
832+
case 13:
833+
pos, args[i] = BinaryTimestampWithTZ(pos, data)
834+
default:
835+
return 0, nil, nil, errors.WithStack(gomysql.ErrMalformPacket)
836+
}
837+
case fieldTypeTime:
838+
length := data[pos]
839+
pos++
840+
switch length {
841+
case 0:
842+
args[i] = "0"
843+
case 8:
844+
isNegative := data[pos]
845+
pos++
846+
pos, args[i] = BinaryDuration(pos, data, isNegative)
847+
case 12:
848+
isNegative := data[pos]
849+
pos++
850+
pos, args[i] = BinaryDurationWithMS(pos, data, isNegative)
851+
default:
852+
return 0, nil, nil, errors.WithStack(gomysql.ErrMalformPacket)
853+
}
854+
case fieldTypeDecimal, fieldTypeNewDecimal, fieldTypeVarChar, fieldTypeString, fieldTypeVarString, fieldTypeBLOB, fieldTypeTinyBLOB,
855+
fieldTypeMediumBLOB, fieldTypeLongBLOB, fieldTypeEnum, fieldTypeSet, fieldTypeGeometry, fieldTypeBit, fieldTypeJSON, fieldTypeVector:
856+
v, isNull, n, err := ParseLengthEncodedBytes(data[pos:])
809857
if err != nil {
810-
return 0, nil, errors.Wrapf(err, "parse param %d err", i)
858+
return 0, nil, nil, errors.Wrapf(err, "parse param err, type: %d, idx: %d, pos: %d", paramTypes[i<<1], i, pos)
811859
}
812-
args[i] = hack.String(v)
813-
pos += off
860+
if isNull {
861+
args[i] = nil
862+
} else {
863+
args[i] = hack.String(v)
864+
}
865+
pos += n
814866
default:
815-
return 0, nil, errors.Errorf("unsupported type %d", paramTypes[i<<1])
867+
return 0, nil, nil, errors.Errorf("unsupported type %d", paramTypes[i<<1])
816868
}
817869
}
818870

819-
return stmtID, args, nil
871+
return stmtID, args, paramTypes, nil
820872
}
821873

822874
func MakeCloseStmtRequest(stmtID uint32) []byte {

pkg/proxy/net/mysql_test.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,19 +126,40 @@ func TestPrepareStmts(t *testing.T) {
126126
float64(1.2),
127127
nil,
128128
}
129+
expectedTypes := []byte{
130+
fieldTypeNULL, 0,
131+
fieldTypeString, 0,
132+
fieldTypeTiny, 0x80,
133+
fieldTypeShort, 0,
134+
fieldTypeLong, 0,
135+
fieldTypeLongLong, 0,
136+
fieldTypeShort, 0x80,
137+
fieldTypeLong, 0x80,
138+
fieldTypeLongLong, 0x80,
139+
fieldTypeFloat, 0,
140+
fieldTypeDouble, 0,
141+
fieldTypeNULL, 0,
142+
}
129143

130144
b := MakePrepareStmtRequest("select ?")
131145
require.Len(t, b, len("select ?")+1)
132146

133-
data1, err := MakeExecuteStmtRequest(1, args)
147+
data1, err := MakeExecuteStmtRequest(1, args, true)
134148
require.NoError(t, err)
135149

136-
stmtID, pArgs, err := ParseExecuteStmtRequest(data1, len(args))
150+
stmtID, pArgs, newParamTypes, err := ParseExecuteStmtRequest(data1, len(args), nil)
137151
require.NoError(t, err)
138152
require.Equal(t, uint32(1), stmtID)
139153
require.EqualValues(t, args, pArgs)
154+
require.Equal(t, expectedTypes, newParamTypes)
155+
156+
data2, err := MakeExecuteStmtRequest(1, pArgs, false)
157+
require.NoError(t, err)
158+
require.NotEqual(t, data1, data2)
140159

141-
data2, err := MakeExecuteStmtRequest(1, pArgs)
160+
stmtID, pArgs, newParamTypes, err = ParseExecuteStmtRequest(data1, len(args), newParamTypes)
142161
require.NoError(t, err)
143-
require.Equal(t, data1, data2)
162+
require.Equal(t, uint32(1), stmtID)
163+
require.EqualValues(t, args, pArgs)
164+
require.Equal(t, expectedTypes, newParamTypes)
144165
}

pkg/proxy/net/protocol.go

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ package net
2626

2727
import (
2828
"bytes"
29+
"encoding/binary"
30+
"fmt"
2931
"io"
3032
)
3133

@@ -125,12 +127,10 @@ func DumpLengthEncodedInt(buffer []byte, n uint64) []byte {
125127
case n <= 0xffffff:
126128
return append(buffer, 0xfd, byte(n), byte(n>>8), byte(n>>16))
127129

128-
case n <= 0xffffffffffffffff:
130+
default:
129131
return append(buffer, 0xfe, byte(n), byte(n>>8), byte(n>>16), byte(n>>24),
130132
byte(n>>32), byte(n>>40), byte(n>>48), byte(n>>56))
131133
}
132-
133-
return buffer
134134
}
135135

136136
// DumpLengthEncodedString dumps string<int>.
@@ -182,3 +182,67 @@ func Uint64ToBytes(n uint64) []byte {
182182
byte(n >> 56),
183183
}
184184
}
185+
186+
func BinaryDate(pos int, paramValues []byte) (int, string) {
187+
year := binary.LittleEndian.Uint16(paramValues[pos : pos+2])
188+
pos += 2
189+
month := paramValues[pos]
190+
pos++
191+
day := paramValues[pos]
192+
pos++
193+
return pos, fmt.Sprintf("%04d-%02d-%02d", year, month, day)
194+
}
195+
196+
func BinaryDateTime(pos int, paramValues []byte) (int, string) {
197+
pos, date := BinaryDate(pos, paramValues)
198+
hour := paramValues[pos]
199+
pos++
200+
minute := paramValues[pos]
201+
pos++
202+
second := paramValues[pos]
203+
pos++
204+
return pos, fmt.Sprintf("%s %02d:%02d:%02d", date, hour, minute, second)
205+
}
206+
207+
func BinaryTimestamp(pos int, paramValues []byte) (int, string) {
208+
pos, dateTime := BinaryDateTime(pos, paramValues)
209+
microSecond := binary.LittleEndian.Uint32(paramValues[pos : pos+4])
210+
pos += 4
211+
return pos, fmt.Sprintf("%s.%06d", dateTime, microSecond)
212+
}
213+
214+
func BinaryTimestampWithTZ(pos int, paramValues []byte) (int, string) {
215+
pos, timestamp := BinaryTimestamp(pos, paramValues)
216+
tzShiftInMin := int16(binary.LittleEndian.Uint16(paramValues[pos : pos+2]))
217+
tzShiftHour := tzShiftInMin / 60
218+
tzShiftAbsMin := tzShiftInMin % 60
219+
if tzShiftAbsMin < 0 {
220+
tzShiftAbsMin = -tzShiftAbsMin
221+
}
222+
pos += 2
223+
return pos, fmt.Sprintf("%s%+02d:%02d", timestamp, tzShiftHour, tzShiftAbsMin)
224+
}
225+
226+
func BinaryDuration(pos int, paramValues []byte, isNegative uint8) (int, string) {
227+
sign := ""
228+
if isNegative >= 1 {
229+
sign = "-"
230+
}
231+
days := binary.LittleEndian.Uint32(paramValues[pos : pos+4])
232+
pos += 4
233+
hours := paramValues[pos]
234+
pos++
235+
minutes := paramValues[pos]
236+
pos++
237+
seconds := paramValues[pos]
238+
pos++
239+
return pos, fmt.Sprintf("%s%d %02d:%02d:%02d", sign, days, hours, minutes, seconds)
240+
}
241+
242+
func BinaryDurationWithMS(pos int, paramValues []byte,
243+
isNegative uint8) (int, string) {
244+
pos, dur := BinaryDuration(pos, paramValues, isNegative)
245+
microSecond := binary.LittleEndian.Uint32(paramValues[pos : pos+4])
246+
pos += 4
247+
return pos, fmt.Sprintf("%s.%06d", dur, microSecond)
248+
}

pkg/sqlreplay/capture/capture.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ func (c *capture) Capture(packet []byte, startTime time.Time, connID uint64, ini
277277
// initSession is slow, do not call it in the lock.
278278
sql, err := initSession()
279279
if err != nil {
280-
c.lg.Warn("failed to init session", zap.Error(err))
280+
// Maybe the connection is in transaction or closing.
281+
c.lg.Debug("failed to init session", zap.Uint64("connID", connID), zap.Error(err))
281282
return
282283
}
283284
initPacket := make([]byte, 0, len(sql)+1)

pkg/sqlreplay/cmd/cmd_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestDigest(t *testing.T) {
139139
require.Equal(t, cmd1.Digest(), cmd4.Digest())
140140
require.Equal(t, "select ?", cmd4.QueryText())
141141

142-
data, err := pnet.MakeExecuteStmtRequest(1, []any{1})
142+
data, err := pnet.MakeExecuteStmtRequest(1, []any{1}, true)
143143
require.NoError(t, err)
144144
cmd5 := NewCommand(data, time.Now(), 100)
145145
cmd5.PreparedStmt = "select ?"

0 commit comments

Comments
 (0)