diff --git a/zrpc/client.go b/zrpc/client.go index 767ee28dc619..9f779de5bb3e 100644 --- a/zrpc/client.go +++ b/zrpc/client.go @@ -9,10 +9,13 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc/internal" "github.com/zeromicro/go-zero/zrpc/internal/auth" + "github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker" "github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash" "github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c" "github.com/zeromicro/go-zero/zrpc/internal/clientinterceptors" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/keepalive" ) @@ -133,6 +136,13 @@ func WithCallTimeout(timeout time.Duration) grpc.CallOption { return clientinterceptors.WithCallTimeout(timeout) } +// WrapPicker wraps the given picker with circuit breaker. +// Use this for custom load balancers to enable instance-level circuit breaker. +// retryable indicates whether to retry on another node when circuit breaker is open. +func WrapPicker(info base.PickerBuildInfo, picker balancer.Picker, retryable bool) balancer.Picker { + return breaker.WrapPicker(info, picker, retryable) +} + func makeLBServiceConfig(balancerName string) string { if len(balancerName) == 0 { balancerName = p2c.Name diff --git a/zrpc/client_test.go b/zrpc/client_test.go index 3526cad1eefa..2dcb836a20aa 100644 --- a/zrpc/client_test.go +++ b/zrpc/client_test.go @@ -15,6 +15,8 @@ import ( "github.com/zeromicro/go-zero/zrpc/internal/balancer/consistenthash" "github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" @@ -286,3 +288,13 @@ func TestSetHashKey(t *testing.T) { assert.Empty(t, consistenthash.GetHashKey(context.Background())) } + +func TestWrapPicker(t *testing.T) { + info := base.PickerBuildInfo{ + ReadySCs: map[balancer.SubConn]base.SubConnInfo{}, + } + inner := base.NewErrPicker(balancer.ErrNoSubConnAvailable) + + picker := WrapPicker(info, inner, true) + assert.NotNil(t, picker) +} diff --git a/zrpc/internal/balancer/breaker/breaker.go b/zrpc/internal/balancer/breaker/breaker.go new file mode 100644 index 000000000000..0354186655d6 --- /dev/null +++ b/zrpc/internal/balancer/breaker/breaker.go @@ -0,0 +1,122 @@ +package breaker + +import ( + "context" + "errors" + "path" + + "github.com/zeromicro/go-zero/core/breaker" + "github.com/zeromicro/go-zero/zrpc/internal/codes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" +) + +const retries = 3 + +var emptyPickResult balancer.PickResult + +type ( + breakerKey struct{} + + // Unwrapper is the interface for unwrapping the inner picker. + Unwrapper interface { + Unwrap() balancer.Picker + } +) + +type breakerPicker struct { + picker balancer.Picker + addrMap map[balancer.SubConn]string + retryable bool +} + +// Unwrap returns the inner picker. +func (p *breakerPicker) Unwrap() balancer.Picker { + return p.picker +} + +// WrapPicker wraps the given picker with circuit breaker. +// retryable indicates whether to retry on another node when circuit breaker is open. +func WrapPicker(info base.PickerBuildInfo, picker balancer.Picker, retryable bool) balancer.Picker { + addrMap := make(map[balancer.SubConn]string, len(info.ReadySCs)) + for conn, connInfo := range info.ReadySCs { + addrMap[conn] = connInfo.Address.Addr + } + + return &breakerPicker{ + picker: picker, + addrMap: addrMap, + retryable: retryable, + } +} + +func (p *breakerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + if !HasBreaker(info.Ctx) { + return p.picker.Pick(info) + } + + if !p.retryable { + return p.pick(info) + } + + var ( + err error + result balancer.PickResult + ) + + for i := 0; i <= retries; i++ { + result, err = p.pick(info) + if err == nil || !errors.Is(err, breaker.ErrServiceUnavailable) { + break + } + } + + return result, err +} + +func (p *breakerPicker) pick(info balancer.PickInfo) (balancer.PickResult, error) { + result, err := p.picker.Pick(info) + if err != nil { + return result, err + } + + addr := p.addrMap[result.SubConn] + breakerName := path.Join(addr, info.FullMethodName) + promise, err := breaker.GetBreaker(breakerName).AllowCtx(info.Ctx) + if err != nil { + if result.Done != nil { + result.Done(balancer.DoneInfo{Err: err}) + } + return emptyPickResult, err + } + + return balancer.PickResult{ + SubConn: result.SubConn, + Done: p.buildDoneFunc(result.Done, promise), + }, nil +} + +func (p *breakerPicker) buildDoneFunc(done func(balancer.DoneInfo), promise breaker.Promise) func(balancer.DoneInfo) { + return func(info balancer.DoneInfo) { + if done != nil { + done(info) + } + + if info.Err != nil && !codes.Acceptable(info.Err) { + promise.Reject(info.Err.Error()) + } else { + promise.Accept() + } + } +} + +// HasBreaker checks if the circuit breaker is enabled in context. +func HasBreaker(ctx context.Context) bool { + v, ok := ctx.Value(breakerKey{}).(bool) + return ok && v +} + +// WithBreaker marks the circuit breaker as enabled in context. +func WithBreaker(ctx context.Context) context.Context { + return context.WithValue(ctx, breakerKey{}, true) +} diff --git a/zrpc/internal/balancer/breaker/breaker_test.go b/zrpc/internal/balancer/breaker/breaker_test.go new file mode 100644 index 000000000000..21ea10855616 --- /dev/null +++ b/zrpc/internal/balancer/breaker/breaker_test.go @@ -0,0 +1,332 @@ +package breaker + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/zeromicro/go-zero/core/breaker" + "github.com/zeromicro/go-zero/core/stat" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" +) + +func init() { + stat.SetReporter(nil) +} + +type mockSubConn struct { + addr string +} + +func (m *mockSubConn) UpdateAddresses([]resolver.Address) {} +func (m *mockSubConn) Connect() {} +func (m *mockSubConn) Shutdown() {} +func (m *mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) { + return nil, nil +} + +type mockPicker struct { + err error + index int + results []balancer.PickResult +} + +func (m *mockPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + if m.err != nil { + return balancer.PickResult{}, m.err + } + if len(m.results) == 0 { + return balancer.PickResult{}, nil + } + result := m.results[m.index%len(m.results)] + m.index++ + return result, nil +} + +func TestWithBreaker(t *testing.T) { + ctx := context.Background() + assert.False(t, HasBreaker(ctx)) + + ctx = WithBreaker(ctx) + assert.True(t, HasBreaker(ctx)) +} + +func TestWrapPicker(t *testing.T) { + sc := &mockSubConn{addr: "127.0.0.1:8080"} + info := base.PickerBuildInfo{ + ReadySCs: map[balancer.SubConn]base.SubConnInfo{ + sc: {Address: resolver.Address{Addr: "127.0.0.1:8080"}}, + }, + } + inner := &mockPicker{} + + picker := WrapPicker(info, inner, true) + assert.NotNil(t, picker) + + bp := picker.(*breakerPicker) + assert.Equal(t, inner, bp.picker) + assert.True(t, bp.retryable) + assert.Equal(t, "127.0.0.1:8080", bp.addrMap[sc]) +} + +func TestUnwrap(t *testing.T) { + inner := &mockPicker{} + picker := &breakerPicker{picker: inner} + + assert.Equal(t, inner, picker.Unwrap()) +} + +func TestPickWithoutBreaker(t *testing.T) { + sc := &mockSubConn{addr: "127.0.0.1:8080"} + inner := &mockPicker{ + results: []balancer.PickResult{{SubConn: sc}}, + } + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{sc: "127.0.0.1:8080"}, + } + + result, err := picker.Pick(balancer.PickInfo{Ctx: context.Background()}) + assert.NoError(t, err) + assert.Equal(t, sc, result.SubConn) +} + +func TestPickNotRetryable(t *testing.T) { + sc := &mockSubConn{addr: "127.0.0.1:8080"} + inner := &mockPicker{ + results: []balancer.PickResult{{SubConn: sc}}, + } + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{sc: "127.0.0.1:8080"}, + retryable: false, + } + + ctx := WithBreaker(context.Background()) + result, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/test", + }) + assert.NoError(t, err) + assert.NotNil(t, result.Done) +} + +func TestPickRetryableSuccess(t *testing.T) { + sc := &mockSubConn{addr: "127.0.0.1:8080"} + inner := &mockPicker{ + results: []balancer.PickResult{{SubConn: sc}}, + } + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{sc: "127.0.0.1:8080"}, + retryable: true, + } + + ctx := WithBreaker(context.Background()) + result, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/test", + }) + assert.NoError(t, err) + assert.NotNil(t, result.Done) +} + +func TestPickRetryablePickerError(t *testing.T) { + expectedErr := status.Error(codes.Unavailable, "unavailable") + inner := &mockPicker{err: expectedErr} + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{}, + retryable: true, + } + + ctx := WithBreaker(context.Background()) + _, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/test", + }) + assert.Equal(t, expectedErr, err) +} + +func TestPickNotRetryablePickerError(t *testing.T) { + expectedErr := status.Error(codes.Unavailable, "unavailable") + inner := &mockPicker{err: expectedErr} + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{}, + retryable: false, + } + + ctx := WithBreaker(context.Background()) + _, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/test", + }) + assert.Equal(t, expectedErr, err) +} + +func TestPickBreakerOpen(t *testing.T) { + sc := &mockSubConn{addr: "127.0.0.1:8081"} + inner := &mockPicker{ + results: []balancer.PickResult{{SubConn: sc}}, + } + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{sc: "127.0.0.1:8081"}, + retryable: false, + } + + // trigger breaker + for i := 0; i < 1000; i++ { + _ = breaker.DoWithAcceptable("127.0.0.1:8081/test", func() error { + return status.Error(codes.DeadlineExceeded, "timeout") + }, func(err error) bool { + return false + }) + } + + ctx := WithBreaker(context.Background()) + _, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/test", + }) + assert.ErrorIs(t, err, breaker.ErrServiceUnavailable) +} + +func TestPickRetryOnBreaker(t *testing.T) { + sc1 := &mockSubConn{addr: "127.0.0.1:8082"} + sc2 := &mockSubConn{addr: "127.0.0.1:8083"} + inner := &mockPicker{ + results: []balancer.PickResult{{SubConn: sc1}, {SubConn: sc2}}, + } + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{sc1: "127.0.0.1:8082", sc2: "127.0.0.1:8083"}, + retryable: true, + } + + // trigger breaker for sc1 + for i := 0; i < 1000; i++ { + _ = breaker.DoWithAcceptable("127.0.0.1:8082/retry", func() error { + return status.Error(codes.DeadlineExceeded, "timeout") + }, func(err error) bool { + return false + }) + } + + ctx := WithBreaker(context.Background()) + result, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/retry", + }) + assert.NoError(t, err) + assert.Equal(t, sc2, result.SubConn) +} + +func TestPickAllBreakerOpen(t *testing.T) { + sc := &mockSubConn{addr: "127.0.0.1:8084"} + inner := &mockPicker{ + results: []balancer.PickResult{{SubConn: sc}}, + } + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{sc: "127.0.0.1:8084"}, + retryable: true, + } + + // trigger breaker + for i := 0; i < 1000; i++ { + _ = breaker.DoWithAcceptable("127.0.0.1:8084/all", func() error { + return status.Error(codes.DeadlineExceeded, "timeout") + }, func(err error) bool { + return false + }) + } + + ctx := WithBreaker(context.Background()) + _, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/all", + }) + assert.ErrorIs(t, err, breaker.ErrServiceUnavailable) +} + +func TestBuildDoneFuncWithNilDone(t *testing.T) { + picker := &breakerPicker{} + brk := breaker.NewBreaker() + promise, _ := brk.Allow() + + done := picker.buildDoneFunc(nil, promise) + assert.NotNil(t, done) + done(balancer.DoneInfo{}) +} + +func TestBuildDoneFuncWithDone(t *testing.T) { + picker := &breakerPicker{} + brk := breaker.NewBreaker() + promise, _ := brk.Allow() + + called := false + done := picker.buildDoneFunc(func(info balancer.DoneInfo) { + called = true + }, promise) + done(balancer.DoneInfo{}) + assert.True(t, called) +} + +func TestBuildDoneFuncAcceptable(t *testing.T) { + picker := &breakerPicker{} + brk := breaker.NewBreaker() + promise, _ := brk.Allow() + + done := picker.buildDoneFunc(nil, promise) + done(balancer.DoneInfo{Err: status.Error(codes.NotFound, "not found")}) +} + +func TestBuildDoneFuncNotAcceptable(t *testing.T) { + picker := &breakerPicker{} + brk := breaker.NewBreaker() + promise, _ := brk.Allow() + + done := picker.buildDoneFunc(nil, promise) + done(balancer.DoneInfo{Err: status.Error(codes.DeadlineExceeded, "timeout")}) +} + +func TestPickBreakerOpenWithDone(t *testing.T) { + sc := &mockSubConn{addr: "127.0.0.1:8085"} + doneCalled := false + inner := &mockPicker{ + results: []balancer.PickResult{{ + SubConn: sc, + Done: func(info balancer.DoneInfo) { + doneCalled = true + }, + }}, + } + picker := &breakerPicker{ + picker: inner, + addrMap: map[balancer.SubConn]string{sc: "127.0.0.1:8085"}, + retryable: false, + } + + // trigger breaker + for i := 0; i < 1000; i++ { + _ = breaker.DoWithAcceptable("127.0.0.1:8085/done", func() error { + return status.Error(codes.DeadlineExceeded, "timeout") + }, func(err error) bool { + return false + }) + } + + ctx := WithBreaker(context.Background()) + _, err := picker.Pick(balancer.PickInfo{ + Ctx: ctx, + FullMethodName: "/done", + }) + assert.ErrorIs(t, err, breaker.ErrServiceUnavailable) + assert.True(t, doneCalled) +} diff --git a/zrpc/internal/balancer/consistenthash/consistenthash.go b/zrpc/internal/balancer/consistenthash/consistenthash.go index 6543cd893dac..cf91dc3f4296 100644 --- a/zrpc/internal/balancer/consistenthash/consistenthash.go +++ b/zrpc/internal/balancer/consistenthash/consistenthash.go @@ -4,6 +4,7 @@ import ( "context" "github.com/zeromicro/go-zero/core/hash" + "github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/codes" @@ -48,10 +49,10 @@ func (b *pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { hashRing.Add(addr) } - return &picker{ + return breaker.WrapPicker(info, &picker{ hashRing: hashRing, conns: conns, - } + }, false) } func newBuilder() balancer.Builder { diff --git a/zrpc/internal/balancer/consistenthash/consistenthash_test.go b/zrpc/internal/balancer/consistenthash/consistenthash_test.go index 2f3524d462ae..47209422bccd 100644 --- a/zrpc/internal/balancer/consistenthash/consistenthash_test.go +++ b/zrpc/internal/balancer/consistenthash/consistenthash_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/hash" + "github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/resolver" @@ -42,7 +43,7 @@ func TestPickerBuilder_BuildAndRing(t *testing.T) { }, } - p := b.Build(info).(*picker) + p := b.Build(info).(breaker.Unwrapper).Unwrap().(*picker) assert.NotNil(t, p.hashRing) assert.Len(t, p.conns, 2) } @@ -58,7 +59,7 @@ func TestPicker_HashConsistency(t *testing.T) { subConn2: {Address: resolver.Address{Addr: "127.0.0.1:8081"}}, }, } - p := pb.Build(info).(*picker) + p := pb.Build(info).(breaker.Unwrapper).Unwrap().(*picker) ctx := SetHashKey(context.Background(), "user_123") res1, err := p.Pick(balancer.PickInfo{Ctx: ctx}) assert.NoError(t, err) @@ -81,7 +82,7 @@ func TestPicker_MissingKey(t *testing.T) { subConn: {Address: resolver.Address{Addr: "127.0.0.1:8080"}}, }, } - p := pb.Build(info).(*picker) + p := pb.Build(info).(breaker.Unwrapper).Unwrap().(*picker) // No hash key in context _, err := p.Pick(balancer.PickInfo{Ctx: context.Background()}) @@ -157,7 +158,7 @@ func BenchmarkPicker_HashConsistency(b *testing.B) { subConn2: {Address: resolver.Address{Addr: "127.0.0.1:8081"}}, }, } - p := pb.Build(info).(*picker) + p := pb.Build(info).(breaker.Unwrapper).Unwrap().(*picker) ctx := SetHashKey(context.Background(), "hot_user_123") diff --git a/zrpc/internal/balancer/p2c/p2c.go b/zrpc/internal/balancer/p2c/p2c.go index b57f0c44cbc6..31f22e84878c 100644 --- a/zrpc/internal/balancer/p2c/p2c.go +++ b/zrpc/internal/balancer/p2c/p2c.go @@ -12,6 +12,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/timex" + "github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker" "github.com/zeromicro/go-zero/zrpc/internal/codes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" @@ -61,11 +62,11 @@ func (b *p2cPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { }) } - return &p2cPicker{ + return breaker.WrapPicker(info, &p2cPicker{ conns: conns, r: rand.New(rand.NewSource(time.Now().UnixNano())), stamp: syncx.NewAtomicDuration(), - } + }, true) } func newBuilder() balancer.Builder { diff --git a/zrpc/internal/balancer/p2c/p2c_test.go b/zrpc/internal/balancer/p2c/p2c_test.go index 8e1ec4b22bf1..87157eb66d31 100644 --- a/zrpc/internal/balancer/p2c/p2c_test.go +++ b/zrpc/internal/balancer/p2c/p2c_test.go @@ -12,6 +12,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/stringx" + "github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/codes" @@ -111,7 +112,7 @@ func TestP2cPicker_Pick(t *testing.T) { wg.Wait() dist := make(map[any]int) - conns := picker.(*p2cPicker).conns + conns := picker.(breaker.Unwrapper).Unwrap().(*p2cPicker).conns for _, conn := range conns { dist[conn.addr.Addr] = int(conn.requests) } diff --git a/zrpc/internal/clientinterceptors/breakerinterceptor.go b/zrpc/internal/clientinterceptors/breakerinterceptor.go index 63c068a6cccc..c11094d643b3 100644 --- a/zrpc/internal/clientinterceptors/breakerinterceptor.go +++ b/zrpc/internal/clientinterceptors/breakerinterceptor.go @@ -2,18 +2,13 @@ package clientinterceptors import ( "context" - "path" - "github.com/zeromicro/go-zero/core/breaker" - "github.com/zeromicro/go-zero/zrpc/internal/codes" + "github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker" "google.golang.org/grpc" ) -// BreakerInterceptor is an interceptor that acts as a circuit breaker. +// BreakerInterceptor is an interceptor that enables the circuit breaker. func BreakerInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - breakerName := path.Join(cc.Target(), method) - return breaker.DoWithAcceptableCtx(ctx, breakerName, func() error { - return invoker(ctx, method, req, reply, cc, opts...) - }, codes.Acceptable) + return invoker(breaker.WithBreaker(ctx), method, req, reply, cc, opts...) } diff --git a/zrpc/internal/clientinterceptors/breakerinterceptor_test.go b/zrpc/internal/clientinterceptors/breakerinterceptor_test.go index 3ec8ba1a5270..7ef5f8817efa 100644 --- a/zrpc/internal/clientinterceptors/breakerinterceptor_test.go +++ b/zrpc/internal/clientinterceptors/breakerinterceptor_test.go @@ -6,53 +6,10 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/zeromicro/go-zero/core/breaker" - "github.com/zeromicro/go-zero/core/stat" - rcodes "github.com/zeromicro/go-zero/zrpc/internal/codes" + "github.com/zeromicro/go-zero/zrpc/internal/balancer/breaker" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) -func init() { - stat.SetReporter(nil) -} - -type mockError struct { - st *status.Status -} - -func (m mockError) GRPCStatus() *status.Status { - return m.st -} - -func (m mockError) Error() string { - return "mocked error" -} - -func TestBreakerInterceptorNotFound(t *testing.T) { - err := mockError{st: status.New(codes.NotFound, "any")} - for i := 0; i < 1000; i++ { - assert.Equal(t, err, breaker.DoWithAcceptable("call", func() error { - return err - }, rcodes.Acceptable)) - } -} - -func TestBreakerInterceptorDeadlineExceeded(t *testing.T) { - err := mockError{st: status.New(codes.DeadlineExceeded, "any")} - errs := make(map[error]int) - for i := 0; i < 1000; i++ { - e := breaker.DoWithAcceptable("call", func() error { - return err - }, rcodes.Acceptable) - errs[e]++ - } - assert.Equal(t, 2, len(errs)) - assert.True(t, errs[err] > 0) - assert.True(t, errs[breaker.ErrServiceUnavailable] > 0) -} - func TestBreakerInterceptor(t *testing.T) { tests := []struct { name string @@ -73,6 +30,8 @@ func TestBreakerInterceptor(t *testing.T) { err := BreakerInterceptor(context.Background(), "/foo", nil, nil, cc, func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + // verify breaker is enabled in context + assert.True(t, breaker.HasBreaker(ctx)) return test.err }) assert.Equal(t, test.err, err)