Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions zrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import (
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc/internal"
"github.com/zeromicro/go-zero/zrpc/internal/auth"
"github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker"
"github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash"
"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
"github.com/zeromicro/go-zero/zrpc/internal/clientinterceptors"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -133,6 +136,13 @@ func WithCallTimeout(timeout time.Duration) grpc.CallOption {
return clientinterceptors.WithCallTimeout(timeout)
}

// WrapPicker wraps the given picker with circuit breaker.
// Use this for custom load balancers to enable instance-level circuit breaker.
// retryable indicates whether to retry on another node when circuit breaker is open.
func WrapPicker(info base.PickerBuildInfo, picker balancer.Picker, retryable bool) balancer.Picker {
return breaker.WrapPicker(info, picker, retryable)
}

func makeLBServiceConfig(balancerName string) string {
if len(balancerName) == 0 {
balancerName = p2c.Name
Expand Down
12 changes: 12 additions & 0 deletions zrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash"
"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -286,3 +288,13 @@ func TestSetHashKey(t *testing.T) {

assert.Empty(t, consistenthash.GetHashKey(context.Background()))
}

func TestWrapPicker(t *testing.T) {
info := base.PickerBuildInfo{
ReadySCs: map[balancer.SubConn]base.SubConnInfo{},
}
inner := base.NewErrPicker(balancer.ErrNoSubConnAvailable)

picker := WrapPicker(info, inner, true)
assert.NotNil(t, picker)
}
122 changes: 122 additions & 0 deletions zrpc/internal/balancer/breaker/breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package breaker

import (
"context"
"errors"
"path"

"github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/zrpc/internal/codes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
)

const retries = 3

var emptyPickResult balancer.PickResult

type (
breakerKey struct{}

// Unwrapper is the interface for unwrapping the inner picker.
Unwrapper interface {
Unwrap() balancer.Picker
}
)

type breakerPicker struct {
picker balancer.Picker
addrMap map[balancer.SubConn]string
retryable bool
}

// Unwrap returns the inner picker.
func (p *breakerPicker) Unwrap() balancer.Picker {
return p.picker
}

// WrapPicker wraps the given picker with circuit breaker.
// retryable indicates whether to retry on another node when circuit breaker is open.
func WrapPicker(info base.PickerBuildInfo, picker balancer.Picker, retryable bool) balancer.Picker {
addrMap := make(map[balancer.SubConn]string, len(info.ReadySCs))
for conn, connInfo := range info.ReadySCs {
addrMap[conn] = connInfo.Address.Addr
}

return &breakerPicker{
picker: picker,
addrMap: addrMap,
retryable: retryable,
}
}

func (p *breakerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if !HasBreaker(info.Ctx) {
return p.picker.Pick(info)
}

if !p.retryable {
return p.pick(info)
}

var (
err error
result balancer.PickResult
)

for i := 0; i <= retries; i++ {
result, err = p.pick(info)
if err == nil || !errors.Is(err, breaker.ErrServiceUnavailable) {
break
}
}

return result, err
}

func (p *breakerPicker) pick(info balancer.PickInfo) (balancer.PickResult, error) {
result, err := p.picker.Pick(info)
if err != nil {
return result, err
}

addr := p.addrMap[result.SubConn]
breakerName := path.Join(addr, info.FullMethodName)
promise, err := breaker.GetBreaker(breakerName).AllowCtx(info.Ctx)
if err != nil {
if result.Done != nil {
result.Done(balancer.DoneInfo{Err: err})
}
return emptyPickResult, err
}

return balancer.PickResult{
SubConn: result.SubConn,
Done: p.buildDoneFunc(result.Done, promise),
}, nil
}

func (p *breakerPicker) buildDoneFunc(done func(balancer.DoneInfo), promise breaker.Promise) func(balancer.DoneInfo) {
return func(info balancer.DoneInfo) {
if done != nil {
done(info)
}

if info.Err != nil && !codes.Acceptable(info.Err) {
promise.Reject(info.Err.Error())
} else {
promise.Accept()
}
}
}

// HasBreaker checks if the circuit breaker is enabled in context.
func HasBreaker(ctx context.Context) bool {
v, ok := ctx.Value(breakerKey{}).(bool)
return ok && v
}

// WithBreaker marks the circuit breaker as enabled in context.
func WithBreaker(ctx context.Context) context.Context {
return context.WithValue(ctx, breakerKey{}, true)
}
Loading
Loading