Skip to content

Commit ba27ab9

Browse files
authored
refactor!: gateway (#314)
* refactor!: gateway * update gateway owner to JK * fix due to review
1 parent d0eb300 commit ba27ab9

File tree

31 files changed

+2204
-971
lines changed

31 files changed

+2204
-971
lines changed

.golangci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ issues:
425425
- cyclop
426426
- gochecknoinits
427427
- gochecknoglobals
428+
- forbidigo
428429
- text: "unexported-return: exported func (NewController|NewGateway) returns unexported type"
429430
linters:
430431
- revive

CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
/internal/controller/ @xdlbdy
1515
/internal/controller/eventbus/ @ifplusor
16-
/internal/gateway/ @ifplusor
16+
/internal/gateway/ @hwjiangkai
1717
/internal/kv/ @xdlbdy
1818
/internal/raft/ @ifplusor
1919
/internal/store/ @ifplusor

client/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
//go:generate mockgen -source=client.go -destination=mock_client.go -package=client
1516
package client
1617

1718
import (

client/pkg/api/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
//go:generate mockgen -source=client.go -destination=mock_client.go -package=api
1516
package api
1617

1718
import (

client/pkg/api/option.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,16 @@ type WriteOptions struct {
2525
Oneway bool
2626
}
2727

28-
func (opts *WriteOptions) Copy() *WriteOptions {
28+
func (wo *WriteOptions) Apply(opts ...WriteOption) {
29+
for i := range opts {
30+
opts[i](wo)
31+
}
32+
}
33+
34+
func (wo *WriteOptions) Copy() *WriteOptions {
2935
return &WriteOptions{
30-
Oneway: opts.Oneway,
31-
Policy: opts.Policy,
36+
Oneway: wo.Oneway,
37+
Policy: wo.Policy,
3238
}
3339
}
3440

@@ -40,11 +46,17 @@ type ReadOptions struct {
4046
Policy ReadPolicy
4147
}
4248

43-
func (opts *ReadOptions) Copy() *ReadOptions {
49+
func (ro *ReadOptions) Apply(opts ...ReadOption) {
50+
for i := range opts {
51+
opts[i](ro)
52+
}
53+
}
54+
55+
func (ro *ReadOptions) Copy() *ReadOptions {
4456
return &ReadOptions{
45-
BatchSize: opts.BatchSize,
46-
PollingTimeout: opts.PollingTimeout,
47-
Policy: opts.Policy,
57+
BatchSize: ro.BatchSize,
58+
PollingTimeout: ro.PollingTimeout,
59+
Policy: ro.Policy,
4860
}
4961
}
5062

cmd/gateway/main.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/linkall-labs/vanus/internal/gateway"
2323
"github.com/linkall-labs/vanus/observability/log"
2424
"github.com/linkall-labs/vanus/observability/tracing"
25+
"github.com/linkall-labs/vanus/pkg/util/signal"
2526
)
2627

2728
var (
@@ -41,22 +42,20 @@ func main() {
4142

4243
tracing.Init("Vanus-Gateway")
4344

44-
go gateway.NewHTTPServer(*cfg).MustStartHTTP()
45-
45+
ctx := signal.SetupSignalContext()
4646
ga := gateway.NewGateway(*cfg)
47-
err = ga.StartCtrlProxy(context.Background())
48-
if err != nil {
49-
log.Error(context.Background(), "start controller proxy failed", map[string]interface{}{
50-
log.KeyError: err,
51-
})
52-
os.Exit(-1)
53-
}
5447

55-
err = ga.StartReceive(context.Background())
56-
if err != nil {
57-
log.Error(context.Background(), "start CloudEvents gateway failed", map[string]interface{}{
48+
if err = ga.Start(ctx); err != nil {
49+
log.Error(context.Background(), "start gateway failed", map[string]interface{}{
5850
log.KeyError: err,
5951
})
6052
os.Exit(-1)
6153
}
54+
log.Info(ctx, "Gateway has started", nil)
55+
select {
56+
case <-ctx.Done():
57+
log.Info(ctx, "received system signal, preparing exit", nil)
58+
}
59+
ga.Stop()
60+
log.Info(ctx, "the gateway has been shutdown gracefully", nil)
6261
}

deploy/yaml/gateway.yaml

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,11 @@ spec:
1111
- port: 8080
1212
targetPort: 8080
1313
nodePort: 30001
14-
name: put
14+
name: proxy
1515
- port: 8081
1616
targetPort: 8081
1717
nodePort: 30002
18-
name: get
19-
- name: ctrl-proxy
20-
nodePort: 30003
21-
port: 8082
22-
targetPort: 8082
18+
name: cloudevents
2319
---
2420
apiVersion: v1
2521
kind: ConfigMap
@@ -58,12 +54,10 @@ spec:
5854
image: public.ecr.aws/vanus/gateway:v0.4.0
5955
imagePullPolicy: IfNotPresent
6056
ports:
61-
- name: httpput
57+
- name: proxy
6258
containerPort: 8080
63-
- name: httpget
59+
- name: cloudevents
6460
containerPort: 8081
65-
- name: ctrl-proxy
66-
containerPort: 8082
6761
volumeMounts:
6862
- name: config-gateway
6963
mountPath: /vanus/config

internal/controller/snowflake/snowflake.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (sf *snowflake) RegisterNode(ctx context.Context, in *wrapperspb.UInt32Valu
112112
//
113113
// if exist {
114114
// return nil, errors.New("node has been register")
115-
//}
115+
// }
116116

117117
n := &node{
118118
ID: id,

internal/gateway/config.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,31 @@
1414

1515
package gateway
1616

17-
import "github.com/linkall-labs/vanus/internal/primitive"
17+
import (
18+
"github.com/linkall-labs/vanus/internal/gateway/proxy"
19+
"github.com/linkall-labs/vanus/internal/primitive"
20+
"google.golang.org/grpc/credentials/insecure"
21+
)
1822

1923
type Config struct {
20-
Port int `yaml:"port"`
21-
ControllerAddr []string `yaml:"controllers"`
22-
TracingURL string `yaml:"tracing_url"`
24+
Port int `yaml:"port"`
25+
ControllerAddr []string `yaml:"controllers"`
26+
TracingURL string `yaml:"tracing_url"`
27+
GRPCReflectionEnable bool `yaml:"grpc_reflection_enable"`
28+
}
29+
30+
func (c Config) GetProxyConfig() proxy.Config {
31+
return proxy.Config{
32+
Endpoints: c.ControllerAddr,
33+
ProxyPort: c.Port,
34+
CloudEventReceiverPort: c.GetCloudEventReceiverPort(),
35+
GRPCReflectionEnable: c.GRPCReflectionEnable,
36+
Credentials: insecure.NewCredentials(),
37+
}
38+
}
39+
40+
func (c Config) GetCloudEventReceiverPort() int {
41+
return c.Port + 1
2342
}
2443

2544
func InitConfig(filename string) (*Config, error) {

internal/gateway/gateway.go

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,30 +30,15 @@ import (
3030
"github.com/google/uuid"
3131
eb "github.com/linkall-labs/vanus/client"
3232
"github.com/linkall-labs/vanus/client/pkg/api"
33+
"github.com/linkall-labs/vanus/internal/gateway/proxy"
3334
"github.com/linkall-labs/vanus/internal/primitive"
3435
"github.com/linkall-labs/vanus/observability/log"
3536
"github.com/linkall-labs/vanus/observability/tracing"
3637
"go.opentelemetry.io/otel/trace"
3738
)
3839

3940
const (
40-
httpRequestPrefix = "/gateway"
41-
ctrlProxyPortShift = 2
42-
)
43-
44-
var (
45-
allowCtrlProxyList = map[string]string{
46-
"/linkall.vanus.controller.PingServer/Ping": "ALLOW",
47-
"/linkall.vanus.controller.EventBusController/ListEventBus": "ALLOW",
48-
"/linkall.vanus.controller.EventBusController/CreateEventBus": "ALLOW",
49-
"/linkall.vanus.controller.EventBusController/DeleteEventBus": "ALLOW",
50-
"/linkall.vanus.controller.EventBusController/GetEventBus": "ALLOW",
51-
"/linkall.vanus.controller.EventLogController/ListSegment": "ALLOW",
52-
"/linkall.vanus.controller.TriggerController/CreateSubscription": "ALLOW",
53-
"/linkall.vanus.controller.TriggerController/DeleteSubscription": "ALLOW",
54-
"/linkall.vanus.controller.TriggerController/GetSubscription": "ALLOW",
55-
"/linkall.vanus.controller.TriggerController/ListSubscription": "ALLOW",
56-
}
41+
httpRequestPrefix = "/gateway"
5742
)
5843

5944
var (
@@ -67,28 +52,44 @@ type EventData struct {
6752

6853
type ceGateway struct {
6954
// ceClient v2.Client
70-
busWriter sync.Map
71-
config Config
72-
client eb.Client
73-
cp *ctrlProxy
74-
tracer *tracing.Tracer
55+
busWriter sync.Map
56+
config Config
57+
client eb.Client
58+
proxySrv *proxy.ControllerProxy
59+
tracer *tracing.Tracer
60+
ceListener net.Listener
7561
}
7662

7763
func NewGateway(config Config) *ceGateway {
7864
return &ceGateway{
79-
config: config,
80-
client: eb.Connect(config.ControllerAddr),
81-
cp: newCtrlProxy(config.Port+ctrlProxyPortShift, allowCtrlProxyList, config.ControllerAddr),
82-
tracer: tracing.NewTracer("cloudevents", trace.SpanKindServer),
65+
config: config,
66+
client: eb.Connect(config.ControllerAddr),
67+
proxySrv: proxy.NewControllerProxy(config.GetProxyConfig()),
68+
tracer: tracing.NewTracer("cloudevents", trace.SpanKindServer),
8369
}
8470
}
8571

86-
func (ga *ceGateway) StartCtrlProxy(ctx context.Context) error {
87-
return ga.cp.start(ctx)
72+
func (ga *ceGateway) Start(ctx context.Context) error {
73+
if err := ga.startCloudEventsReceiver(ctx); err != nil {
74+
return err
75+
}
76+
if err := ga.proxySrv.Start(); err != nil {
77+
return err
78+
}
79+
return nil
8880
}
8981

90-
func (ga *ceGateway) StartReceive(ctx context.Context) error {
91-
ls, err := net.Listen("tcp", fmt.Sprintf(":%d", ga.config.Port))
82+
func (ga *ceGateway) Stop() {
83+
ga.proxySrv.Stop()
84+
if err := ga.ceListener.Close(); err != nil {
85+
log.Warning(context.Background(), "close CloudEvents listener error", map[string]interface{}{
86+
log.KeyError: err,
87+
})
88+
}
89+
}
90+
91+
func (ga *ceGateway) startCloudEventsReceiver(ctx context.Context) error {
92+
ls, err := net.Listen("tcp", fmt.Sprintf(":%d", ga.config.GetCloudEventReceiverPort()))
9293
if err != nil {
9394
return err
9495
}
@@ -97,7 +98,14 @@ func (ga *ceGateway) StartReceive(ctx context.Context) error {
9798
if err != nil {
9899
return err
99100
}
100-
return c.StartReceiver(ctx, ga.receive)
101+
102+
ga.ceListener = ls
103+
go func() {
104+
if err := c.StartReceiver(ctx, ga.receive); err != nil {
105+
panic(fmt.Sprintf("start CloudEvents receiver failed: %s", err.Error()))
106+
}
107+
}()
108+
return nil
101109
}
102110

103111
func (ga *ceGateway) receive(ctx context.Context, event v2.Event) (*v2.Event, protocol.Result) {

0 commit comments

Comments
 (0)