Skip to content

Commit 791d6f0

Browse files
authored
Merge pull request #191 from ydb-platform/safe-balancers
* Refactored balancers (makes concurrent-safe)
2 parents b096da3 + b3cd632 commit 791d6f0

File tree

11 files changed

+473
-558
lines changed

11 files changed

+473
-558
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
* Refactored balancers (makes concurrent-safe)
2+
* Excluded separate balancers lock from cluster
3+
* Refactored `cluster.Cluster` interface (`Insert` and `Remove` returning nothing now)
4+
* Replaced unsafe `cluster.close` boolean flag to `cluster.done` chan for listening close event
5+
* Added internal checker `cluster.isClosed()` for check cluster state
6+
* Extracted getting available conn from balancer to internal helper `cluster.get` (called inside `cluster.Get` as last effort)
7+
* Added checking `conn.Conn` availability with `conn.Ping()` in prefer nodeID case
8+
19
## v3.18.1
210
* Added `conn.Ping(ctx)` method for check availability of `conn.Conn`
311
* Refactored `cluster.Cluster.Get(ctx)` to return only available connection (instead of returning any connection from balancer)
File renamed without changes.

internal/balancer/multi/multi.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package multi
22

33
import (
4+
"sync"
5+
46
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer"
57
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
68
)
@@ -18,6 +20,7 @@ type multiHandle struct {
1820
}
1921

2022
type multi struct {
23+
mu sync.RWMutex
2124
balancer []balancer.Balancer
2225
filter []func(conn.Conn) bool
2326
}
@@ -35,6 +38,8 @@ func (m *multi) Create() balancer.Balancer {
3538

3639
func WithBalancer(b balancer.Balancer, filter func(cc conn.Conn) bool) Option {
3740
return func(m *multi) {
41+
m.mu.Lock()
42+
defer m.mu.Unlock()
3843
m.balancer = append(m.balancer, b)
3944
m.filter = append(m.filter, filter)
4045
}
@@ -43,6 +48,8 @@ func WithBalancer(b balancer.Balancer, filter func(cc conn.Conn) bool) Option {
4348
type Option func(*multi)
4449

4550
func (m *multi) Contains(x balancer.Element) bool {
51+
m.mu.RLock()
52+
defer m.mu.RUnlock()
4653
for i, h := range x.(multiHandle).elements {
4754
if h != nil && m.balancer[i].Contains(h) {
4855
return true
@@ -52,6 +59,8 @@ func (m *multi) Contains(x balancer.Element) bool {
5259
}
5360

5461
func (m *multi) Next() conn.Conn {
62+
m.mu.RLock()
63+
defer m.mu.RUnlock()
5564
for _, b := range m.balancer {
5665
if c := b.Next(); c != nil {
5766
return c
@@ -61,6 +70,9 @@ func (m *multi) Next() conn.Conn {
6170
}
6271

6372
func (m *multi) Insert(conn conn.Conn) balancer.Element {
73+
m.mu.Lock()
74+
defer m.mu.Unlock()
75+
6476
var (
6577
n = len(m.filter)
6678
h = multiHandle{
@@ -82,6 +94,9 @@ func (m *multi) Insert(conn conn.Conn) balancer.Element {
8294
}
8395

8496
func (m *multi) Remove(x balancer.Element) (removed bool) {
97+
m.mu.Lock()
98+
defer m.mu.Unlock()
99+
85100
for i, h := range x.(multiHandle).elements {
86101
if h != nil {
87102
if m.balancer[i].Remove(h) {

internal/balancer/rr/rr.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
// routine – that is, not a runtime metric) and interprets it as inversion of
1919
// weight.
2020
type roundRobin struct {
21+
mu sync.RWMutex
2122
min float32
2223
max float32
2324
belt []int
@@ -32,28 +33,29 @@ func (r *roundRobin) Create() balancer.Balancer {
3233

3334
func RoundRobin() balancer.Balancer {
3435
return &roundRobin{
35-
r: rand.New(),
36+
r: rand.New(rand.WithLock()),
3637
}
3738
}
3839

3940
func RandomChoice() balancer.Balancer {
4041
return &randomChoice{
4142
roundRobin: roundRobin{
42-
r: rand.New(),
43+
r: rand.New(rand.WithLock()),
4344
},
4445
}
4546
}
4647

4748
type randomChoice struct {
4849
roundRobin
49-
m sync.Mutex
5050
}
5151

5252
func (r *randomChoice) Create() balancer.Balancer {
5353
return RandomChoice()
5454
}
5555

5656
func (r *roundRobin) Next() conn.Conn {
57+
r.mu.RLock()
58+
defer r.mu.RUnlock()
5759
if n := len(r.conns); n == 0 {
5860
return nil
5961
}
@@ -63,23 +65,27 @@ func (r *roundRobin) Next() conn.Conn {
6365
}
6466

6567
func (r *randomChoice) Next() conn.Conn {
68+
r.mu.RLock()
69+
defer r.mu.RUnlock()
6670
if n := len(r.conns); n == 0 {
6771
return nil
6872
}
69-
r.m.Lock()
7073
i := r.belt[r.r.Int(len(r.belt))]
71-
r.m.Unlock()
7274
return r.conns[i].Conn
7375
}
7476

7577
func (r *roundRobin) Insert(conn conn.Conn) balancer.Element {
78+
r.mu.Lock()
79+
defer r.mu.Unlock()
7680
e := r.conns.Insert(conn)
7781
r.updateMinMax(e.Conn)
7882
r.belt = r.distribute()
7983
return e
8084
}
8185

8286
func (r *roundRobin) Remove(x balancer.Element) bool {
87+
r.mu.Lock()
88+
defer r.mu.Unlock()
8389
el := x.(*list.Element)
8490
r.conns.Remove(el)
8591
r.inspectMinMax(el.Conn.Endpoint().LoadFactor())
@@ -88,6 +94,8 @@ func (r *roundRobin) Remove(x balancer.Element) bool {
8894
}
8995

9096
func (r *roundRobin) Contains(x balancer.Element) bool {
97+
r.mu.RLock()
98+
defer r.mu.RUnlock()
9199
if x == nil {
92100
return false
93101
}
@@ -98,6 +106,7 @@ func (r *roundRobin) Contains(x balancer.Element) bool {
98106
return r.conns.Contains(el)
99107
}
100108

109+
// r.mu must be held
101110
func (r *roundRobin) updateMinMax(cc conn.Conn) {
102111
if len(r.conns) == 1 {
103112
r.min = cc.Endpoint().LoadFactor()
@@ -112,6 +121,7 @@ func (r *roundRobin) updateMinMax(cc conn.Conn) {
112121
}
113122
}
114123

124+
// r.mu must be held
115125
func (r *roundRobin) inspectMinMax(loadFactor float32) {
116126
if r.min != loadFactor && r.max != loadFactor {
117127
return
@@ -133,13 +143,15 @@ func (r *roundRobin) inspectMinMax(loadFactor float32) {
133143
}
134144
}
135145

146+
// r.mu must be held
136147
func (r *roundRobin) distribute() []int {
137148
return r.spread(distribution(
138149
r.min, int32(len(r.conns)),
139150
r.max, 1,
140151
))
141152
}
142153

154+
// r.mu must be held
143155
func (r *roundRobin) spread(f func(float32) int32) []int {
144156
var (
145157
dist = make([]int32, 0, len(r.conns))
@@ -244,13 +256,3 @@ func (h *distItemsHeap) Pop() interface{} {
244256
*h = p[:n-1]
245257
return x
246258
}
247-
248-
func IsRoundRobin(i interface{}) bool {
249-
_, ok := i.(*roundRobin)
250-
return ok
251-
}
252-
253-
func IsRandomChoice(i interface{}) bool {
254-
_, ok := i.(*randomChoice)
255-
return ok
256-
}

0 commit comments

Comments
 (0)