Skip to content

Commit d687ca6

Browse files
committed
feat: add proxy for controller in gateway
1 parent 70fba0f commit d687ca6

File tree

14 files changed

+1915
-92
lines changed

14 files changed

+1915
-92
lines changed

deploy/all-in-one.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ spec:
151151
nodePort: 30002
152152
port: 8081
153153
targetPort: 8081
154+
- name: ctrl-proxy
155+
nodePort: 30003
156+
port: 8082
157+
targetPort: 8082
154158
selector:
155159
app: vanus-gateway
156160
type: NodePort

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/linkall-labs/vanus/client v0.1.0
2323
github.com/linkall-labs/vanus/proto v0.1.0
2424
github.com/linkall-labs/vanus/raft v0.1.0
25+
github.com/mwitkow/grpc-proxy v0.0.0-20220126150247-db34e7bfee32
2526
github.com/ncw/directio v1.0.5
2627
github.com/pkg/errors v0.9.1
2728
github.com/prometheus/client_golang v1.11.1
@@ -48,6 +49,7 @@ replace (
4849
)
4950

5051
require (
52+
github.com/BurntSushi/toml v0.3.1 // indirect
5153
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220209173558-ad29539cd2e9 // indirect
5254
github.com/beorn7/perks v1.0.1 // indirect
5355
github.com/cespare/xxhash/v2 v2.1.1 // indirect
@@ -111,11 +113,15 @@ require (
111113
go.uber.org/multierr v1.6.0 // indirect
112114
go.uber.org/zap v1.17.0 // indirect
113115
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
116+
golang.org/x/mod v0.4.2 // indirect
114117
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
115118
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
116119
golang.org/x/text v0.3.7 // indirect
120+
golang.org/x/tools v0.1.5 // indirect
121+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
117122
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
118123
gopkg.in/yaml.v2 v2.4.0 // indirect
124+
honnef.co/go/tools v0.1.3 // indirect
119125
k8s.io/apimachinery v0.23.4 // indirect
120126
k8s.io/klog/v2 v2.30.0 // indirect
121127
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
373373
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
374374
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
375375
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
376+
github.com/mwitkow/grpc-proxy v0.0.0-20220126150247-db34e7bfee32 h1:CC9KzU7WPrK6DTppkUGiwmttoHCNwOLT7Z+stp1eIpU=
377+
github.com/mwitkow/grpc-proxy v0.0.0-20220126150247-db34e7bfee32/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
376378
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
377379
github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4=
378380
github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk=
@@ -601,6 +603,7 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
601603
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
602604
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
603605
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
606+
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
604607
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
605608
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
606609
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -644,6 +647,7 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v
644647
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
645648
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
646649
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
650+
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
647651
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
648652
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
649653
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@@ -729,6 +733,7 @@ golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7w
729733
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
730734
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
731735
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
736+
golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
732737
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
733738
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
734739
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -895,6 +900,7 @@ google.golang.org/genproto v0.0.0-20210222152913-aa3ee6e6a81c/go.mod h1:FWY/as6D
895900
google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
896901
google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
897902
google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
903+
google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
898904
google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
899905
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
900906
google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
@@ -973,6 +979,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
973979
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
974980
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
975981
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
982+
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
983+
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
976984
k8s.io/api v0.23.4/go.mod h1:i77F4JfyNNrhOjZF7OwwNJS5Y1S9dpwvb9iYRYRczfI=
977985
k8s.io/apimachinery v0.23.4 h1:fhnuMd/xUL3Cjfl64j5ULKZ1/J9n8NuQEgNL+WXWfdM=
978986
k8s.io/apimachinery v0.23.4/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hrM=

internal/gateway/gateway.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,23 @@ const (
3636
xceVanusEventbus = "xvanuseventbus"
3737
)
3838

39+
var (
40+
allowCtrlProxyList = map[string]string{
41+
"/linkall.vanus.controller.PingServer/Ping": "ALLOW",
42+
}
43+
)
44+
3945
type ceGateway struct {
4046
// ceClient v2.Client
4147
busWriter sync.Map
4248
config Config
49+
cp *ctrlProxy
4350
}
4451

4552
func NewGateway(config Config) *ceGateway {
4653
return &ceGateway{
4754
config: config,
55+
cp: newCtrlProxy(config.Port+2, allowCtrlProxyList, config.ControllerAddr),
4856
}
4957
}
5058

internal/gateway/grpc_proxy.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Copyright 2022 Linkall Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gateway
16+
17+
import (
18+
"context"
19+
"fmt"
20+
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
21+
"google.golang.org/grpc/credentials/insecure"
22+
"google.golang.org/protobuf/types/known/emptypb"
23+
"net"
24+
"sync"
25+
"time"
26+
27+
"github.com/linkall-labs/vanus/observability/log"
28+
"github.com/mwitkow/grpc-proxy/proxy"
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/metadata"
32+
"google.golang.org/grpc/status"
33+
)
34+
35+
func newCtrlProxy(port int, allowProxyMethod map[string]string, ctrlList []string) *ctrlProxy {
36+
return &ctrlProxy{
37+
ctrlLists: ctrlList,
38+
port: port,
39+
allowProxyMethod: allowProxyMethod,
40+
ticker: time.NewTicker(time.Second),
41+
}
42+
}
43+
44+
type ctrlProxy struct {
45+
ctrlLists []string
46+
rwMutex sync.RWMutex
47+
leaderConn *grpc.ClientConn
48+
allowProxyMethod map[string]string
49+
ticker *time.Ticker
50+
port int
51+
}
52+
53+
func (cp *ctrlProxy) start(ctx context.Context) error {
54+
grpcServer := grpc.NewServer(
55+
grpc.UnknownServiceHandler(proxy.TransparentHandler(cp.director)),
56+
)
57+
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", cp.port))
58+
if err != nil {
59+
return err
60+
}
61+
62+
go cp.updateLeader(ctx)
63+
go func() {
64+
if err := grpcServer.Serve(listen); err != nil {
65+
panic("start grpc server failed: " + err.Error())
66+
}
67+
log.Info(ctx, "the grpc server shutdown", nil)
68+
}()
69+
70+
go func() {
71+
for {
72+
select {
73+
case <-ctx.Done():
74+
grpcServer.GracefulStop()
75+
}
76+
}
77+
}()
78+
go cp.updateLeader(ctx)
79+
return nil
80+
}
81+
82+
func (cp *ctrlProxy) director(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
83+
md, _ := metadata.FromIncomingContext(ctx)
84+
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
85+
86+
if cp.leaderConn == nil {
87+
return outCtx, nil, status.Errorf(codes.Internal, "No leader founded")
88+
}
89+
90+
_, exist := cp.allowProxyMethod[fullMethodName]
91+
if !exist {
92+
log.Warning(nil, "invalid access", map[string]interface{}{
93+
"method": fullMethodName,
94+
})
95+
return outCtx, nil, status.Errorf(codes.Unimplemented, "Unknown method")
96+
}
97+
98+
cp.rwMutex.RLock()
99+
defer cp.rwMutex.RUnlock()
100+
return outCtx, cp.leaderConn, nil
101+
}
102+
103+
func (cp *ctrlProxy) updateLeader(ctx context.Context) {
104+
defer cp.ticker.Stop()
105+
for {
106+
select {
107+
case <-ctx.Done():
108+
return
109+
case <-cp.ticker.C:
110+
cp.rwMutex.Lock()
111+
if cp.leaderConn == nil {
112+
cp.leaderConn = getLeaderControllerGRPCConn(ctx, cp.ctrlLists...)
113+
} else {
114+
pingClient := ctrlpb.NewPingServerClient(cp.leaderConn)
115+
res, err := pingClient.Ping(ctx, &emptypb.Empty{})
116+
if err != nil {
117+
cp.leaderConn = getLeaderControllerGRPCConn(ctx, cp.ctrlLists...)
118+
} else if res.LeaderAddr != cp.leaderConn.Target() {
119+
cp.leaderConn = getLeaderControllerGRPCConn(ctx, res.LeaderAddr)
120+
}
121+
}
122+
if cp.leaderConn == nil {
123+
log.Error(ctx, "connect to leader failed", nil)
124+
}
125+
cp.rwMutex.Unlock()
126+
}
127+
}
128+
}
129+
130+
func getLeaderControllerGRPCConn(ctx context.Context, endpoints ...string) *grpc.ClientConn {
131+
var leaderAddr string
132+
var leaderConn *grpc.ClientConn
133+
tryConnectLeaderOnce := false
134+
for idx := range endpoints {
135+
conn := createGRPCConn(ctx, endpoints[idx])
136+
if conn == nil {
137+
continue
138+
}
139+
pingClient := ctrlpb.NewPingServerClient(conn)
140+
res, err := pingClient.Ping(ctx, &emptypb.Empty{})
141+
if err != nil {
142+
log.Warning(ctx, "ping controller failed", map[string]interface{}{
143+
"endpoint": endpoints[idx],
144+
})
145+
continue
146+
}
147+
leaderAddr = res.LeaderAddr
148+
if leaderAddr == endpoints[idx] {
149+
leaderConn = conn
150+
tryConnectLeaderOnce = false
151+
} else {
152+
_ = conn.Close()
153+
}
154+
break
155+
}
156+
157+
if leaderAddr == "" {
158+
return nil
159+
}
160+
161+
if leaderConn != nil {
162+
return leaderConn
163+
} else if !tryConnectLeaderOnce {
164+
leaderConn = createGRPCConn(ctx, leaderAddr)
165+
}
166+
167+
if leaderConn == nil {
168+
log.Error(ctx, "ping controller failed", nil)
169+
return nil
170+
}
171+
return leaderConn
172+
}
173+
174+
func createGRPCConn(ctx context.Context, addr string) *grpc.ClientConn {
175+
if addr == "" {
176+
return nil
177+
}
178+
var err error
179+
var opts []grpc.DialOption
180+
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
181+
opts = append(opts, grpc.WithBlock())
182+
ctx, cancel := context.WithCancel(ctx)
183+
timeout := false
184+
go func() {
185+
ticker := time.Tick(time.Second)
186+
select {
187+
case <-ctx.Done():
188+
case <-ticker:
189+
cancel()
190+
timeout = true
191+
}
192+
}()
193+
conn, err := grpc.DialContext(ctx, addr, opts...)
194+
cancel()
195+
if timeout {
196+
log.Warning(ctx, "dial to controller timeout, try to another controller", map[string]interface{}{
197+
"endpoint": addr,
198+
})
199+
return nil
200+
} else if err != nil {
201+
log.Warning(ctx, "dial to controller failed, try to another controller", map[string]interface{}{
202+
"endpoint": addr,
203+
})
204+
return nil
205+
}
206+
return conn
207+
}

0 commit comments

Comments
 (0)