Skip to content

Commit 3b60b7e

Browse files
authored
router, backend: fix that an unhealthy backend may be never removed (#697)
1 parent d65b2a4 commit 3b60b7e

File tree

5 files changed

+44
-19
lines changed

5 files changed

+44
-19
lines changed

pkg/balance/router/mock_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type mockRedirectableConn struct {
2525
from BackendInst
2626
to BackendInst
2727
receiver ConnEventReceiver
28+
closing bool
2829
}
2930

3031
func newMockRedirectableConn(t *testing.T, id uint64) *mockRedirectableConn {
@@ -56,10 +57,13 @@ func (conn *mockRedirectableConn) Value(k any) any {
5657

5758
func (conn *mockRedirectableConn) Redirect(inst BackendInst) bool {
5859
conn.Lock()
60+
defer conn.Unlock()
61+
if conn.closing {
62+
return false
63+
}
5964
require.Nil(conn.t, conn.to)
6065
require.True(conn.t, inst.Healthy())
6166
conn.to = inst
62-
conn.Unlock()
6367
return true
6468
}
6569

pkg/balance/router/router_score.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -385,21 +385,26 @@ func (router *ScoreBasedRouter) rebalance(ctx context.Context) {
385385

386386
func (router *ScoreBasedRouter) redirectConn(conn *connWrapper, fromBackend *backendWrapper, toBackend *backendWrapper,
387387
reason string, logFields []zap.Field, curTime time.Time) {
388-
fields := []zap.Field{
389-
zap.Uint64("connID", conn.ConnectionID()),
390-
zap.String("from", fromBackend.addr),
391-
zap.String("to", toBackend.addr),
388+
// Skip the connection if it's closing.
389+
if conn.Redirect(toBackend) {
390+
fields := []zap.Field{
391+
zap.Uint64("connID", conn.ConnectionID()),
392+
zap.String("from", fromBackend.addr),
393+
zap.String("to", toBackend.addr),
394+
}
395+
fields = append(fields, logFields...)
396+
router.logger.Debug("begin redirect connection", fields...)
397+
fromBackend.connScore--
398+
router.removeBackendIfEmpty(fromBackend)
399+
toBackend.connScore++
400+
conn.phase = phaseRedirectNotify
401+
conn.redirectReason = reason
402+
conn.redirectingBackend = toBackend
403+
} else {
404+
// Avoid it to be redirected again immediately.
405+
conn.phase = phaseRedirectFail
392406
}
393-
fields = append(fields, logFields...)
394-
router.logger.Debug("begin redirect connection", fields...)
395-
fromBackend.connScore--
396-
router.removeBackendIfEmpty(fromBackend)
397-
toBackend.connScore++
398-
conn.phase = phaseRedirectNotify
399-
conn.redirectReason = reason
400407
conn.lastRedirect = curTime
401-
conn.Redirect(toBackend)
402-
conn.redirectingBackend = toBackend
403408
}
404409

405410
func (router *ScoreBasedRouter) removeBackendIfEmpty(backend *backendWrapper) bool {

pkg/balance/router/router_score_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,3 +915,16 @@ func TestControlSpeed(t *testing.T) {
915915
tester.redirectFinish(redirectingNum, false)
916916
}
917917
}
918+
919+
func TestRedirectFail(t *testing.T) {
920+
tester := newRouterTester(t, nil)
921+
tester.addBackends(1)
922+
tester.addConnections(1)
923+
tester.conns[1].closing = true
924+
tester.killBackends(1)
925+
tester.addBackends(1)
926+
tester.rebalance(1)
927+
// If the connection refuses to redirect, the connScore should not change.
928+
require.Equal(t, 1, tester.getBackendByIndex(0).connScore)
929+
require.Equal(t, 0, tester.getBackendByIndex(1).connScore)
930+
}

pkg/proxy/backend/backend_conn_mgr.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -513,13 +513,12 @@ func (mgr *BackendConnManager) processSignals(ctx context.Context) {
513513
// tryRedirect tries to migrate the session if the session is redirect-able.
514514
// NOTE: processLock should be held before calling this function.
515515
func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
516-
if mgr.closeStatus.Load() >= statusNotifyClose || ctx.Err() != nil {
517-
return
518-
}
519516
backendInst := mgr.redirectInfo.Load()
517+
// No redirection signal or redirection is finished.
520518
if backendInst == nil {
521519
return
522520
}
521+
// Redirection will be retried after the next command finishes.
523522
if !mgr.cmdProcessor.finishedTxn() {
524523
return
525524
}
@@ -536,6 +535,10 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
536535
// - Avoid the risk of deadlock
537536
mgr.redirectResCh <- rs
538537
}()
538+
// Even if the connection is closing, the redirection result must still be sent to recover the connection scores.
539+
if mgr.closeStatus.Load() >= statusNotifyClose || ctx.Err() != nil {
540+
return
541+
}
539542
// It may have been too long since the redirection signal was sent, and the target backend may be unhealthy now.
540543
if !(*backendInst).Healthy() {
541544
rs.err = ErrTargetUnhealthy

pkg/proxy/backend/backend_conn_mgr_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -638,8 +638,8 @@ func TestCloseWhileRedirect(t *testing.T) {
638638
})
639639
// Make sure the process goroutine finishes.
640640
ts.mp.wg.Wait()
641-
// Redirect() should not panic after Close().
642-
ts.mp.Redirect(newMockBackendInst(ts))
641+
// Redirect() should not panic after Close() and it returns false.
642+
require.False(t, ts.mp.Redirect(newMockBackendInst(ts)))
643643
eventReceiver.checkEvent(t, eventSucceed)
644644
wg.Wait()
645645
eventReceiver.checkEvent(t, eventClose)

0 commit comments

Comments
 (0)