Skip to content

Commit fe91a34

Browse files
周曙光周曙光
authored andcommitted
feat(zrpc): instance breaker support
feat(zrpc): instance breaker support feat(zrpc): instance breaker support
1 parent 8e7e569 commit fe91a34

File tree

10 files changed

+495
-61
lines changed

10 files changed

+495
-61
lines changed

zrpc/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@ import (
99
"github.com/zeromicro/go-zero/core/logx"
1010
"github.com/zeromicro/go-zero/zrpc/internal"
1111
"github.com/zeromicro/go-zero/zrpc/internal/auth"
12+
"github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker"
1213
"github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash"
1314
"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
1415
"github.com/zeromicro/go-zero/zrpc/internal/clientinterceptors"
1516
"google.golang.org/grpc"
17+
"google.golang.org/grpc/balancer"
18+
"google.golang.org/grpc/balancer/base"
1619
"google.golang.org/grpc/keepalive"
1720
)
1821

@@ -133,6 +136,13 @@ func WithCallTimeout(timeout time.Duration) grpc.CallOption {
133136
return clientinterceptors.WithCallTimeout(timeout)
134137
}
135138

139+
// WrapPicker wraps the given picker with circuit breaker.
140+
// Use this for custom load balancers to enable instance-level circuit breaker.
141+
// retryable indicates whether to retry on another node when circuit breaker is open.
142+
func WrapPicker(info base.PickerBuildInfo, picker balancer.Picker, retryable bool) balancer.Picker {
143+
return breaker.WrapPicker(info, picker, retryable)
144+
}
145+
136146
func makeLBServiceConfig(balancerName string) string {
137147
if len(balancerName) == 0 {
138148
balancerName = p2c.Name

zrpc/client_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash"
1616
"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
1717
"google.golang.org/grpc"
18+
"google.golang.org/grpc/balancer"
19+
"google.golang.org/grpc/balancer/base"
1820
"google.golang.org/grpc/codes"
1921
"google.golang.org/grpc/credentials/insecure"
2022
"google.golang.org/grpc/status"
@@ -286,3 +288,13 @@ func TestSetHashKey(t *testing.T) {
286288

287289
assert.Empty(t, consistenthash.GetHashKey(context.Background()))
288290
}
291+
292+
func TestWrapPicker(t *testing.T) {
293+
info := base.PickerBuildInfo{
294+
ReadySCs: map[balancer.SubConn]base.SubConnInfo{},
295+
}
296+
inner := base.NewErrPicker(balancer.ErrNoSubConnAvailable)
297+
298+
picker := WrapPicker(info, inner, true)
299+
assert.NotNil(t, picker)
300+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package breaker
2+
3+
import (
4+
"context"
5+
"errors"
6+
"path"
7+
8+
"github.com/zeromicro/go-zero/core/breaker"
9+
"github.com/zeromicro/go-zero/zrpc/internal/codes"
10+
"google.golang.org/grpc/balancer"
11+
"google.golang.org/grpc/balancer/base"
12+
)
13+
14+
const retries = 3
15+
16+
var emptyPickResult balancer.PickResult
17+
18+
type (
19+
breakerKey struct{}
20+
21+
// Unwrapper is the interface for unwrapping the inner picker.
22+
Unwrapper interface {
23+
Unwrap() balancer.Picker
24+
}
25+
)
26+
27+
type breakerPicker struct {
28+
picker balancer.Picker
29+
addrMap map[balancer.SubConn]string
30+
retryable bool
31+
}
32+
33+
// Unwrap returns the inner picker.
34+
func (p *breakerPicker) Unwrap() balancer.Picker {
35+
return p.picker
36+
}
37+
38+
// WrapPicker wraps the given picker with circuit breaker.
39+
// retryable indicates whether to retry on another node when circuit breaker is open.
40+
func WrapPicker(info base.PickerBuildInfo, picker balancer.Picker, retryable bool) balancer.Picker {
41+
addrMap := make(map[balancer.SubConn]string, len(info.ReadySCs))
42+
for conn, connInfo := range info.ReadySCs {
43+
addrMap[conn] = connInfo.Address.Addr
44+
}
45+
46+
return &breakerPicker{
47+
picker: picker,
48+
addrMap: addrMap,
49+
retryable: retryable,
50+
}
51+
}
52+
53+
func (p *breakerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
54+
if !HasBreaker(info.Ctx) {
55+
return p.picker.Pick(info)
56+
}
57+
58+
if !p.retryable {
59+
return p.pick(info)
60+
}
61+
62+
var (
63+
err error
64+
result balancer.PickResult
65+
)
66+
67+
for i := 0; i <= retries; i++ {
68+
result, err = p.pick(info)
69+
if err == nil || !errors.Is(err, breaker.ErrServiceUnavailable) {
70+
break
71+
}
72+
}
73+
74+
return result, err
75+
}
76+
77+
func (p *breakerPicker) pick(info balancer.PickInfo) (balancer.PickResult, error) {
78+
result, err := p.picker.Pick(info)
79+
if err != nil {
80+
return result, err
81+
}
82+
83+
addr := p.addrMap[result.SubConn]
84+
breakerName := path.Join(addr, info.FullMethodName)
85+
promise, err := breaker.GetBreaker(breakerName).AllowCtx(info.Ctx)
86+
if err != nil {
87+
if result.Done != nil {
88+
result.Done(balancer.DoneInfo{Err: err})
89+
}
90+
return emptyPickResult, err
91+
}
92+
93+
return balancer.PickResult{
94+
SubConn: result.SubConn,
95+
Done: p.buildDoneFunc(result.Done, promise),
96+
}, nil
97+
}
98+
99+
func (p *breakerPicker) buildDoneFunc(done func(balancer.DoneInfo), promise breaker.Promise) func(balancer.DoneInfo) {
100+
return func(info balancer.DoneInfo) {
101+
if done != nil {
102+
done(info)
103+
}
104+
105+
if info.Err != nil && !codes.Acceptable(info.Err) {
106+
promise.Reject(info.Err.Error())
107+
} else {
108+
promise.Accept()
109+
}
110+
}
111+
}
112+
113+
// HasBreaker checks if the circuit breaker is enabled in context.
114+
func HasBreaker(ctx context.Context) bool {
115+
v, ok := ctx.Value(breakerKey{}).(bool)
116+
return ok && v
117+
}
118+
119+
// WithBreaker marks the circuit breaker as enabled in context.
120+
func WithBreaker(ctx context.Context) context.Context {
121+
return context.WithValue(ctx, breakerKey{}, true)
122+
}

0 commit comments

Comments
 (0)