Skip to content

Commit 475507c

Browse files
authored
router: fix connection score becomes wrong when a connection closes during redirecting (#451)
1 parent 3c493b2 commit 475507c

File tree

3 files changed

+36
-5
lines changed

3 files changed

+36
-5
lines changed

pkg/manager/router/router.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ func (b *backendWrapper) String() string {
139139
// connWrapper wraps RedirectableConn.
140140
type connWrapper struct {
141141
RedirectableConn
142-
phase connPhase
142+
// Reference to the target backend if it's redirecting, otherwise nil.
143+
redirectingBackend *backendWrapper
143144
// Last redirect start time of this connection.
144145
lastRedirect time.Time
146+
phase connPhase
145147
}

pkg/manager/router/router_score.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func (router *ScoreBasedRouter) RedirectConnections() error {
190190
connWrapper.phase = phaseRedirectNotify
191191
// we dont care the results
192192
_ = connWrapper.Redirect(backend)
193+
connWrapper.redirectingBackend = backend
193194
}
194195
}
195196
}
@@ -263,18 +264,25 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec
263264
router.adjustBackendList(toBe, true)
264265
connWrapper.phase = phaseRedirectFail
265266
}
267+
connWrapper.redirectingBackend = nil
266268
addMigrateMetrics(from, to, succeed, connWrapper.lastRedirect)
267269
}
268270

269271
// OnConnClosed implements ConnEventReceiver.OnConnClosed interface.
270272
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) error {
271273
router.Lock()
272274
defer router.Unlock()
273-
// OnConnClosed is always called after processing ongoing redirect events,
274-
// so the addr passed in is the right backend.
275275
be := router.ensureBackend(addr, true)
276-
be.Value.connScore--
277-
router.removeConn(be, router.getConnWrapper(conn))
276+
connWrapper := router.getConnWrapper(conn)
277+
redirectingBackend := connWrapper.Value.redirectingBackend
278+
// If this connection is redirecting, decrease the score of the target backend.
279+
if redirectingBackend != nil {
280+
redirectingBackend.connScore--
281+
connWrapper.Value.redirectingBackend = nil
282+
} else {
283+
be.Value.connScore--
284+
}
285+
router.removeConn(be, connWrapper)
278286
return nil
279287
}
280288

@@ -375,6 +383,7 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) {
375383
conn.phase = phaseRedirectNotify
376384
conn.lastRedirect = curTime
377385
conn.Redirect(idlestBackend)
386+
conn.redirectingBackend = idlestBackend
378387
}
379388
}
380389

pkg/manager/router/router_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -900,3 +900,23 @@ func TestBackendHealthy(t *testing.T) {
900900
require.False(t, conn.to.Healthy())
901901
tester.redirectFinish(1, false)
902902
}
903+
904+
func TestCloseRedirectingConns(t *testing.T) {
905+
// Make the connection redirect.
906+
tester := newRouterTester(t)
907+
tester.addBackends(1)
908+
tester.addConnections(1)
909+
require.Equal(t, 1, tester.getBackendByIndex(0).connScore)
910+
tester.killBackends(1)
911+
tester.addBackends(1)
912+
tester.rebalance(1)
913+
require.Equal(t, 0, tester.getBackendByIndex(0).connScore)
914+
require.Equal(t, 1, tester.getBackendByIndex(1).connScore)
915+
// Close the connection.
916+
tester.updateBackendStatusByAddr(tester.getBackendByIndex(0).Addr(), StatusHealthy)
917+
tester.closeConnections(1, true)
918+
require.Equal(t, 0, tester.getBackendByIndex(0).connScore)
919+
require.Equal(t, 0, tester.getBackendByIndex(1).connScore)
920+
require.Equal(t, 0, tester.getBackendByIndex(0).connList.Len())
921+
require.Equal(t, 0, tester.getBackendByIndex(1).connList.Len())
922+
}

0 commit comments

Comments
 (0)