Skip to content

Commit 63693d6

Browse files
committed
feat: improve breaker by combining error rate and latency
Signed-off-by: kevin <wanjunfeng@gmail.com>
1 parent 8e7e569 commit 63693d6

File tree

4 files changed

+419
-42
lines changed

4 files changed

+419
-42
lines changed

core/breaker/breaker.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ type (
9292
}
9393

9494
circuitBreaker struct {
95-
name string
95+
name string
96+
timeout time.Duration
9697
throttle
9798
}
9899

@@ -117,7 +118,7 @@ func NewBreaker(opts ...Option) Breaker {
117118
if len(b.name) == 0 {
118119
b.name = stringx.Rand()
119120
}
120-
b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
121+
b.throttle = newLoggedThrottle(b.name, newGoogleBreaker(b.timeout))
121122

122123
return &b
123124
}
@@ -202,6 +203,12 @@ func WithName(name string) Option {
202203
}
203204
}
204205

206+
func WithTimeout(timeout time.Duration) Option {
207+
return func(b *circuitBreaker) {
208+
b.timeout = timeout
209+
}
210+
}
211+
205212
func defaultAcceptable(err error) bool {
206213
return err == nil
207214
}

core/breaker/breaker_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,21 @@ func BenchmarkGoogleBreaker(b *testing.B) {
388388
}
389389
}
390390

391+
func TestWithTimeout(t *testing.T) {
392+
timeout := 100 * time.Millisecond
393+
b := NewBreaker(WithTimeout(timeout))
394+
cb, ok := b.(*circuitBreaker)
395+
assert.True(t, ok)
396+
assert.Equal(t, timeout, cb.timeout)
397+
}
398+
399+
func TestNopPromise(t *testing.T) {
400+
var p nopPromise
401+
// These methods should not panic and do nothing
402+
p.Accept()
403+
p.Reject("test reason")
404+
}
405+
391406
type mockedPromise struct{}
392407

393408
func (m *mockedPromise) Accept() {

core/breaker/googlebreaker.go

Lines changed: 124 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package breaker
22

33
import (
4+
"math"
5+
"sync/atomic"
46
"time"
57

68
"github.com/zeromicro/go-zero/core/collection"
@@ -11,22 +13,31 @@ import (
1113

1214
const (
1315
// 250ms for bucket duration
14-
window = time.Second * 10
15-
buckets = 40
16-
forcePassDuration = time.Second
17-
k = 1.5
18-
minK = 1.1
19-
protection = 5
16+
window = time.Second * 10
17+
buckets = 40
18+
forcePassDuration = time.Second
19+
k = 1.5
20+
minK = 1.1
21+
protection = 5
22+
latencyActivationMultiplier = 3
23+
latencyCeilingRatio = 0.95
24+
latencyBaselineDecayBeta = 0.25
25+
latencyBaselineRiseBeta = 0.01
26+
latencyCurrentBeta = 0.25
27+
latencyMaxDropRatio = 0.3
2028
)
2129

2230
// googleBreaker is a netflixBreaker pattern from google.
2331
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
2432
type (
2533
googleBreaker struct {
26-
k float64
27-
stat *collection.RollingWindow[int64, *bucket]
28-
proba *mathx.Proba
29-
lastPass *syncx.AtomicDuration
34+
k float64
35+
stat *collection.RollingWindow[int64, *bucket]
36+
proba *mathx.Proba
37+
lastPass *syncx.AtomicDuration
38+
timeoutUs int64
39+
noLoadLatencyUs int64
40+
currentLatencyUs int64
3041
}
3142

3243
windowResult struct {
@@ -37,27 +48,25 @@ type (
3748
}
3849
)
3950

40-
func newGoogleBreaker() *googleBreaker {
51+
func newGoogleBreaker(timeout time.Duration) *googleBreaker {
4152
bucketDuration := time.Duration(int64(window) / int64(buckets))
42-
st := collection.NewRollingWindow[int64, *bucket](func() *bucket {
53+
st := collection.NewRollingWindow(func() *bucket {
4354
return new(bucket)
4455
}, buckets, bucketDuration)
4556
return &googleBreaker{
46-
stat: st,
47-
k: k,
48-
proba: mathx.NewProba(),
49-
lastPass: syncx.NewAtomicDuration(),
57+
stat: st,
58+
k: k,
59+
proba: mathx.NewProba(),
60+
lastPass: syncx.NewAtomicDuration(),
61+
timeoutUs: timeout.Microseconds(),
5062
}
5163
}
5264

5365
func (b *googleBreaker) accept() error {
54-
var w float64
5566
history := b.history()
56-
w = b.k - (b.k-minK)*float64(history.failingBuckets)/buckets
57-
weightedAccepts := mathx.AtLeast(w, minK) * float64(history.accepts)
58-
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
59-
// for better performance, no need to care about the negative ratio
60-
dropRatio := (float64(history.total-protection) - weightedAccepts) / float64(history.total+1)
67+
errorRatio := b.calcK(history)
68+
latencyRatio := b.calcLatencyRatio()
69+
dropRatio := math.Max(errorRatio, latencyRatio)
6170
if dropRatio <= 0 {
6271
return nil
6372
}
@@ -86,10 +95,40 @@ func (b *googleBreaker) allow() (internalPromise, error) {
8695
}
8796

8897
return googlePromise{
89-
b: b,
98+
b: b,
99+
start: timex.Now(),
90100
}, nil
91101
}
92102

103+
func (b *googleBreaker) calcK(history windowResult) float64 {
104+
w := b.k - (b.k-minK)*float64(history.failingBuckets)/buckets
105+
weightedAccepts := mathx.AtLeast(w, minK) * float64(history.accepts)
106+
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
107+
// for better performance, no need to care about the negative ratio
108+
return (float64(history.total-protection) - weightedAccepts) / float64(history.total+1)
109+
}
110+
111+
func (b *googleBreaker) calcLatencyRatio() float64 {
112+
if b.timeoutUs <= 0 {
113+
return 0
114+
}
115+
116+
noLoadLatency := atomic.LoadInt64(&b.noLoadLatencyUs)
117+
currentLatencyUs := atomic.LoadInt64(&b.currentLatencyUs)
118+
if noLoadLatency <= 0 || currentLatencyUs <= 0 {
119+
return 0
120+
}
121+
122+
threshold := noLoadLatency * latencyActivationMultiplier
123+
ceiling := int64(float64(b.timeoutUs) * latencyCeilingRatio)
124+
if currentLatencyUs < threshold || ceiling <= threshold {
125+
return 0
126+
}
127+
128+
ratio := float64(currentLatencyUs-threshold) / float64(ceiling-threshold)
129+
return math.Min(1.0, math.Max(0.0, ratio)) * latencyMaxDropRatio
130+
}
131+
93132
func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Acceptable) error {
94133
if err := b.accept(); err != nil {
95134
b.markDrop()
@@ -101,10 +140,11 @@ func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Ac
101140
}
102141

103142
var succ bool
143+
start := timex.Now()
104144
defer func() {
105145
// if req() panic, success is false, mark as failure
106146
if succ {
107-
b.markSuccess()
147+
b.markSuccess(timex.Since(start).Microseconds())
108148
} else {
109149
b.markFailure()
110150
}
@@ -118,18 +158,6 @@ func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Ac
118158
return err
119159
}
120160

121-
func (b *googleBreaker) markDrop() {
122-
b.stat.Add(drop)
123-
}
124-
125-
func (b *googleBreaker) markFailure() {
126-
b.stat.Add(fail)
127-
}
128-
129-
func (b *googleBreaker) markSuccess() {
130-
b.stat.Add(success)
131-
}
132-
133161
func (b *googleBreaker) history() windowResult {
134162
var result windowResult
135163

@@ -151,12 +179,70 @@ func (b *googleBreaker) history() windowResult {
151179
return result
152180
}
153181

182+
func (b *googleBreaker) markDrop() {
183+
b.stat.Add(drop)
184+
}
185+
186+
func (b *googleBreaker) markFailure() {
187+
b.stat.Add(fail)
188+
}
189+
190+
func (b *googleBreaker) markSuccess(latencyUs int64) {
191+
b.stat.Add(success)
192+
if b.timeoutUs > 0 {
193+
b.updateLatency(latencyUs)
194+
}
195+
}
196+
197+
func (b *googleBreaker) updateBaselineLatency(latencyUs int64) {
198+
noLoadLatency := atomic.LoadInt64(&b.noLoadLatencyUs)
199+
if noLoadLatency <= 0 {
200+
atomic.StoreInt64(&b.noLoadLatencyUs, latencyUs)
201+
return
202+
}
203+
204+
var beta float64
205+
if latencyUs < noLoadLatency {
206+
// Fast decay when latency decreases
207+
beta = latencyBaselineDecayBeta
208+
} else {
209+
// Slow rise when latency increases
210+
beta = latencyBaselineRiseBeta
211+
}
212+
213+
newBaseline := int64(beta*float64(latencyUs) + (1-beta)*float64(noLoadLatency))
214+
atomic.StoreInt64(&b.noLoadLatencyUs, newBaseline)
215+
}
216+
217+
func (b *googleBreaker) updateCurrentLatency(latencyUs int64) {
218+
currentLatency := atomic.LoadInt64(&b.currentLatencyUs)
219+
if currentLatency <= 0 {
220+
atomic.StoreInt64(&b.currentLatencyUs, latencyUs)
221+
return
222+
}
223+
224+
// Fast EMA to update current latency
225+
newCurrent := int64(latencyCurrentBeta*float64(latencyUs) + (1-latencyCurrentBeta)*float64(currentLatency))
226+
atomic.StoreInt64(&b.currentLatencyUs, newCurrent)
227+
}
228+
229+
func (b *googleBreaker) updateLatency(latencyUs int64) {
230+
if latencyUs <= 0 || b.timeoutUs <= 0 {
231+
return
232+
}
233+
234+
b.updateBaselineLatency(latencyUs)
235+
b.updateCurrentLatency(latencyUs)
236+
}
237+
154238
type googlePromise struct {
155-
b *googleBreaker
239+
b *googleBreaker
240+
start time.Duration
156241
}
157242

158243
func (p googlePromise) Accept() {
159-
p.b.markSuccess()
244+
latencyUs := timex.Since(p.start).Microseconds()
245+
p.b.markSuccess(latencyUs)
160246
}
161247

162248
func (p googlePromise) Reject() {

0 commit comments

Comments
 (0)