Skip to content

Commit 3d80091

Browse files
authored
router, backend: remove NotifyBackendStatus (#476)
1 parent 5ca4fc1 commit 3d80091

File tree

7 files changed

+170
-154
lines changed

7 files changed

+170
-154
lines changed

pkg/manager/router/backend_selector.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,25 @@
44
package router
55

66
type BackendSelector struct {
7-
excluded []string
8-
cur string
9-
routeOnce func(excluded []string) (string, error)
10-
onCreate func(addr string, conn RedirectableConn, succeed bool)
7+
excluded []BackendInst
8+
cur BackendInst
9+
routeOnce func(excluded []BackendInst) (BackendInst, error)
10+
onCreate func(backend BackendInst, conn RedirectableConn, succeed bool)
1111
}
1212

13-
func (bs *BackendSelector) Reset() {
14-
bs.excluded = bs.excluded[:0]
15-
}
16-
17-
func (bs *BackendSelector) Next() (string, error) {
18-
addr, err := bs.routeOnce(bs.excluded)
19-
if err != nil {
20-
return addr, err
13+
func (bs *BackendSelector) Next() (BackendInst, error) {
14+
backend, err := bs.routeOnce(bs.excluded)
15+
// If all backends are enumerated, reset and try again.
16+
if err == ErrNoBackend && len(bs.excluded) > 0 {
17+
bs.excluded = bs.excluded[:0]
18+
backend, err = bs.routeOnce(bs.excluded)
2119
}
22-
bs.cur = addr
23-
if bs.cur != "" {
24-
bs.excluded = append(bs.excluded, bs.cur)
20+
if err != nil {
21+
return backend, err
2522
}
26-
return bs.cur, nil
23+
bs.cur = backend
24+
bs.excluded = append(bs.excluded, backend)
25+
return backend, nil
2726
}
2827

2928
func (bs *BackendSelector) Finish(conn RedirectableConn, succeed bool) {

pkg/manager/router/router.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,14 @@ import (
88
"time"
99

1010
glist "github.com/bahlo/generic-list-go"
11+
"github.com/pingcap/tiproxy/lib/util/errors"
1112
"github.com/pingcap/tiproxy/pkg/util/monotime"
1213
)
1314

15+
var (
16+
ErrNoBackend = errors.New("no available backend")
17+
)
18+
1419
// ConnEventReceiver receives connection events.
1520
type ConnEventReceiver interface {
1621
OnRedirectSucceed(from, to string, conn RedirectableConn) error
@@ -66,7 +71,6 @@ type RedirectableConn interface {
6671
Value(key any) any
6772
// Redirect returns false if the current conn is not redirectable.
6873
Redirect(backend BackendInst) bool
69-
NotifyBackendStatus(status BackendStatus)
7074
ConnectionID() uint64
7175
}
7276

pkg/manager/router/router_score.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ func (router *ScoreBasedRouter) setConnWrapper(conn RedirectableConn, ce *glist.
7272
conn.SetValue(_routerKey, ce)
7373
}
7474

75-
func (router *ScoreBasedRouter) routeOnce(excluded []string) (string, error) {
75+
func (router *ScoreBasedRouter) routeOnce(excluded []BackendInst) (BackendInst, error) {
7676
router.Lock()
7777
defer router.Unlock()
7878
if router.observeError != nil {
79-
return "", router.observeError
79+
return nil, router.observeError
8080
}
8181
for be := router.backends.Back(); be != nil; be = be.Prev() {
8282
backend := be.Value
@@ -87,29 +87,29 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) (string, error) {
8787
}
8888
found := false
8989
for _, ex := range excluded {
90-
if ex == backend.addr {
90+
if ex.Addr() == backend.Addr() {
9191
found = true
9292
break
9393
}
9494
}
9595
if !found {
9696
backend.connScore++
9797
router.adjustBackendList(be, false)
98-
return backend.addr, nil
98+
return backend, nil
9999
}
100100
}
101101
// No available backends, maybe the health check result is outdated during rolling restart.
102102
// Refresh the backends asynchronously in this case.
103103
if router.observer != nil {
104104
router.observer.Refresh()
105105
}
106-
return "", nil
106+
return nil, ErrNoBackend
107107
}
108108

109-
func (router *ScoreBasedRouter) onCreateConn(addr string, conn RedirectableConn, succeed bool) {
109+
func (router *ScoreBasedRouter) onCreateConn(backendInst BackendInst, conn RedirectableConn, succeed bool) {
110110
router.Lock()
111111
defer router.Unlock()
112-
be := router.ensureBackend(addr, true)
112+
be := router.ensureBackend(backendInst.Addr(), true)
113113
backend := be.Value
114114
if succeed {
115115
connWrapper := &connWrapper{
@@ -136,7 +136,6 @@ func (router *ScoreBasedRouter) addConn(be *glist.Element[*backendWrapper], conn
136136
ce := backend.connList.PushBack(conn)
137137
setBackendConnMetrics(backend.addr, backend.connList.Len())
138138
router.setConnWrapper(conn, ce)
139-
conn.NotifyBackendStatus(backend.Status())
140139
router.adjustBackendList(be, false)
141140
}
142141

@@ -313,10 +312,6 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]*BackendHea
313312
zap.String("prev", backend.mu.String()), zap.String("cur", health.String()))
314313
backend.setHealth(*health)
315314
router.adjustBackendList(be, true)
316-
for ele := backend.connList.Front(); ele != nil; ele = ele.Next() {
317-
conn := ele.Value
318-
conn.NotifyBackendStatus(health.Status)
319-
}
320315
}
321316
}
322317
if len(backends) > 0 {

pkg/manager/router/router_static.go

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,41 @@
33

44
package router
55

6+
import "sync/atomic"
7+
68
var _ Router = &StaticRouter{}
79

810
type StaticRouter struct {
9-
addrs []string
10-
cnt int
11+
backends []*StaticBackend
12+
cnt int
1113
}
1214

13-
func NewStaticRouter(addr []string) *StaticRouter {
14-
return &StaticRouter{addrs: addr}
15+
func NewStaticRouter(addrs []string) *StaticRouter {
16+
backends := make([]*StaticBackend, 0, len(addrs))
17+
for _, addr := range addrs {
18+
backends = append(backends, NewStaticBackend(addr))
19+
}
20+
return &StaticRouter{backends: backends}
1521
}
1622

1723
func (r *StaticRouter) GetBackendSelector() BackendSelector {
1824
return BackendSelector{
19-
routeOnce: func(excluded []string) (string, error) {
20-
for _, addr := range r.addrs {
25+
routeOnce: func(excluded []BackendInst) (BackendInst, error) {
26+
for _, backend := range r.backends {
2127
found := false
2228
for _, e := range excluded {
23-
if e == addr {
29+
if e.Addr() == backend.Addr() {
2430
found = true
2531
break
2632
}
2733
}
2834
if !found {
29-
return addr, nil
35+
return backend, nil
3036
}
3137
}
32-
return "", nil
38+
return nil, ErrNoBackend
3339
},
34-
onCreate: func(addr string, conn RedirectableConn, succeed bool) {
40+
onCreate: func(backend BackendInst, conn RedirectableConn, succeed bool) {
3541
if succeed {
3642
r.cnt++
3743
}
@@ -68,3 +74,28 @@ func (r *StaticRouter) OnConnClosed(addr string, conn RedirectableConn) error {
6874
r.cnt--
6975
return nil
7076
}
77+
78+
type StaticBackend struct {
79+
addr string
80+
healthy atomic.Bool
81+
}
82+
83+
func NewStaticBackend(addr string) *StaticBackend {
84+
backend := &StaticBackend{
85+
addr: addr,
86+
}
87+
backend.healthy.Store(true)
88+
return backend
89+
}
90+
91+
func (b *StaticBackend) Addr() string {
92+
return b.addr
93+
}
94+
95+
func (b *StaticBackend) Healthy() bool {
96+
return b.healthy.Load()
97+
}
98+
99+
func (b *StaticBackend) SetHealthy(healthy bool) {
100+
b.healthy.Store(healthy)
101+
}

0 commit comments

Comments
 (0)