-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathload_balance.go
More file actions
144 lines (121 loc) · 2.63 KB
/
load_balance.go
File metadata and controls
144 lines (121 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package grpcloadbalancing
import (
"errors"
"sync"
)
/*
* based on http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling#
*/
// LoadBalance represent loadbalance . prefer to use mutex since shared state
type LoadBalance struct {
sync.Mutex
lastSelected int // lastSelected server
cw int // current weight
gcd int // gcd of all weights
maxWeight int
endpoints []*Endpoint
}
// NewLoadBalance create load balance
func NewLoadBalance(endpoints []*Endpoint) *LoadBalance {
// calculate gcd
return &LoadBalance{
lastSelected: -1,
cw: 0,
endpoints: endpoints,
gcd: <-calculateEndpointsGCD(endpoints),
maxWeight: <-getMaxWeight(endpoints),
}
}
// Get endpoint load balance
func (l *LoadBalance) Get() (*Endpoint, error) {
l.Lock()
defer l.Unlock()
endPoint, err := l.getEndpoint()
if err != nil {
return nil, err
}
if err := endPoint.checkOrInitiateNewConnection(); err != nil {
return nil, err
}
return endPoint, nil
}
// AddEndpoint add new endpoint and reset stats
func (l *LoadBalance) AddEndpoint(endpoint *Endpoint) {
l.Lock()
defer l.Unlock()
l.endpoints = append(l.endpoints, endpoint)
l.reset()
}
// reset and recalculated lastSelected, cw, gcd, maxWeight
func (l *LoadBalance) reset() {
l.Lock()
defer l.Unlock()
l.lastSelected = -1
l.cw = 0
l.gcd = <-calculateEndpointsGCD(l.endpoints)
l.maxWeight = <-getMaxWeight(l.endpoints)
}
// Destroy connection and commit suicide
func (l *LoadBalance) Destroy() error {
l.Lock()
defer l.Unlock()
for _, v := range l.endpoints {
v.destroy()
}
l.endpoints = nil
return nil
}
func (l *LoadBalance) getEndpoint() (*Endpoint, error) {
for {
l.lastSelected = (l.lastSelected + 1) % len(l.endpoints)
if l.lastSelected == 0 {
l.cw = l.cw - l.gcd
if l.cw <= 0 {
l.cw = l.maxWeight
if l.cw == 0 {
return nil, errors.New("null error")
}
}
}
if l.endpoints[l.lastSelected].weight >= l.cw {
return l.endpoints[l.lastSelected], nil
}
continue
}
}
func getMaxWeight(endpoints []*Endpoint) chan int {
result := make(chan int)
go func() {
max := -1
for _, v := range endpoints {
if v.weight > max {
max = v.weight
}
}
result <- max
close(result)
}()
return result
}
func calculateEndpointsGCD(endpoints []*Endpoint) chan int {
gcdResult := make(chan int)
go func() {
divider := -1
for _, v := range endpoints {
if divider == -1 {
divider = v.weight
} else {
divider = gcd(divider, v.weight)
}
}
gcdResult <- divider
close(gcdResult)
}()
return gcdResult
}
func gcd(a, b int) int {
for b != 0 {
a, b = b, a%b
}
return a
}