@@ -17,6 +17,9 @@ package gateway
1717import (
1818 "context"
1919 "fmt"
20+ ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
21+ "google.golang.org/grpc/credentials/insecure"
22+ "google.golang.org/protobuf/types/known/emptypb"
2023 "net"
2124 "sync"
2225 "time"
@@ -29,16 +32,34 @@ import (
2932 "google.golang.org/grpc/status"
3033)
3134
32- func startGRPCProxy (ctx context.Context , port int , ctrlList []string ) error {
33- cp := & ctrlProxy {ctrlLists : ctrlList }
35+ func newCtrlProxy (port int , allowProxyMethod map [string ]string , ctrlList []string ) * ctrlProxy {
36+ return & ctrlProxy {
37+ ctrlLists : ctrlList ,
38+ port : port ,
39+ allowProxyMethod : allowProxyMethod ,
40+ ticker : time .NewTicker (time .Second ),
41+ }
42+ }
43+
44+ type ctrlProxy struct {
45+ ctrlLists []string
46+ rwMutex sync.RWMutex
47+ leaderConn * grpc.ClientConn
48+ allowProxyMethod map [string ]string
49+ ticker * time.Ticker
50+ port int
51+ }
52+
53+ func (cp * ctrlProxy ) start (ctx context.Context ) error {
3454 grpcServer := grpc .NewServer (
35- //grpc.CustomCodec(proxy.Codec()),
3655 grpc .UnknownServiceHandler (proxy .TransparentHandler (cp .director )),
3756 )
38- listen , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
57+ listen , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , cp . port ))
3958 if err != nil {
4059 return err
4160 }
61+
62+ go cp .updateLeader (ctx )
4263 go func () {
4364 if err := grpcServer .Serve (listen ); err != nil {
4465 panic ("start grpc server failed: " + err .Error ())
@@ -54,44 +75,133 @@ func startGRPCProxy(ctx context.Context, port int, ctrlList []string) error {
5475 }
5576 }
5677 }()
78+ go cp .updateLeader (ctx )
5779 return nil
5880}
5981
60- type ctrlProxy struct {
61- ctrlLists []string
62- rwMutex sync.RWMutex
63- leaderConn * grpc.ClientConn
64- haveLeader bool
65- }
66-
6782func (cp * ctrlProxy ) director (ctx context.Context , fullMethodName string ) (context.Context , * grpc.ClientConn , error ) {
6883 md , _ := metadata .FromIncomingContext (ctx )
6984 outCtx := metadata .NewOutgoingContext (ctx , md .Copy ())
7085
71- if ! cp .haveLeader {
86+ if cp .leaderConn == nil {
7287 return outCtx , nil , status .Errorf (codes .Internal , "No leader founded" )
7388 }
7489
75- switch fullMethodName {
76- case "xxx" :
77- default :
90+ _ , exist := cp .allowProxyMethod [fullMethodName ]
91+ if ! exist {
92+ log .Warning (nil , "invalid access" , map [string ]interface {}{
93+ "method" : fullMethodName ,
94+ })
7895 return outCtx , nil , status .Errorf (codes .Unimplemented , "Unknown method" )
7996 }
97+
8098 cp .rwMutex .RLock ()
8199 defer cp .rwMutex .RUnlock ()
82100 return outCtx , cp .leaderConn , nil
83101}
84102
85103func (cp * ctrlProxy ) updateLeader (ctx context.Context ) {
86- ticker := time .NewTicker (time .Second )
87- defer ticker .Stop ()
104+ defer cp .ticker .Stop ()
88105 for {
89106 select {
90107 case <- ctx .Done ():
91108 return
92- case <- ticker .C :
109+ case <- cp . ticker .C :
93110 cp .rwMutex .Lock ()
111+ if cp .leaderConn == nil {
112+ cp .leaderConn = getLeaderControllerGRPCConn (ctx , cp .ctrlLists ... )
113+ } else {
114+ pingClient := ctrlpb .NewPingServerClient (cp .leaderConn )
115+ res , err := pingClient .Ping (ctx , & emptypb.Empty {})
116+ if err != nil {
117+ cp .leaderConn = getLeaderControllerGRPCConn (ctx , cp .ctrlLists ... )
118+ } else if res .LeaderAddr != cp .leaderConn .Target () {
119+ cp .leaderConn = getLeaderControllerGRPCConn (ctx , res .LeaderAddr )
120+ }
121+ }
122+ if cp .leaderConn == nil {
123+ log .Error (ctx , "connect to leader failed" , nil )
124+ }
94125 cp .rwMutex .Unlock ()
95126 }
96127 }
97128}
129+
130+ func getLeaderControllerGRPCConn (ctx context.Context , endpoints ... string ) * grpc.ClientConn {
131+ var leaderAddr string
132+ var leaderConn * grpc.ClientConn
133+ tryConnectLeaderOnce := false
134+ for idx := range endpoints {
135+ conn := createGRPCConn (ctx , endpoints [idx ])
136+ if conn == nil {
137+ continue
138+ }
139+ pingClient := ctrlpb .NewPingServerClient (conn )
140+ res , err := pingClient .Ping (ctx , & emptypb.Empty {})
141+ if err != nil {
142+ log .Warning (ctx , "ping controller failed" , map [string ]interface {}{
143+ "endpoint" : endpoints [idx ],
144+ })
145+ continue
146+ }
147+ leaderAddr = res .LeaderAddr
148+ if leaderAddr == endpoints [idx ] {
149+ leaderConn = conn
150+ tryConnectLeaderOnce = false
151+ } else {
152+ _ = conn .Close ()
153+ }
154+ break
155+ }
156+
157+ if leaderAddr == "" {
158+ return nil
159+ }
160+
161+ if leaderConn != nil {
162+ return leaderConn
163+ } else if ! tryConnectLeaderOnce {
164+ leaderConn = createGRPCConn (ctx , leaderAddr )
165+ }
166+
167+ if leaderConn == nil {
168+ log .Error (ctx , "ping controller failed" , nil )
169+ return nil
170+ }
171+ return leaderConn
172+ }
173+
174+ func createGRPCConn (ctx context.Context , addr string ) * grpc.ClientConn {
175+ if addr == "" {
176+ return nil
177+ }
178+ var err error
179+ var opts []grpc.DialOption
180+ opts = append (opts , grpc .WithTransportCredentials (insecure .NewCredentials ()))
181+ opts = append (opts , grpc .WithBlock ())
182+ ctx , cancel := context .WithCancel (ctx )
183+ timeout := false
184+ go func () {
185+ ticker := time .Tick (time .Second )
186+ select {
187+ case <- ctx .Done ():
188+ case <- ticker :
189+ cancel ()
190+ timeout = true
191+ }
192+ }()
193+ conn , err := grpc .DialContext (ctx , addr , opts ... )
194+ cancel ()
195+ if timeout {
196+ log .Warning (ctx , "dial to controller timeout, try to another controller" , map [string ]interface {}{
197+ "endpoint" : addr ,
198+ })
199+ return nil
200+ } else if err != nil {
201+ log .Warning (ctx , "dial to controller failed, try to another controller" , map [string ]interface {}{
202+ "endpoint" : addr ,
203+ })
204+ return nil
205+ }
206+ return conn
207+ }
0 commit comments