Skip to content

Commit a247996

Browse files
authored
Add CloudWeGo Kitex adapter (#473)
1 parent 31fdb14 commit a247996

File tree

8 files changed

+863
-0
lines changed

8 files changed

+863
-0
lines changed

pkg/adapters/kitex/client.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package kitex
2+
3+
import (
4+
"context"
5+
6+
sentinel "github.com/alibaba/sentinel-golang/api"
7+
"github.com/alibaba/sentinel-golang/core/base"
8+
"github.com/cloudwego/kitex/pkg/endpoint"
9+
)
10+
11+
// SentinelClientMiddleware returns new client.Middleware
12+
// Default resource name is {service's name}:{method}
13+
// Default block fallback is returning blockError
14+
// Define your own behavior by setting serverOptions
15+
func SentinelClientMiddleware(opts ...Option) func(endpoint.Endpoint) endpoint.Endpoint {
16+
options := newOptions(opts)
17+
return func(next endpoint.Endpoint) endpoint.Endpoint {
18+
return func(ctx context.Context, req, resp interface{}) error {
19+
resourceName := options.ResourceExtract(ctx, req, resp)
20+
entry, blockErr := sentinel.Entry(
21+
resourceName,
22+
sentinel.WithResourceType(base.ResTypeRPC),
23+
sentinel.WithTrafficType(base.Outbound),
24+
)
25+
if blockErr != nil {
26+
return options.BlockFallback(ctx, req, resp, blockErr)
27+
}
28+
defer entry.Exit()
29+
err := next(ctx, req, resp)
30+
if err != nil {
31+
sentinel.TraceError(entry, err)
32+
}
33+
return err
34+
}
35+
}
36+
}

pkg/adapters/kitex/client_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package kitex
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
sentinel "github.com/alibaba/sentinel-golang/api"
9+
"github.com/alibaba/sentinel-golang/core/flow"
10+
"github.com/cloudwego/kitex-examples/hello/kitex_gen/api"
11+
"github.com/cloudwego/kitex-examples/hello/kitex_gen/api/hello"
12+
"github.com/cloudwego/kitex/client"
13+
"github.com/stretchr/testify/assert"
14+
)
15+
16+
const FakeErrorMsg = "fake error for testing"
17+
18+
func TestSentinelClientMiddleware(t *testing.T) {
19+
bf := func(ctx context.Context, req, resp interface{}, blockErr error) error {
20+
return errors.New(FakeErrorMsg)
21+
}
22+
c, err := hello.NewClient("hello",
23+
client.WithMiddleware(SentinelClientMiddleware(
24+
WithBlockFallback(bf))))
25+
if err != nil {
26+
t.Fatal(err)
27+
}
28+
err = sentinel.InitDefault()
29+
if err != nil {
30+
t.Fatal(err)
31+
}
32+
req := &api.Request{}
33+
t.Run("success", func(t *testing.T) {
34+
_, err := flow.LoadRules([]*flow.Rule{
35+
{
36+
Resource: "hello:echo",
37+
Threshold: 1.0,
38+
TokenCalculateStrategy: flow.Direct,
39+
ControlBehavior: flow.Reject,
40+
},
41+
})
42+
assert.Nil(t, err)
43+
_, err = c.Echo(context.Background(), req)
44+
assert.NotNil(t, err)
45+
assert.NotEqual(t, FakeErrorMsg, err.Error())
46+
t.Run("second fail", func(t *testing.T) {
47+
_, err = c.Echo(context.Background(), req)
48+
assert.NotNil(t, err)
49+
assert.Equal(t, FakeErrorMsg, err.Error())
50+
})
51+
})
52+
}

pkg/adapters/kitex/doc.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/**
2+
This package provides Sentinel integration for Kitex.
3+
4+
For server side, users may append a Sentinel middleware to Kitex service, like:
5+
import (
6+
sentinelPlugin "github.com/alibaba/sentinel-golang/pkg/adapters/kitex"
7+
)
8+
srv := hello.NewServer(new(HelloImpl),server.WithMiddleware(SentinelServerMiddleware()))
9+
The plugin extracts service name and service method as the resource name by default.
10+
Users may provide customized resource name extractor when creating new
11+
Sentinel middleware (via WithResourceExtract options).
12+
13+
Fallback logic: the plugin will return the BlockError by default
14+
if current request is blocked by Sentinel rules. Users may also
15+
provide customized fallback logic via WithBlockFallback(handler) options.
16+
*/
17+
package kitex

pkg/adapters/kitex/go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module github.com/alibaba/sentinel-golang/pkg/adapters/kitex
2+
3+
go 1.16
4+
5+
require (
6+
github.com/alibaba/sentinel-golang v1.0.4
7+
github.com/cloudwego/kitex v0.3.4
8+
github.com/cloudwego/kitex-examples v0.1.0
9+
github.com/stretchr/testify v1.7.0
10+
)

pkg/adapters/kitex/go.sum

Lines changed: 591 additions & 0 deletions
Large diffs are not rendered by default.

pkg/adapters/kitex/options.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package kitex
2+
3+
import (
4+
"context"
5+
6+
"github.com/cloudwego/kitex/pkg/rpcinfo"
7+
)
8+
9+
type Option struct {
10+
F func(o *options)
11+
}
12+
13+
type options struct {
14+
ResourceExtract func(ctx context.Context, req, resp interface{}) string
15+
BlockFallback func(ctx context.Context, req, resp interface{}, blockErr error) error
16+
}
17+
18+
func DefaultBlockFallback(ctx context.Context, req, resp interface{}, blockErr error) error {
19+
return blockErr
20+
}
21+
22+
func DefaultResourceExtract(ctx context.Context, req, resp interface{}) string {
23+
ri := rpcinfo.GetRPCInfo(ctx)
24+
return ri.To().ServiceName() + ":" + ri.To().Method()
25+
}
26+
27+
func newOptions(opts []Option) *options {
28+
o := &options{
29+
ResourceExtract: DefaultResourceExtract,
30+
BlockFallback: DefaultBlockFallback,
31+
}
32+
o.Apply(opts)
33+
return o
34+
}
35+
36+
func (o *options) Apply(opts []Option) {
37+
for _, op := range opts {
38+
op.F(o)
39+
}
40+
}
41+
42+
// WithResourceExtract sets the resource extractor
43+
func WithResourceExtract(f func(ctx context.Context, req, resp interface{}) string) Option {
44+
return Option{F: func(o *options) {
45+
o.ResourceExtract = f
46+
}}
47+
}
48+
49+
// WithBlockFallback sets the fallback handler
50+
func WithBlockFallback(f func(ctx context.Context, req, resp interface{}, blockErr error) error) Option {
51+
return Option{func(o *options) {
52+
o.BlockFallback = f
53+
}}
54+
}

pkg/adapters/kitex/server.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package kitex
2+
3+
import (
4+
"context"
5+
6+
sentinel "github.com/alibaba/sentinel-golang/api"
7+
"github.com/alibaba/sentinel-golang/core/base"
8+
"github.com/cloudwego/kitex/pkg/endpoint"
9+
)
10+
11+
// SentinelServerMiddleware returns new server.Middleware
12+
// Default resource name is {service's name}:{method}
13+
// Default block fallback is returning blockError
14+
// Define your own behavior by setting serverOptions
15+
func SentinelServerMiddleware(opts ...Option) func(endpoint.Endpoint) endpoint.Endpoint {
16+
options := newOptions(opts)
17+
return func(next endpoint.Endpoint) endpoint.Endpoint {
18+
return func(ctx context.Context, req, resp interface{}) error {
19+
resourceName := options.ResourceExtract(ctx, req, resp)
20+
entry, blockErr := sentinel.Entry(
21+
resourceName,
22+
sentinel.WithResourceType(base.ResTypeRPC),
23+
sentinel.WithTrafficType(base.Inbound),
24+
)
25+
if blockErr != nil {
26+
return options.BlockFallback(ctx, req, resp, blockErr)
27+
}
28+
defer entry.Exit()
29+
err := next(ctx, req, resp)
30+
if err != nil {
31+
sentinel.TraceError(entry, err)
32+
}
33+
return err
34+
}
35+
}
36+
}

pkg/adapters/kitex/server_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package kitex
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
sentinel "github.com/alibaba/sentinel-golang/api"
11+
"github.com/alibaba/sentinel-golang/core/flow"
12+
"github.com/cloudwego/kitex-examples/hello/kitex_gen/api"
13+
"github.com/cloudwego/kitex-examples/hello/kitex_gen/api/hello"
14+
"github.com/cloudwego/kitex/client"
15+
"github.com/cloudwego/kitex/pkg/rpcinfo"
16+
"github.com/cloudwego/kitex/server"
17+
"github.com/stretchr/testify/assert"
18+
)
19+
20+
// HelloImpl implements the last service interface defined in the IDL.
21+
type HelloImpl struct{}
22+
23+
// Echo implements the HelloImpl interface.
24+
func (s *HelloImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
25+
resp = &api.Response{Message: req.Message}
26+
return
27+
}
28+
29+
func TestSentinelServerMiddleware(t *testing.T) {
30+
bf := func(ctx context.Context, req, resp interface{}, blockErr error) error {
31+
return errors.New(FakeErrorMsg)
32+
}
33+
srv := hello.NewServer(new(HelloImpl),
34+
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "hello"}),
35+
server.WithMiddleware(SentinelServerMiddleware(
36+
WithBlockFallback(bf),
37+
)))
38+
go srv.Run()
39+
defer srv.Stop()
40+
time.Sleep(1 * time.Second)
41+
42+
c, err := hello.NewClient("hello", client.WithHostPorts(":8888"))
43+
assert.Nil(t, err)
44+
45+
err = sentinel.InitDefault()
46+
assert.Nil(t, err)
47+
req := &api.Request{}
48+
t.Run("success", func(t *testing.T) {
49+
_, err = flow.LoadRules([]*flow.Rule{
50+
{
51+
Resource: "hello:echo",
52+
Threshold: 1.0,
53+
TokenCalculateStrategy: flow.Direct,
54+
ControlBehavior: flow.Reject,
55+
},
56+
})
57+
assert.Nil(t, err)
58+
_, err := c.Echo(context.TODO(), req)
59+
assert.Nil(t, err)
60+
61+
t.Run("second fail", func(t *testing.T) {
62+
_, err := c.Echo(context.TODO(), req)
63+
assert.Error(t, err)
64+
assert.True(t, strings.Contains(err.Error(), FakeErrorMsg))
65+
})
66+
})
67+
}

0 commit comments

Comments
 (0)