Skip to content

Commit cf697c3

Browse files
router, backend: fix migrations being skipped when some connections are closing (#813) (#814)
Co-authored-by: djshow832 <[email protected]>
1 parent 7ec9ae7 commit cf697c3

File tree

5 files changed

+49
-27
lines changed

5 files changed

+49
-27
lines changed

pkg/balance/router/mock_test.go

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/pingcap/tiproxy/pkg/balance/observer"
1515
"github.com/pingcap/tiproxy/pkg/balance/policy"
1616
"github.com/stretchr/testify/require"
17+
"go.uber.org/zap"
1718
"go.uber.org/zap/zapcore"
1819
)
1920

@@ -80,6 +81,10 @@ func (conn *mockRedirectableConn) ConnectionID() uint64 {
8081
return conn.connID
8182
}
8283

84+
// ConnInfo returns nil: this mock supplies no extra log fields for the connection.
func (conn *mockRedirectableConn) ConnInfo() []zap.Field {
85+
return nil
86+
}
87+
8388
func (conn *mockRedirectableConn) getAddr() (string, string) {
8489
conn.Lock()
8590
defer conn.Unlock()

pkg/balance/router/router.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ import (
1010
glist "github.com/bahlo/generic-list-go"
1111
"github.com/pingcap/tiproxy/lib/util/errors"
1212
"github.com/pingcap/tiproxy/pkg/balance/observer"
13+
"go.uber.org/zap"
1314
)
1415

1516
var (
@@ -69,6 +70,7 @@ type RedirectableConn interface {
6970
// Redirect returns false if the current conn is not redirectable.
7071
Redirect(backend BackendInst) bool
7172
ConnectionID() uint64
73+
ConnInfo() []zap.Field
7274
}
7375

7476
// BackendInst defines a backend that a connection is redirecting to.

pkg/balance/router/router_score.go

Lines changed: 25 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -364,41 +364,38 @@ func (router *ScoreBasedRouter) rebalance(ctx context.Context) {
364364
}
365365
}
366366
// Migrate balanceCount connections.
367-
for i := 0; i < count && ctx.Err() == nil; i++ {
368-
var ce *glist.Element[*connWrapper]
369-
for ele := fromBackend.connList.Front(); ele != nil; ele = ele.Next() {
370-
conn := ele.Value
371-
switch conn.phase {
372-
case phaseRedirectNotify:
373-
// A connection cannot be redirected again when it has not finished redirecting.
367+
i := 0
368+
for ele := fromBackend.connList.Front(); ele != nil && ctx.Err() == nil && i < count; ele = ele.Next() {
369+
conn := ele.Value
370+
switch conn.phase {
371+
case phaseRedirectNotify:
372+
// A connection cannot be redirected again when it has not finished redirecting.
373+
continue
374+
case phaseRedirectFail:
375+
// If it failed recently, it will probably fail this time.
376+
if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) {
374377
continue
375-
case phaseRedirectFail:
376-
// If it failed recently, it will probably fail this time.
377-
if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) {
378-
continue
379-
}
380378
}
381-
ce = ele
382-
break
383379
}
384-
if ce == nil {
385-
break
380+
if router.redirectConn(conn, fromBackend, toBackend, reason, logFields, curTime) {
381+
router.lastRedirectTime = curTime
382+
i++
386383
}
387-
router.redirectConn(ce.Value, fromBackend, toBackend, reason, logFields, curTime)
388-
router.lastRedirectTime = curTime
389384
}
390385
}
391386

392387
func (router *ScoreBasedRouter) redirectConn(conn *connWrapper, fromBackend *backendWrapper, toBackend *backendWrapper,
393-
reason string, logFields []zap.Field, curTime time.Time) {
388+
reason string, logFields []zap.Field, curTime time.Time) bool {
394389
// Skip the connection if it's closing.
395-
if conn.Redirect(toBackend) {
396-
fields := []zap.Field{
397-
zap.Uint64("connID", conn.ConnectionID()),
398-
zap.String("from", fromBackend.addr),
399-
zap.String("to", toBackend.addr),
400-
}
401-
fields = append(fields, logFields...)
390+
fields := []zap.Field{
391+
zap.Uint64("connID", conn.ConnectionID()),
392+
zap.String("from", fromBackend.addr),
393+
zap.String("to", toBackend.addr),
394+
}
395+
fields = append(fields, logFields...)
396+
fields = append(fields, conn.ConnInfo()...)
397+
succeed := conn.Redirect(toBackend)
398+
if succeed {
402399
router.logger.Debug("begin redirect connection", fields...)
403400
fromBackend.connScore--
404401
router.removeBackendIfEmpty(fromBackend)
@@ -409,8 +406,10 @@ func (router *ScoreBasedRouter) redirectConn(conn *connWrapper, fromBackend *bac
409406
} else {
410407
// Prevent it from being redirected again immediately.
411408
conn.phase = phaseRedirectFail
409+
router.logger.Debug("skip redirecting because it's closing", fields...)
412410
}
413411
conn.lastRedirect = curTime
412+
return succeed
414413
}
415414

416415
func (router *ScoreBasedRouter) removeBackendIfEmpty(backend *backendWrapper) bool {

pkg/balance/router/router_score_test.go

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -934,4 +934,15 @@ func TestRedirectFail(t *testing.T) {
934934
// If the connection refuses to redirect, the connScore should not change.
935935
require.Equal(t, 1, tester.getBackendByIndex(0).connScore)
936936
require.Equal(t, 0, tester.getBackendByIndex(1).connScore)
937+
938+
tester = newRouterTester(t, nil)
939+
tester.addBackends(1)
940+
tester.addConnections(2)
941+
tester.conns[1].closing = true
942+
tester.killBackends(1)
943+
tester.addBackends(1)
944+
tester.rebalance(1)
945+
// Even if the first connection refuses to redirect, the second one should be redirected.
946+
require.Equal(t, 1, tester.getBackendByIndex(0).connScore)
947+
require.Equal(t, 1, tester.getBackendByIndex(1).connScore)
937948
}

pkg/proxy/backend/backend_conn_mgr.go

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -137,6 +137,8 @@ type BackendConnManager struct {
137137
redirectResCh chan *redirectResult
138138
// GracefulClose() sets it without lock.
139139
closeStatus atomic.Int32
140+
// The time when the connection was created.
141+
createTime time.Time
140142
// The last time the backend was active.
141143
lastActiveTime time.Time
142144
// The traffic recorded last time.
@@ -197,6 +199,7 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO pnet.Packet
197199
return errors.New("graceful shutdown before connecting")
198200
}
199201
startTime := time.Now()
202+
mgr.createTime = startTime
200203
var err error
201204
if len(username) == 0 {
202205
// real client
@@ -842,6 +845,8 @@ func (mgr *BackendConnManager) ConnInfo() []zap.Field {
842845
fields = mgr.authenticator.ConnInfo()
843846
}
844847
mgr.processLock.Unlock()
845-
fields = append(fields, zap.String("backend_addr", mgr.ServerAddr()))
848+
fields = append(fields, zap.String("backend_addr", mgr.ServerAddr()),
849+
zap.Time("create_time", mgr.createTime),
850+
zap.Time("last_active_time", mgr.lastActiveTime))
846851
return fields
847852
}

0 commit comments

Comments
 (0)