Skip to content

Commit 5d2dfc2

Browse files
authored
backend, net: add more logs (#482)
1 parent 6762ed9 commit 5d2dfc2

File tree

4 files changed

+80
-5
lines changed

4 files changed

+80
-5
lines changed

pkg/proxy/backend/backend_conn_mgr.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"encoding/binary"
1010
"encoding/json"
1111
"fmt"
12-
1312
"net"
1413
"os"
1514
"strings"
@@ -20,6 +19,7 @@ import (
2019

2120
"github.com/cenkalti/backoff/v4"
2221
"github.com/go-mysql-org/go-mysql/mysql"
22+
"github.com/pingcap/tidb/parser"
2323
"github.com/pingcap/tiproxy/lib/config"
2424
"github.com/pingcap/tiproxy/lib/util/errors"
2525
"github.com/pingcap/tiproxy/lib/util/waitgroup"
@@ -277,19 +277,34 @@ func (mgr *BackendConnManager) getBackendIO(ctx context.Context, cctx ConnContex
277277
// ExecuteCmd forwards messages between the client and the backend.
278278
// If it finds that the session is ready for redirection, it migrates the session.
279279
func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) {
280+
startTime := monotime.Now()
280281
mgr.processLock.Lock()
281282
defer func() {
282283
mgr.setQuitSourceByErr(err)
283284
mgr.handshakeHandler.OnTraffic(mgr)
284-
mgr.lastActiveTime = monotime.Now()
285+
now := monotime.Now()
286+
if err != nil && errors.Is(err, ErrBackendConn) {
287+
cmd, data := pnet.Command(request[0]), request[1:]
288+
var query string
289+
if cmd == pnet.ComQuery {
290+
query = parser.Normalize(pnet.ParseQueryPacket(data))
291+
if len(query) > 256 {
292+
query = query[:256]
293+
}
294+
}
295+
// idle_time: maybe the idle time exceeds wait_timeout?
296+
// execute_time and query: maybe this query causes TiDB OOM?
297+
mgr.logger.Info("backend disconnects", zap.Duration("idle_time", time.Duration(now-mgr.lastActiveTime)),
298+
zap.Duration("execute_time", time.Duration(now-startTime)), zap.Stringer("cmd", cmd), zap.String("query", query))
299+
}
300+
mgr.lastActiveTime = now
285301
mgr.processLock.Unlock()
286302
}()
287303
if len(request) < 1 {
288304
err = mysql.ErrMalformPacket
289305
return
290306
}
291307
cmd := pnet.Command(request[0])
292-
startTime := monotime.Now()
293308

294309
// Once the request is accepted, it's treated in the transaction, so we don't check graceful shutdown here.
295310
if mgr.closeStatus.Load() >= statusClosing {

pkg/proxy/backend/backend_conn_mgr_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,6 +1297,49 @@ func TestTrafficMetrics(t *testing.T) {
12971297
ts.runTests(runners)
12981298
}
12991299

1300+
func TestDisconnectLog(t *testing.T) {
1301+
ts := newBackendMgrTester(t)
1302+
tests := []struct {
1303+
runner runner
1304+
checker checker
1305+
}{
1306+
{
1307+
// 1st handshake
1308+
runner: runner{
1309+
client: ts.mc.authenticate,
1310+
proxy: ts.firstHandshake4Proxy,
1311+
backend: ts.handshake4Backend,
1312+
},
1313+
},
1314+
{
1315+
// proxy logs SQL when the backend disconnects
1316+
runner: runner{
1317+
client: func(packetIO *pnet.PacketIO) error {
1318+
ts.mc.sql = "select 1"
1319+
return ts.mc.request(packetIO)
1320+
},
1321+
proxy: func(clientIO, backendIO *pnet.PacketIO) error {
1322+
err := ts.forwardCmd4Proxy(clientIO, backendIO)
1323+
_ = clientIO.Close()
1324+
return err
1325+
},
1326+
backend: func(packetIO *pnet.PacketIO) error {
1327+
return packetIO.Close()
1328+
},
1329+
},
1330+
checker: func(t *testing.T, ts *testSuite) {
1331+
require.True(t, pnet.IsDisconnectError(ts.mc.err))
1332+
require.ErrorIs(t, ts.mp.err, ErrBackendConn)
1333+
require.True(t, strings.Contains(ts.mp.text.String(), "select ?"))
1334+
},
1335+
},
1336+
}
1337+
// Do not run ts.runTests(runners) to skip the general checker.
1338+
for _, test := range tests {
1339+
ts.runAndCheck(ts.t, test.checker, test.runner.client, test.runner.backend, test.runner.proxy)
1340+
}
1341+
}
1342+
13001343
func BenchmarkSyncMap(b *testing.B) {
13011344
for i := 0; i < b.N; i++ {
13021345
var m sync.Map

pkg/proxy/net/mysql.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,3 +467,12 @@ func Attr2ZapFields(attrs map[string]string) []zap.Field {
467467
}
468468
return fields
469469
}
470+
471+
// ParseQueryPacket returns the statement in the CMD_QUERY packet.
472+
// data is the payload after byte CMD_QUERY.
473+
func ParseQueryPacket(data []byte) string {
474+
if len(data) > 0 && data[len(data)-1] == 0 {
475+
data = data[:len(data)-1]
476+
}
477+
return hack.String(data)
478+
}

pkg/proxy/net/packetio.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,11 @@ func (p *PacketIO) readOnePacket() ([]byte, bool, error) {
247247
if err := ReadFull(p.readWriter, p.header[:]); err != nil {
248248
return nil, false, errors.Wrap(ErrReadConn, err)
249249
}
250-
p.readWriter.SetSequence(p.header[3] + 1)
250+
sequence, pktSequence := p.header[3], p.readWriter.Sequence()
251+
if sequence != pktSequence {
252+
p.logger.Warn("sequence mismatch", zap.Uint8("expected", pktSequence), zap.Uint8("actual", sequence))
253+
}
254+
p.readWriter.SetSequence(sequence + 1)
251255

252256
length := int(p.header[0]) | int(p.header[1])<<8 | int(p.header[2])<<16
253257
data := make([]byte, length)
@@ -349,7 +353,11 @@ func (p *PacketIO) ForwardUntil(dest *PacketIO, isEnd func(firstByte byte, first
349353
}
350354
} else {
351355
for {
352-
p.readWriter.SetSequence(header[3] + 1)
356+
sequence, pktSequence := header[3], p.readWriter.Sequence()
357+
if sequence != pktSequence {
358+
p.logger.Warn("sequence mismatch", zap.Uint8("expected", pktSequence), zap.Uint8("actual", sequence))
359+
}
360+
p.readWriter.SetSequence(sequence + 1)
353361
// Sequence may be different (e.g. with compression) so we can't just copy the data to the destination.
354362
dest.readWriter.SetSequence(dest.readWriter.Sequence() + 1)
355363
p.limitReader.N = int64(length + 4)

0 commit comments

Comments
 (0)