Skip to content

Commit c0dadd4

Browse files
committed
feat: add proxy for controller in gateway
1 parent 70fba0f commit c0dadd4

File tree

2 files changed

+98
-0
lines changed

2 files changed

+98
-0
lines changed

internal/gateway/grpc_proxy.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2022 Linkall Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gateway
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"net"
21+
"sync"
22+
"time"
23+
24+
"github.com/linkall-labs/vanus/observability/log"
25+
"github.com/mwitkow/grpc-proxy/proxy"
26+
"google.golang.org/grpc"
27+
"google.golang.org/grpc/codes"
28+
"google.golang.org/grpc/metadata"
29+
"google.golang.org/grpc/status"
30+
)
31+
32+
func startGRPCProxy(ctx context.Context, port int, ctrlList []string) error {
33+
cp := &ctrlProxy{ctrlLists: ctrlList}
34+
grpcServer := grpc.NewServer(
35+
//grpc.CustomCodec(proxy.Codec()),
36+
grpc.UnknownServiceHandler(proxy.TransparentHandler(cp.director)),
37+
)
38+
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
39+
if err != nil {
40+
return err
41+
}
42+
go func() {
43+
if err := grpcServer.Serve(listen); err != nil {
44+
panic("start grpc server failed: " + err.Error())
45+
}
46+
log.Info(ctx, "the grpc server shutdown", nil)
47+
}()
48+
49+
go func() {
50+
for {
51+
select {
52+
case <-ctx.Done():
53+
grpcServer.GracefulStop()
54+
}
55+
}
56+
}()
57+
return nil
58+
}
59+
60+
type ctrlProxy struct {
61+
ctrlLists []string
62+
rwMutex sync.RWMutex
63+
leaderConn *grpc.ClientConn
64+
haveLeader bool
65+
}
66+
67+
func (cp *ctrlProxy) director(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
68+
md, _ := metadata.FromIncomingContext(ctx)
69+
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
70+
71+
if !cp.haveLeader {
72+
return outCtx, nil, status.Errorf(codes.Internal, "No leader founded")
73+
}
74+
75+
switch fullMethodName {
76+
case "xxx":
77+
default:
78+
return outCtx, nil, status.Errorf(codes.Unimplemented, "Unknown method")
79+
}
80+
cp.rwMutex.RLock()
81+
defer cp.rwMutex.RUnlock()
82+
return outCtx, cp.leaderConn, nil
83+
}
84+
85+
func (cp *ctrlProxy) updateLeader(ctx context.Context) {
86+
ticker := time.NewTicker(time.Second)
87+
defer ticker.Stop()
88+
for {
89+
select {
90+
case <-ctx.Done():
91+
return
92+
case <-ticker.C:
93+
cp.rwMutex.Lock()
94+
cp.rwMutex.Unlock()
95+
}
96+
}
97+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package gateway

0 commit comments

Comments
 (0)