diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index 04a692a46..854c15add 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -41,6 +41,14 @@ func main() { go gateway.MustStartHTTP(*cfg) ga := gateway.NewGateway(*cfg) + err = ga.StartCtrlProxy(context.Background()) + if err != nil { + log.Error(context.Background(), "start controller proxy failed", map[string]interface{}{ + log.KeyError: err, + }) + os.Exit(-1) + } + err = ga.StartReceive(context.Background()) if err != nil { log.Error(context.Background(), "start CloudEvents gateway failed", map[string]interface{}{ diff --git a/deploy/all-in-one.yaml b/deploy/all-in-one.yaml index a18714731..29fc08d84 100644 --- a/deploy/all-in-one.yaml +++ b/deploy/all-in-one.yaml @@ -9,7 +9,6 @@ data: name: ${POD_NAME} ip: ${POD_IP} port: 2048 - gateway_endpoint: http://192.168.49.2:30001 etcd: - vanus-controller-0.vanus-controller.vanus.svc:2379 - vanus-controller-1.vanus-controller.vanus.svc:2379 @@ -96,48 +95,6 @@ spec: --- apiVersion: v1 kind: Service -metadata: - name: vanus-controller-0 - namespace: vanus -spec: - ports: - - name: grpc - nodePort: 32000 - port: 2048 - selector: - statefulset.kubernetes.io/pod-name: vanus-controller-0 - type: NodePort ---- -apiVersion: v1 -kind: Service -metadata: - name: vanus-controller-1 - namespace: vanus -spec: - ports: - - name: grpc - nodePort: 32001 - port: 2048 - selector: - statefulset.kubernetes.io/pod-name: vanus-controller-1 - type: NodePort ---- -apiVersion: v1 -kind: Service -metadata: - name: vanus-controller-2 - namespace: vanus -spec: - ports: - - name: grpc - nodePort: 32002 - port: 2048 - selector: - statefulset.kubernetes.io/pod-name: vanus-controller-2 - type: NodePort ---- -apiVersion: v1 -kind: Service metadata: name: vanus-gateway namespace: vanus @@ -151,6 +108,10 @@ spec: nodePort: 30002 port: 8081 targetPort: 8081 + - name: ctrl-proxy + nodePort: 30003 + port: 8082 + targetPort: 8082 selector: app: vanus-gateway type: NodePort @@ -173,7 +134,7 @@ spec: app: vanus-gateway spec: containers: - - image: public.ecr.aws/vanus/gateway:v0.0.5 + - image: public.ecr.aws/vanus/gateway:v0.1.0 imagePullPolicy: IfNotPresent name: gateway ports: @@ -214,7 +175,7 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - image: public.ecr.aws/vanus/trigger:v0.0.5 + image: public.ecr.aws/vanus/trigger:v0.1.0 imagePullPolicy: IfNotPresent livenessProbe: failureThreshold: 3 @@ -264,7 +225,7 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - image: public.ecr.aws/vanus/controller:v0.0.5 + image: public.ecr.aws/vanus/controller:v0.1.0 imagePullPolicy: IfNotPresent livenessProbe: failureThreshold: 3 @@ -322,7 +283,7 @@ spec: - command: - /bin/sh - -c - - export VOLUME_ID=${HOSTNAME##*-}; /vanus/bin/store + - VOLUME_ID=${HOSTNAME##*-} /vanus/bin/store env: - name: VANUS_LOG_LEVEL value: DEBUG @@ -330,7 +291,7 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - image: public.ecr.aws/vanus/store:v0.0.5 + image: public.ecr.aws/vanus/store:v0.1.0 imagePullPolicy: IfNotPresent livenessProbe: failureThreshold: 3 diff --git a/deploy/yaml/controller.yaml b/deploy/yaml/controller.yaml index 3cc455b17..1fb59c5d3 100644 --- a/deploy/yaml/controller.yaml +++ b/deploy/yaml/controller.yaml @@ -21,7 +21,6 @@ data: name: ${POD_NAME} ip: ${POD_IP} port: 2048 - gateway_endpoint: http://192.168.49.2:30001 etcd: - vanus-controller-0.vanus-controller.vanus.svc:2379 - vanus-controller-1.vanus-controller.vanus.svc:2379 @@ -57,48 +56,6 @@ spec: - port: 2048 name: vanus-controller --- -apiVersion: v1 -kind: Service -metadata: - name: vanus-controller-0 - namespace: vanus -spec: - selector: - statefulset.kubernetes.io/pod-name: vanus-controller-0 - type: NodePort - ports: - - port: 2048 - nodePort: 32000 - name: grpc ---- -apiVersion: v1 -kind: Service -metadata: - name: vanus-controller-1 - namespace: vanus -spec: - selector: - statefulset.kubernetes.io/pod-name: vanus-controller-1 - type: NodePort - ports: - - port: 2048 - nodePort: 32001 - name: grpc ---- -apiVersion: v1 -kind: Service -metadata: - name: vanus-controller-2 - namespace: vanus -spec: - selector: - statefulset.kubernetes.io/pod-name: vanus-controller-2 - type: NodePort - ports: - - port: 2048 - nodePort: 32002 - name: grpc ---- apiVersion: apps/v1 kind: StatefulSet metadata: @@ -130,7 +87,7 @@ spec: spec: containers: - name: controller - image: public.ecr.aws/vanus/controller:v0.0.5 + image: public.ecr.aws/vanus/controller:v0.1.0 imagePullPolicy: IfNotPresent livenessProbe: grpc: diff --git a/deploy/yaml/gateway.yaml b/deploy/yaml/gateway.yaml index 227d1416e..a6bdd0a17 100644 --- a/deploy/yaml/gateway.yaml +++ b/deploy/yaml/gateway.yaml @@ -29,7 +29,10 @@ spec: targetPort: 8081 nodePort: 30002 name: get - + - name: ctrl-proxy + nodePort: 30003 + port: 8082 + targetPort: 8082 --- apiVersion: v1 kind: ConfigMap @@ -64,13 +67,15 @@ spec: spec: containers: - name: gateway - image: public.ecr.aws/vanus/gateway:v0.0.5 + image: public.ecr.aws/vanus/gateway:v0.1.0 imagePullPolicy: IfNotPresent ports: - name: httpput containerPort: 8080 - name: httpget containerPort: 8081 + - name: ctrl-proxy + containerPort: 8082 volumeMounts: - name: config-gateway mountPath: /vanus/config diff --git a/deploy/yaml/store.yaml b/deploy/yaml/store.yaml index 3817f4f80..7d16b650a 100644 --- a/deploy/yaml/store.yaml +++ b/deploy/yaml/store.yaml @@ -61,7 +61,7 @@ spec: spec: containers: - name: store - image: public.ecr.aws/vanus/store:v0.0.5 + image: public.ecr.aws/vanus/store:v0.1.0 imagePullPolicy: IfNotPresent command: ["/bin/sh", "-c", "VOLUME_ID=${HOSTNAME##*-} /vanus/bin/store"] livenessProbe: diff --git a/deploy/yaml/trigger.yaml b/deploy/yaml/trigger.yaml index 261bc8469..1a048964c 100644 --- a/deploy/yaml/trigger.yaml +++ b/deploy/yaml/trigger.yaml @@ -45,7 +45,7 @@ spec: spec: containers: - name: trigger - image: public.ecr.aws/vanus/trigger:v0.0.5 + image: public.ecr.aws/vanus/trigger:v0.1.0 imagePullPolicy: IfNotPresent livenessProbe: grpc: diff --git a/deploy/yaml/vsctl.yaml b/deploy/yaml/vsctl.yaml deleted file mode 100644 index f0eeb9703..000000000 --- a/deploy/yaml/vsctl.yaml +++ /dev/null @@ -1,22 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: vanus-vsctl - namespace: vanus - labels: - app: vanus-vsctl -spec: - selector: - matchLabels: - app: vanus-vsctl - replicas: 1 - template: - metadata: - labels: - app: vanus-vsctl - spec: - containers: - - name: vsctl - image: public.ecr.aws/vanus/vsctl:5952928 - imagePullPolicy: Always - command: ['sh', '-c', 'sleep 360000'] \ No newline at end of file diff --git a/go.mod b/go.mod index a51ea0abb..b4c98e258 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/linkall-labs/vanus/client v0.1.0 github.com/linkall-labs/vanus/proto v0.1.0 github.com/linkall-labs/vanus/raft v0.1.0 + github.com/mwitkow/grpc-proxy v0.0.0 github.com/ncw/directio v1.0.5 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.1 @@ -45,6 +46,7 @@ replace ( github.com/linkall-labs/vanus/client => ./client github.com/linkall-labs/vanus/proto => ./proto github.com/linkall-labs/vanus/raft => ./raft + github.com/mwitkow/grpc-proxy => github.com/linkall-labs/grpc-proxy v0.0.0-20220624142509-a3b0cb2bb86c ) require ( diff --git a/go.sum b/go.sum index 751bc1058..af517f025 100644 --- a/go.sum +++ b/go.sum @@ -338,6 +338,8 @@ github.com/labstack/gommon v0.3.1 h1:OomWaJXm7xR6L1HmEtGyQf26TEn7V6X88mktX9kee9o github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/linkall-labs/embed-etcd v0.0.1 h1:k1707kTZXS/HRqvt08K8fI3INJHiX1/VMPft5GWml0I= github.com/linkall-labs/embed-etcd v0.0.1/go.mod h1:2vIGENtcpLaQZ1JZvKq0LRUjvTlNCeY0r3ySTKwX0EA= +github.com/linkall-labs/grpc-proxy v0.0.0-20220624142509-a3b0cb2bb86c h1:Psf/nuPqSfn3yF/AInrDmIs9bY2Rv56Cmf855i6//K0= +github.com/linkall-labs/grpc-proxy v0.0.0-20220624142509-a3b0cb2bb86c/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -644,6 +646,7 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= +golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -673,6 +676,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -729,6 +733,7 @@ golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -895,6 +900,7 @@ google.golang.org/genproto v0.0.0-20210222152913-aa3ee6e6a81c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= @@ -973,6 +979,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= k8s.io/api v0.23.4/go.mod h1:i77F4JfyNNrhOjZF7OwwNJS5Y1S9dpwvb9iYRYRczfI= k8s.io/apimachinery v0.23.4 h1:fhnuMd/xUL3Cjfl64j5ULKZ1/J9n8NuQEgNL+WXWfdM= k8s.io/apimachinery v0.23.4/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hrM= diff --git a/internal/controller/config.go b/internal/controller/config.go index 09bb25bbc..10e8413da 100644 --- a/internal/controller/config.go +++ b/internal/controller/config.go @@ -34,7 +34,6 @@ type Config struct { EtcdConfig embedetcd.Config `yaml:"embed_etcd"` Topology map[string]string `yaml:"topology"` Replicas uint `yaml:"replicas"` - GatewayEndpoint string `yaml:"gateway_endpoint"` } func (c *Config) GetEtcdConfig() embedetcd.Config { @@ -51,7 +50,6 @@ func (c *Config) GetEventbusCtrlConfig() eventbus.Config { KVKeyPrefix: c.MetadataConfig.KeyPrefix, Replicas: c.Replicas, Topology: c.Topology, - GatewayEndpoint: c.GatewayEndpoint, } } diff --git a/internal/controller/eventbus/config.go b/internal/controller/eventbus/config.go index a640f8f1d..605ba9dbc 100644 --- a/internal/controller/eventbus/config.go +++ b/internal/controller/eventbus/config.go @@ -24,5 +24,4 @@ type Config struct { EtcdConfig embedetcd.Config `yaml:"etcd"` Replicas uint `yaml:"replicas"` Topology map[string]string `yaml:"topology"` - GatewayEndpoint string `yaml:"gateway_endpoint"` } diff --git a/internal/controller/eventbus/controller.go b/internal/controller/eventbus/controller.go index 0f8ef42bf..f55556845 100644 --- a/internal/controller/eventbus/controller.go +++ b/internal/controller/eventbus/controller.go @@ -424,8 +424,7 @@ func (ctrl *controller) ReportSegmentBlockIsFull(ctx context.Context, func (ctrl *controller) Ping(_ context.Context, _ *emptypb.Empty) (*ctrlpb.PingResponse, error) { return &ctrlpb.PingResponse{ - LeaderAddr: ctrl.member.GetLeaderAddr(), - GatewayAddr: ctrl.cfg.GatewayEndpoint, + LeaderAddr: ctrl.member.GetLeaderAddr(), }, nil } diff --git a/internal/gateway/gateway.go b/internal/gateway/gateway.go index b99356387..9a1e58c32 100644 --- a/internal/gateway/gateway.go +++ b/internal/gateway/gateway.go @@ -32,22 +32,43 @@ import ( ) const ( - httpRequestPrefix = "/gateway" - xceVanusEventbus = "xvanuseventbus" + httpRequestPrefix = "/gateway" + xceVanusEventbus = "xvanuseventbus" + ctrlProxyPortShift = 2 +) + +var ( + allowCtrlProxyList = map[string]string{ + "/linkall.vanus.controller.PingServer/Ping": "ALLOW", + "/linkall.vanus.controller.EventBusController/ListEventBus": "ALLOW", + "/linkall.vanus.controller.EventBusController/CreateEventBus": "ALLOW", + "/linkall.vanus.controller.EventBusController/DeleteEventBus": "ALLOW", + "/linkall.vanus.controller.EventBusController/GetEventBus": "ALLOW", + "/linkall.vanus.controller.TriggerController/CreateSubscription": "ALLOW", + "/linkall.vanus.controller.TriggerController/DeleteSubscription": "ALLOW", + "/linkall.vanus.controller.TriggerController/GetSubscription": "ALLOW", + "/linkall.vanus.controller.TriggerController/ListSubscription": "ALLOW", + } ) type ceGateway struct { // ceClient v2.Client busWriter sync.Map config Config + cp *ctrlProxy } func NewGateway(config Config) *ceGateway { return &ceGateway{ config: config, + cp: newCtrlProxy(config.Port+ctrlProxyPortShift, allowCtrlProxyList, config.ControllerAddr), } } +func (ga *ceGateway) StartCtrlProxy(ctx context.Context) error { + return ga.cp.start(ctx) +} + func (ga *ceGateway) StartReceive(ctx context.Context) error { ls, err := net.Listen("tcp", fmt.Sprintf(":%d", ga.config.Port)) if err != nil { diff --git a/internal/gateway/grpc_proxy.go b/internal/gateway/grpc_proxy.go new file mode 100644 index 000000000..f5d918b40 --- /dev/null +++ b/internal/gateway/grpc_proxy.go @@ -0,0 +1,204 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gateway + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/linkall-labs/vanus/observability/log" + ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" + "github.com/mwitkow/grpc-proxy/proxy" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" +) + +func newCtrlProxy(port int, allowProxyMethod map[string]string, ctrlList []string) *ctrlProxy { + return &ctrlProxy{ + ctrlLists: ctrlList, + port: port, + allowProxyMethod: allowProxyMethod, + ticker: time.NewTicker(time.Second), + } +} + +type ctrlProxy struct { + ctrlLists []string + rwMutex sync.RWMutex + leaderConn *grpc.ClientConn + allowProxyMethod map[string]string + ticker *time.Ticker + port int +} + +func (cp *ctrlProxy) start(ctx context.Context) error { + grpcServer := grpc.NewServer( + grpc.UnknownServiceHandler(proxy.TransparentHandler(cp.director)), + ) + listen, err := net.Listen("tcp", fmt.Sprintf(":%d", cp.port)) + if err != nil { + return err + } + + go cp.updateLeader(ctx) + go func() { + if err := grpcServer.Serve(listen); err != nil { + panic("start grpc server failed: " + err.Error()) + } + log.Info(ctx, "the grpc server shutdown", nil) + }() + + go func() { + <-ctx.Done() + grpcServer.GracefulStop() + }() + go cp.updateLeader(ctx) + return nil +} + +func (cp *ctrlProxy) director(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + md, _ := metadata.FromIncomingContext(ctx) + outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) + + if cp.leaderConn == nil { + return outCtx, nil, status.Errorf(codes.Internal, "No leader founded") + } + + _, exist := cp.allowProxyMethod[fullMethodName] + if !exist { + log.Warning(ctx, "invalid access", map[string]interface{}{ + "method": fullMethodName, + }) + return outCtx, nil, status.Errorf(codes.Unimplemented, "Unknown method") + } + + cp.rwMutex.RLock() + defer cp.rwMutex.RUnlock() + return outCtx, cp.leaderConn, nil +} + +func (cp *ctrlProxy) updateLeader(ctx context.Context) { + defer cp.ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-cp.ticker.C: + cp.rwMutex.Lock() + if cp.leaderConn == nil { + cp.leaderConn = getLeaderControllerGRPCConn(ctx, cp.ctrlLists...) + } else { + pingClient := ctrlpb.NewPingServerClient(cp.leaderConn) + res, err := pingClient.Ping(ctx, &emptypb.Empty{}) + if err != nil { + cp.leaderConn = getLeaderControllerGRPCConn(ctx, cp.ctrlLists...) + } else if res.LeaderAddr != cp.leaderConn.Target() { + cp.leaderConn = getLeaderControllerGRPCConn(ctx, res.LeaderAddr) + } + } + if cp.leaderConn == nil { + log.Error(ctx, "connect to leader failed", nil) + } + cp.rwMutex.Unlock() + } + } +} + +func getLeaderControllerGRPCConn(ctx context.Context, endpoints ...string) *grpc.ClientConn { + var leaderAddr string + var leaderConn *grpc.ClientConn + tryConnectLeaderOnce := false + for idx := range endpoints { + conn := createGRPCConn(ctx, endpoints[idx]) + if conn == nil { + continue + } + pingClient := ctrlpb.NewPingServerClient(conn) + res, err := pingClient.Ping(ctx, &emptypb.Empty{}) + if err != nil { + log.Warning(ctx, "ping controller failed", map[string]interface{}{ + "endpoint": endpoints[idx], + }) + continue + } + leaderAddr = res.LeaderAddr + if leaderAddr == endpoints[idx] { + leaderConn = conn + tryConnectLeaderOnce = false + } else { + _ = conn.Close() + } + break + } + + if leaderAddr == "" { + return nil + } + + if leaderConn != nil { + return leaderConn + } else if !tryConnectLeaderOnce { + leaderConn = createGRPCConn(ctx, leaderAddr) + } + + if leaderConn == nil { + log.Error(ctx, "ping controller failed", nil) + return nil + } + return leaderConn +} + +func createGRPCConn(ctx context.Context, addr string) *grpc.ClientConn { + if addr == "" { + return nil + } + var err error + var opts []grpc.DialOption + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + opts = append(opts, grpc.WithBlock()) + ctx, cancel := context.WithCancel(ctx) + timeout := false + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + select { + case <-ctx.Done(): + case <-ticker.C: + cancel() + timeout = true + } + }() + conn, err := grpc.DialContext(ctx, addr, opts...) + cancel() + if timeout { + log.Warning(ctx, "dial to controller timeout, try to another controller", map[string]interface{}{ + "endpoint": addr, + }) + return nil + } else if err != nil { + log.Warning(ctx, "dial to controller failed, try to another controller", map[string]interface{}{ + "endpoint": addr, + }) + return nil + } + return conn +} diff --git a/internal/gateway/grpc_proxy_test.go b/internal/gateway/grpc_proxy_test.go new file mode 100644 index 000000000..a6129d015 --- /dev/null +++ b/internal/gateway/grpc_proxy_test.go @@ -0,0 +1,185 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gateway + +import ( + stdCtx "context" + "fmt" + "net" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/golang/protobuf/ptypes/empty" + metapb "github.com/linkall-labs/vanus/proto/pkg/meta" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/emptypb" + + ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" + . "github.com/smartystreets/goconvey/convey" + "google.golang.org/grpc" +) + +func Test_ControllerProxy(t *testing.T) { + Convey("test grpc reverse proxy", t, func() { + listen1, err := net.Listen("tcp", fmt.Sprintf(":%d", 20001)) + So(err, ShouldBeNil) + ctrl := gomock.NewController(t) + pingSvc1 := ctrlpb.NewMockPingServerServer(ctrl) + ebSvc1 := ctrlpb.NewMockEventBusControllerServer(ctrl) + srv1 := grpc.NewServer() + ctrlpb.RegisterPingServerServer(srv1, pingSvc1) + ctrlpb.RegisterEventBusControllerServer(srv1, ebSvc1) + defer srv1.GracefulStop() + go func() { + _ = srv1.Serve(listen1) + }() + + listen2, err := net.Listen("tcp", fmt.Sprintf(":%d", 20002)) + So(err, ShouldBeNil) + pingSvc2 := ctrlpb.NewMockPingServerServer(ctrl) + ebSvc2 := ctrlpb.NewMockEventBusControllerServer(ctrl) + srv2 := grpc.NewServer() + ctrlpb.RegisterPingServerServer(srv2, pingSvc2) + ctrlpb.RegisterEventBusControllerServer(srv2, ebSvc2) + defer srv2.GracefulStop() + go func() { + _ = srv2.Serve(listen2) + }() + + listen3, err := net.Listen("tcp", fmt.Sprintf(":%d", 20003)) + So(err, ShouldBeNil) + pingSvc3 := ctrlpb.NewMockPingServerServer(ctrl) + ebSvc3 := ctrlpb.NewMockEventBusControllerServer(ctrl) + srv3 := grpc.NewServer() + ctrlpb.RegisterPingServerServer(srv3, pingSvc3) + ctrlpb.RegisterEventBusControllerServer(srv3, ebSvc3) + defer srv3.GracefulStop() + go func() { + _ = srv3.Serve(listen3) + }() + + ctx, cancel := stdCtx.WithCancel(stdCtx.Background()) + cp := newCtrlProxy(20000, map[string]string{}, []string{"127.0.0.1:20001", + "127.0.0.1:20002", "127.0.0.1:20003"}) + cp.ticker = time.NewTicker(10 * time.Millisecond) + err = cp.start(ctx) + So(err, ShouldBeNil) + var opts []grpc.DialOption + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial("127.0.0.1:20000", opts...) + So(err, ShouldBeNil) + + pingRes := &ctrlpb.PingResponse{ + LeaderAddr: "127.0.0.1:20001", + GatewayAddr: "127.0.0.1:12345", + } + mutex := sync.Mutex{} + pingSvc1.EXPECT().Ping(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, + in *emptypb.Empty) (*ctrlpb.PingResponse, error) { + mutex.Lock() + defer mutex.Unlock() + return pingRes, nil + }) + pingSvc2.EXPECT().Ping(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, + in *emptypb.Empty) (*ctrlpb.PingResponse, error) { + mutex.Lock() + defer mutex.Unlock() + return pingRes, nil + }) + pingSvc3.EXPECT().Ping(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, + in *emptypb.Empty) (*ctrlpb.PingResponse, error) { + mutex.Lock() + defer mutex.Unlock() + return pingRes, nil + }) + + Convey("test allow methods", func() { + pingCli := ctrlpb.NewPingServerClient(conn) + res, err := pingCli.Ping(stdCtx.Background(), &empty.Empty{}) + So(res, ShouldBeNil) + So(err.Error(), ShouldContainSubstring, "No leader founded") + time.Sleep(100 * time.Millisecond) + _, err = pingCli.Ping(stdCtx.Background(), &empty.Empty{}) + So(err.Error(), ShouldContainSubstring, "Unknown method") + }) + + Convey("test ping", func() { + cp.allowProxyMethod["/linkall.vanus.controller.PingServer/Ping"] = "ALLOW" + pingCli := ctrlpb.NewPingServerClient(conn) + time.Sleep(100 * time.Millisecond) + res, err := pingCli.Ping(stdCtx.Background(), &empty.Empty{}) + So(err, ShouldBeNil) + So(res.LeaderAddr, ShouldEqual, "127.0.0.1:20001") + So(res.GatewayAddr, ShouldEqual, "127.0.0.1:12345") + + mutex.Lock() + pingRes.LeaderAddr = "127.0.0.1:20003" + mutex.Unlock() + time.Sleep(100 * time.Millisecond) + res, err = pingCli.Ping(stdCtx.Background(), &empty.Empty{}) + So(err, ShouldBeNil) + So(res.LeaderAddr, ShouldEqual, "127.0.0.1:20003") + So(res.GatewayAddr, ShouldEqual, "127.0.0.1:12345") + }) + + Convey("test list eventbus", func() { + cp.allowProxyMethod["/linkall.vanus.controller.PingServer/Ping"] = "ALLOW" + cp.allowProxyMethod["/linkall.vanus.controller.EventBusController/ListEventBus"] = "ALLOW" + ebCli := ctrlpb.NewEventBusControllerClient(conn) + time.Sleep(100 * time.Millisecond) + + ebSvc1.EXPECT().ListEventBus(gomock.Any(), gomock.Any()).Times(1).Return(&ctrlpb.ListEventbusResponse{ + Eventbus: []*metapb.EventBus{ + { + Name: "battle1", + LogNumber: 1, + Id: 1, + }, + { + Name: "battle2", + LogNumber: 2, + Id: 2, + }, + { + Name: "battle3", + LogNumber: 4, + Id: 3, + }, + { + Name: "battle4", + LogNumber: 4, + Id: 4, + }, + }, + }, nil) + res, err := ebCli.ListEventBus(ctx, &empty.Empty{}) + So(err, ShouldBeNil) + So(res.Eventbus, ShouldHaveLength, 4) + So(res.Eventbus[0].Name, ShouldEqual, "battle1") + So(res.Eventbus[0].Id, ShouldEqual, 1) + So(res.Eventbus[1].Name, ShouldEqual, "battle2") + So(res.Eventbus[1].Id, ShouldEqual, 2) + So(res.Eventbus[2].Name, ShouldEqual, "battle3") + So(res.Eventbus[2].Id, ShouldEqual, 3) + So(res.Eventbus[3].Name, ShouldEqual, "battle4") + So(res.Eventbus[3].Id, ShouldEqual, 4) + }) + + cancel() + time.Sleep(100 * time.Millisecond) + }) +} diff --git a/proto/pkg/controller/mock_controller.go b/proto/pkg/controller/mock_controller.go new file mode 100644 index 000000000..478779d56 --- /dev/null +++ b/proto/pkg/controller/mock_controller.go @@ -0,0 +1,1521 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: controller.pb.go + +// Package controller is a generated GoMock package. +package controller + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + meta "github.com/linkall-labs/vanus/proto/pkg/meta" + grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// MockPingServerClient is a mock of PingServerClient interface. +type MockPingServerClient struct { + ctrl *gomock.Controller + recorder *MockPingServerClientMockRecorder +} + +// MockPingServerClientMockRecorder is the mock recorder for MockPingServerClient. +type MockPingServerClientMockRecorder struct { + mock *MockPingServerClient +} + +// NewMockPingServerClient creates a new mock instance. +func NewMockPingServerClient(ctrl *gomock.Controller) *MockPingServerClient { + mock := &MockPingServerClient{ctrl: ctrl} + mock.recorder = &MockPingServerClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPingServerClient) EXPECT() *MockPingServerClientMockRecorder { + return m.recorder +} + +// Ping mocks base method. +func (m *MockPingServerClient) Ping(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PingResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Ping", varargs...) + ret0, _ := ret[0].(*PingResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Ping indicates an expected call of Ping. +func (mr *MockPingServerClientMockRecorder) Ping(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockPingServerClient)(nil).Ping), varargs...) +} + +// MockPingServerServer is a mock of PingServerServer interface. +type MockPingServerServer struct { + ctrl *gomock.Controller + recorder *MockPingServerServerMockRecorder +} + +// MockPingServerServerMockRecorder is the mock recorder for MockPingServerServer. +type MockPingServerServerMockRecorder struct { + mock *MockPingServerServer +} + +// NewMockPingServerServer creates a new mock instance. +func NewMockPingServerServer(ctrl *gomock.Controller) *MockPingServerServer { + mock := &MockPingServerServer{ctrl: ctrl} + mock.recorder = &MockPingServerServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPingServerServer) EXPECT() *MockPingServerServerMockRecorder { + return m.recorder +} + +// Ping mocks base method. +func (m *MockPingServerServer) Ping(arg0 context.Context, arg1 *emptypb.Empty) (*PingResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ping", arg0, arg1) + ret0, _ := ret[0].(*PingResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Ping indicates an expected call of Ping. +func (mr *MockPingServerServerMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockPingServerServer)(nil).Ping), arg0, arg1) +} + +// MockEventBusControllerClient is a mock of EventBusControllerClient interface. +type MockEventBusControllerClient struct { + ctrl *gomock.Controller + recorder *MockEventBusControllerClientMockRecorder +} + +// MockEventBusControllerClientMockRecorder is the mock recorder for MockEventBusControllerClient. +type MockEventBusControllerClientMockRecorder struct { + mock *MockEventBusControllerClient +} + +// NewMockEventBusControllerClient creates a new mock instance. +func NewMockEventBusControllerClient(ctrl *gomock.Controller) *MockEventBusControllerClient { + mock := &MockEventBusControllerClient{ctrl: ctrl} + mock.recorder = &MockEventBusControllerClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventBusControllerClient) EXPECT() *MockEventBusControllerClientMockRecorder { + return m.recorder +} + +// CreateEventBus mocks base method. +func (m *MockEventBusControllerClient) CreateEventBus(ctx context.Context, in *CreateEventBusRequest, opts ...grpc.CallOption) (*meta.EventBus, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CreateEventBus", varargs...) + ret0, _ := ret[0].(*meta.EventBus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateEventBus indicates an expected call of CreateEventBus. +func (mr *MockEventBusControllerClientMockRecorder) CreateEventBus(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateEventBus", reflect.TypeOf((*MockEventBusControllerClient)(nil).CreateEventBus), varargs...) +} + +// DeleteEventBus mocks base method. +func (m *MockEventBusControllerClient) DeleteEventBus(ctx context.Context, in *meta.EventBus, opts ...grpc.CallOption) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteEventBus", varargs...) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteEventBus indicates an expected call of DeleteEventBus. +func (mr *MockEventBusControllerClientMockRecorder) DeleteEventBus(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteEventBus", reflect.TypeOf((*MockEventBusControllerClient)(nil).DeleteEventBus), varargs...) +} + +// GetEventBus mocks base method. +func (m *MockEventBusControllerClient) GetEventBus(ctx context.Context, in *meta.EventBus, opts ...grpc.CallOption) (*meta.EventBus, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetEventBus", varargs...) + ret0, _ := ret[0].(*meta.EventBus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetEventBus indicates an expected call of GetEventBus. +func (mr *MockEventBusControllerClientMockRecorder) GetEventBus(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEventBus", reflect.TypeOf((*MockEventBusControllerClient)(nil).GetEventBus), varargs...) +} + +// ListEventBus mocks base method. +func (m *MockEventBusControllerClient) ListEventBus(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListEventbusResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListEventBus", varargs...) + ret0, _ := ret[0].(*ListEventbusResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListEventBus indicates an expected call of ListEventBus. +func (mr *MockEventBusControllerClientMockRecorder) ListEventBus(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListEventBus", reflect.TypeOf((*MockEventBusControllerClient)(nil).ListEventBus), varargs...) +} + +// UpdateEventBus mocks base method. +func (m *MockEventBusControllerClient) UpdateEventBus(ctx context.Context, in *UpdateEventBusRequest, opts ...grpc.CallOption) (*meta.EventBus, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UpdateEventBus", varargs...) + ret0, _ := ret[0].(*meta.EventBus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateEventBus indicates an expected call of UpdateEventBus. +func (mr *MockEventBusControllerClientMockRecorder) UpdateEventBus(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateEventBus", reflect.TypeOf((*MockEventBusControllerClient)(nil).UpdateEventBus), varargs...) +} + +// MockEventBusControllerServer is a mock of EventBusControllerServer interface. +type MockEventBusControllerServer struct { + ctrl *gomock.Controller + recorder *MockEventBusControllerServerMockRecorder +} + +// MockEventBusControllerServerMockRecorder is the mock recorder for MockEventBusControllerServer. +type MockEventBusControllerServerMockRecorder struct { + mock *MockEventBusControllerServer +} + +// NewMockEventBusControllerServer creates a new mock instance. +func NewMockEventBusControllerServer(ctrl *gomock.Controller) *MockEventBusControllerServer { + mock := &MockEventBusControllerServer{ctrl: ctrl} + mock.recorder = &MockEventBusControllerServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventBusControllerServer) EXPECT() *MockEventBusControllerServerMockRecorder { + return m.recorder +} + +// CreateEventBus mocks base method. +func (m *MockEventBusControllerServer) CreateEventBus(arg0 context.Context, arg1 *CreateEventBusRequest) (*meta.EventBus, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateEventBus", arg0, arg1) + ret0, _ := ret[0].(*meta.EventBus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateEventBus indicates an expected call of CreateEventBus. +func (mr *MockEventBusControllerServerMockRecorder) CreateEventBus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateEventBus", reflect.TypeOf((*MockEventBusControllerServer)(nil).CreateEventBus), arg0, arg1) +} + +// DeleteEventBus mocks base method. +func (m *MockEventBusControllerServer) DeleteEventBus(arg0 context.Context, arg1 *meta.EventBus) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteEventBus", arg0, arg1) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteEventBus indicates an expected call of DeleteEventBus. +func (mr *MockEventBusControllerServerMockRecorder) DeleteEventBus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteEventBus", reflect.TypeOf((*MockEventBusControllerServer)(nil).DeleteEventBus), arg0, arg1) +} + +// GetEventBus mocks base method. +func (m *MockEventBusControllerServer) GetEventBus(arg0 context.Context, arg1 *meta.EventBus) (*meta.EventBus, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetEventBus", arg0, arg1) + ret0, _ := ret[0].(*meta.EventBus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetEventBus indicates an expected call of GetEventBus. +func (mr *MockEventBusControllerServerMockRecorder) GetEventBus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEventBus", reflect.TypeOf((*MockEventBusControllerServer)(nil).GetEventBus), arg0, arg1) +} + +// ListEventBus mocks base method. +func (m *MockEventBusControllerServer) ListEventBus(arg0 context.Context, arg1 *emptypb.Empty) (*ListEventbusResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListEventBus", arg0, arg1) + ret0, _ := ret[0].(*ListEventbusResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListEventBus indicates an expected call of ListEventBus. +func (mr *MockEventBusControllerServerMockRecorder) ListEventBus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListEventBus", reflect.TypeOf((*MockEventBusControllerServer)(nil).ListEventBus), arg0, arg1) +} + +// UpdateEventBus mocks base method. +func (m *MockEventBusControllerServer) UpdateEventBus(arg0 context.Context, arg1 *UpdateEventBusRequest) (*meta.EventBus, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateEventBus", arg0, arg1) + ret0, _ := ret[0].(*meta.EventBus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateEventBus indicates an expected call of UpdateEventBus. +func (mr *MockEventBusControllerServerMockRecorder) UpdateEventBus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateEventBus", reflect.TypeOf((*MockEventBusControllerServer)(nil).UpdateEventBus), arg0, arg1) +} + +// MockEventLogControllerClient is a mock of EventLogControllerClient interface. +type MockEventLogControllerClient struct { + ctrl *gomock.Controller + recorder *MockEventLogControllerClientMockRecorder +} + +// MockEventLogControllerClientMockRecorder is the mock recorder for MockEventLogControllerClient. +type MockEventLogControllerClientMockRecorder struct { + mock *MockEventLogControllerClient +} + +// NewMockEventLogControllerClient creates a new mock instance. +func NewMockEventLogControllerClient(ctrl *gomock.Controller) *MockEventLogControllerClient { + mock := &MockEventLogControllerClient{ctrl: ctrl} + mock.recorder = &MockEventLogControllerClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventLogControllerClient) EXPECT() *MockEventLogControllerClientMockRecorder { + return m.recorder +} + +// GetAppendableSegment mocks base method. +func (m *MockEventLogControllerClient) GetAppendableSegment(ctx context.Context, in *GetAppendableSegmentRequest, opts ...grpc.CallOption) (*GetAppendableSegmentResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetAppendableSegment", varargs...) + ret0, _ := ret[0].(*GetAppendableSegmentResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAppendableSegment indicates an expected call of GetAppendableSegment. +func (mr *MockEventLogControllerClientMockRecorder) GetAppendableSegment(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAppendableSegment", reflect.TypeOf((*MockEventLogControllerClient)(nil).GetAppendableSegment), varargs...) +} + +// ListSegment mocks base method. +func (m *MockEventLogControllerClient) ListSegment(ctx context.Context, in *ListSegmentRequest, opts ...grpc.CallOption) (*ListSegmentResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListSegment", varargs...) + ret0, _ := ret[0].(*ListSegmentResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListSegment indicates an expected call of ListSegment. +func (mr *MockEventLogControllerClientMockRecorder) ListSegment(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSegment", reflect.TypeOf((*MockEventLogControllerClient)(nil).ListSegment), varargs...) +} + +// MockEventLogControllerServer is a mock of EventLogControllerServer interface. +type MockEventLogControllerServer struct { + ctrl *gomock.Controller + recorder *MockEventLogControllerServerMockRecorder +} + +// MockEventLogControllerServerMockRecorder is the mock recorder for MockEventLogControllerServer. +type MockEventLogControllerServerMockRecorder struct { + mock *MockEventLogControllerServer +} + +// NewMockEventLogControllerServer creates a new mock instance. +func NewMockEventLogControllerServer(ctrl *gomock.Controller) *MockEventLogControllerServer { + mock := &MockEventLogControllerServer{ctrl: ctrl} + mock.recorder = &MockEventLogControllerServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventLogControllerServer) EXPECT() *MockEventLogControllerServerMockRecorder { + return m.recorder +} + +// GetAppendableSegment mocks base method. +func (m *MockEventLogControllerServer) GetAppendableSegment(arg0 context.Context, arg1 *GetAppendableSegmentRequest) (*GetAppendableSegmentResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAppendableSegment", arg0, arg1) + ret0, _ := ret[0].(*GetAppendableSegmentResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAppendableSegment indicates an expected call of GetAppendableSegment. +func (mr *MockEventLogControllerServerMockRecorder) GetAppendableSegment(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAppendableSegment", reflect.TypeOf((*MockEventLogControllerServer)(nil).GetAppendableSegment), arg0, arg1) +} + +// ListSegment mocks base method. +func (m *MockEventLogControllerServer) ListSegment(arg0 context.Context, arg1 *ListSegmentRequest) (*ListSegmentResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListSegment", arg0, arg1) + ret0, _ := ret[0].(*ListSegmentResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListSegment indicates an expected call of ListSegment. +func (mr *MockEventLogControllerServerMockRecorder) ListSegment(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSegment", reflect.TypeOf((*MockEventLogControllerServer)(nil).ListSegment), arg0, arg1) +} + +// MockSegmentControllerClient is a mock of SegmentControllerClient interface. +type MockSegmentControllerClient struct { + ctrl *gomock.Controller + recorder *MockSegmentControllerClientMockRecorder +} + +// MockSegmentControllerClientMockRecorder is the mock recorder for MockSegmentControllerClient. +type MockSegmentControllerClientMockRecorder struct { + mock *MockSegmentControllerClient +} + +// NewMockSegmentControllerClient creates a new mock instance. +func NewMockSegmentControllerClient(ctrl *gomock.Controller) *MockSegmentControllerClient { + mock := &MockSegmentControllerClient{ctrl: ctrl} + mock.recorder = &MockSegmentControllerClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSegmentControllerClient) EXPECT() *MockSegmentControllerClientMockRecorder { + return m.recorder +} + +// QuerySegmentRouteInfo mocks base method. +func (m *MockSegmentControllerClient) QuerySegmentRouteInfo(ctx context.Context, in *QuerySegmentRouteInfoRequest, opts ...grpc.CallOption) (*QuerySegmentRouteInfoResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "QuerySegmentRouteInfo", varargs...) + ret0, _ := ret[0].(*QuerySegmentRouteInfoResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// QuerySegmentRouteInfo indicates an expected call of QuerySegmentRouteInfo. +func (mr *MockSegmentControllerClientMockRecorder) QuerySegmentRouteInfo(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QuerySegmentRouteInfo", reflect.TypeOf((*MockSegmentControllerClient)(nil).QuerySegmentRouteInfo), varargs...) +} + +// RegisterSegmentServer mocks base method. +func (m *MockSegmentControllerClient) RegisterSegmentServer(ctx context.Context, in *RegisterSegmentServerRequest, opts ...grpc.CallOption) (*RegisterSegmentServerResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RegisterSegmentServer", varargs...) + ret0, _ := ret[0].(*RegisterSegmentServerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RegisterSegmentServer indicates an expected call of RegisterSegmentServer. +func (mr *MockSegmentControllerClientMockRecorder) RegisterSegmentServer(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterSegmentServer", reflect.TypeOf((*MockSegmentControllerClient)(nil).RegisterSegmentServer), varargs...) +} + +// ReportSegmentBlockIsFull mocks base method. +func (m *MockSegmentControllerClient) ReportSegmentBlockIsFull(ctx context.Context, in *SegmentHeartbeatRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ReportSegmentBlockIsFull", varargs...) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReportSegmentBlockIsFull indicates an expected call of ReportSegmentBlockIsFull. +func (mr *MockSegmentControllerClientMockRecorder) ReportSegmentBlockIsFull(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportSegmentBlockIsFull", reflect.TypeOf((*MockSegmentControllerClient)(nil).ReportSegmentBlockIsFull), varargs...) +} + +// ReportSegmentLeader mocks base method. +func (m *MockSegmentControllerClient) ReportSegmentLeader(ctx context.Context, in *ReportSegmentLeaderRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ReportSegmentLeader", varargs...) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReportSegmentLeader indicates an expected call of ReportSegmentLeader. +func (mr *MockSegmentControllerClientMockRecorder) ReportSegmentLeader(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportSegmentLeader", reflect.TypeOf((*MockSegmentControllerClient)(nil).ReportSegmentLeader), varargs...) +} + +// SegmentHeartbeat mocks base method. +func (m *MockSegmentControllerClient) SegmentHeartbeat(ctx context.Context, opts ...grpc.CallOption) (SegmentController_SegmentHeartbeatClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SegmentHeartbeat", varargs...) + ret0, _ := ret[0].(SegmentController_SegmentHeartbeatClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SegmentHeartbeat indicates an expected call of SegmentHeartbeat. +func (mr *MockSegmentControllerClientMockRecorder) SegmentHeartbeat(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SegmentHeartbeat", reflect.TypeOf((*MockSegmentControllerClient)(nil).SegmentHeartbeat), varargs...) +} + +// UnregisterSegmentServer mocks base method. +func (m *MockSegmentControllerClient) UnregisterSegmentServer(ctx context.Context, in *UnregisterSegmentServerRequest, opts ...grpc.CallOption) (*UnregisterSegmentServerResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UnregisterSegmentServer", varargs...) + ret0, _ := ret[0].(*UnregisterSegmentServerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UnregisterSegmentServer indicates an expected call of UnregisterSegmentServer. +func (mr *MockSegmentControllerClientMockRecorder) UnregisterSegmentServer(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterSegmentServer", reflect.TypeOf((*MockSegmentControllerClient)(nil).UnregisterSegmentServer), varargs...) +} + +// MockSegmentController_SegmentHeartbeatClient is a mock of SegmentController_SegmentHeartbeatClient interface. +type MockSegmentController_SegmentHeartbeatClient struct { + ctrl *gomock.Controller + recorder *MockSegmentController_SegmentHeartbeatClientMockRecorder +} + +// MockSegmentController_SegmentHeartbeatClientMockRecorder is the mock recorder for MockSegmentController_SegmentHeartbeatClient. +type MockSegmentController_SegmentHeartbeatClientMockRecorder struct { + mock *MockSegmentController_SegmentHeartbeatClient +} + +// NewMockSegmentController_SegmentHeartbeatClient creates a new mock instance. +func NewMockSegmentController_SegmentHeartbeatClient(ctrl *gomock.Controller) *MockSegmentController_SegmentHeartbeatClient { + mock := &MockSegmentController_SegmentHeartbeatClient{ctrl: ctrl} + mock.recorder = &MockSegmentController_SegmentHeartbeatClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSegmentController_SegmentHeartbeatClient) EXPECT() *MockSegmentController_SegmentHeartbeatClientMockRecorder { + return m.recorder +} + +// CloseAndRecv mocks base method. +func (m *MockSegmentController_SegmentHeartbeatClient) CloseAndRecv() (*SegmentHeartbeatResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseAndRecv") + ret0, _ := ret[0].(*SegmentHeartbeatResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CloseAndRecv indicates an expected call of CloseAndRecv. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) CloseAndRecv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseAndRecv", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).CloseAndRecv)) +} + +// CloseSend mocks base method. +func (m *MockSegmentController_SegmentHeartbeatClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockSegmentController_SegmentHeartbeatClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockSegmentController_SegmentHeartbeatClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).Header)) +} + +// RecvMsg mocks base method. +func (m_2 *MockSegmentController_SegmentHeartbeatClient) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).RecvMsg), m) +} + +// Send mocks base method. +func (m *MockSegmentController_SegmentHeartbeatClient) Send(arg0 *SegmentHeartbeatRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m_2 *MockSegmentController_SegmentHeartbeatClient) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).SendMsg), m) +} + +// Trailer mocks base method. +func (m *MockSegmentController_SegmentHeartbeatClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockSegmentController_SegmentHeartbeatClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatClient)(nil).Trailer)) +} + +// MockSegmentControllerServer is a mock of SegmentControllerServer interface. +type MockSegmentControllerServer struct { + ctrl *gomock.Controller + recorder *MockSegmentControllerServerMockRecorder +} + +// MockSegmentControllerServerMockRecorder is the mock recorder for MockSegmentControllerServer. +type MockSegmentControllerServerMockRecorder struct { + mock *MockSegmentControllerServer +} + +// NewMockSegmentControllerServer creates a new mock instance. +func NewMockSegmentControllerServer(ctrl *gomock.Controller) *MockSegmentControllerServer { + mock := &MockSegmentControllerServer{ctrl: ctrl} + mock.recorder = &MockSegmentControllerServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSegmentControllerServer) EXPECT() *MockSegmentControllerServerMockRecorder { + return m.recorder +} + +// QuerySegmentRouteInfo mocks base method. +func (m *MockSegmentControllerServer) QuerySegmentRouteInfo(arg0 context.Context, arg1 *QuerySegmentRouteInfoRequest) (*QuerySegmentRouteInfoResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QuerySegmentRouteInfo", arg0, arg1) + ret0, _ := ret[0].(*QuerySegmentRouteInfoResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// QuerySegmentRouteInfo indicates an expected call of QuerySegmentRouteInfo. +func (mr *MockSegmentControllerServerMockRecorder) QuerySegmentRouteInfo(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QuerySegmentRouteInfo", reflect.TypeOf((*MockSegmentControllerServer)(nil).QuerySegmentRouteInfo), arg0, arg1) +} + +// RegisterSegmentServer mocks base method. +func (m *MockSegmentControllerServer) RegisterSegmentServer(arg0 context.Context, arg1 *RegisterSegmentServerRequest) (*RegisterSegmentServerResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RegisterSegmentServer", arg0, arg1) + ret0, _ := ret[0].(*RegisterSegmentServerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RegisterSegmentServer indicates an expected call of RegisterSegmentServer. +func (mr *MockSegmentControllerServerMockRecorder) RegisterSegmentServer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterSegmentServer", reflect.TypeOf((*MockSegmentControllerServer)(nil).RegisterSegmentServer), arg0, arg1) +} + +// ReportSegmentBlockIsFull mocks base method. +func (m *MockSegmentControllerServer) ReportSegmentBlockIsFull(arg0 context.Context, arg1 *SegmentHeartbeatRequest) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReportSegmentBlockIsFull", arg0, arg1) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReportSegmentBlockIsFull indicates an expected call of ReportSegmentBlockIsFull. +func (mr *MockSegmentControllerServerMockRecorder) ReportSegmentBlockIsFull(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportSegmentBlockIsFull", reflect.TypeOf((*MockSegmentControllerServer)(nil).ReportSegmentBlockIsFull), arg0, arg1) +} + +// ReportSegmentLeader mocks base method. +func (m *MockSegmentControllerServer) ReportSegmentLeader(arg0 context.Context, arg1 *ReportSegmentLeaderRequest) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReportSegmentLeader", arg0, arg1) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReportSegmentLeader indicates an expected call of ReportSegmentLeader. +func (mr *MockSegmentControllerServerMockRecorder) ReportSegmentLeader(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportSegmentLeader", reflect.TypeOf((*MockSegmentControllerServer)(nil).ReportSegmentLeader), arg0, arg1) +} + +// SegmentHeartbeat mocks base method. +func (m *MockSegmentControllerServer) SegmentHeartbeat(arg0 SegmentController_SegmentHeartbeatServer) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SegmentHeartbeat", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SegmentHeartbeat indicates an expected call of SegmentHeartbeat. +func (mr *MockSegmentControllerServerMockRecorder) SegmentHeartbeat(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SegmentHeartbeat", reflect.TypeOf((*MockSegmentControllerServer)(nil).SegmentHeartbeat), arg0) +} + +// UnregisterSegmentServer mocks base method. +func (m *MockSegmentControllerServer) UnregisterSegmentServer(arg0 context.Context, arg1 *UnregisterSegmentServerRequest) (*UnregisterSegmentServerResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UnregisterSegmentServer", arg0, arg1) + ret0, _ := ret[0].(*UnregisterSegmentServerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UnregisterSegmentServer indicates an expected call of UnregisterSegmentServer. +func (mr *MockSegmentControllerServerMockRecorder) UnregisterSegmentServer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterSegmentServer", reflect.TypeOf((*MockSegmentControllerServer)(nil).UnregisterSegmentServer), arg0, arg1) +} + +// MockSegmentController_SegmentHeartbeatServer is a mock of SegmentController_SegmentHeartbeatServer interface. +type MockSegmentController_SegmentHeartbeatServer struct { + ctrl *gomock.Controller + recorder *MockSegmentController_SegmentHeartbeatServerMockRecorder +} + +// MockSegmentController_SegmentHeartbeatServerMockRecorder is the mock recorder for MockSegmentController_SegmentHeartbeatServer. +type MockSegmentController_SegmentHeartbeatServerMockRecorder struct { + mock *MockSegmentController_SegmentHeartbeatServer +} + +// NewMockSegmentController_SegmentHeartbeatServer creates a new mock instance. +func NewMockSegmentController_SegmentHeartbeatServer(ctrl *gomock.Controller) *MockSegmentController_SegmentHeartbeatServer { + mock := &MockSegmentController_SegmentHeartbeatServer{ctrl: ctrl} + mock.recorder = &MockSegmentController_SegmentHeartbeatServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSegmentController_SegmentHeartbeatServer) EXPECT() *MockSegmentController_SegmentHeartbeatServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockSegmentController_SegmentHeartbeatServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).Context)) +} + +// Recv mocks base method. +func (m *MockSegmentController_SegmentHeartbeatServer) Recv() (*SegmentHeartbeatRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*SegmentHeartbeatRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m_2 *MockSegmentController_SegmentHeartbeatServer) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).RecvMsg), m) +} + +// SendAndClose mocks base method. +func (m *MockSegmentController_SegmentHeartbeatServer) SendAndClose(arg0 *SegmentHeartbeatResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendAndClose", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendAndClose indicates an expected call of SendAndClose. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) SendAndClose(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAndClose", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).SendAndClose), arg0) +} + +// SendHeader mocks base method. +func (m *MockSegmentController_SegmentHeartbeatServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m_2 *MockSegmentController_SegmentHeartbeatServer) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).SendMsg), m) +} + +// SetHeader mocks base method. +func (m *MockSegmentController_SegmentHeartbeatServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *MockSegmentController_SegmentHeartbeatServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. +func (mr *MockSegmentController_SegmentHeartbeatServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockSegmentController_SegmentHeartbeatServer)(nil).SetTrailer), arg0) +} + +// MockTriggerControllerClient is a mock of TriggerControllerClient interface. +type MockTriggerControllerClient struct { + ctrl *gomock.Controller + recorder *MockTriggerControllerClientMockRecorder +} + +// MockTriggerControllerClientMockRecorder is the mock recorder for MockTriggerControllerClient. +type MockTriggerControllerClientMockRecorder struct { + mock *MockTriggerControllerClient +} + +// NewMockTriggerControllerClient creates a new mock instance. +func NewMockTriggerControllerClient(ctrl *gomock.Controller) *MockTriggerControllerClient { + mock := &MockTriggerControllerClient{ctrl: ctrl} + mock.recorder = &MockTriggerControllerClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTriggerControllerClient) EXPECT() *MockTriggerControllerClientMockRecorder { + return m.recorder +} + +// CreateSubscription mocks base method. +func (m *MockTriggerControllerClient) CreateSubscription(ctx context.Context, in *CreateSubscriptionRequest, opts ...grpc.CallOption) (*meta.Subscription, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CreateSubscription", varargs...) + ret0, _ := ret[0].(*meta.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateSubscription indicates an expected call of CreateSubscription. +func (mr *MockTriggerControllerClientMockRecorder) CreateSubscription(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSubscription", reflect.TypeOf((*MockTriggerControllerClient)(nil).CreateSubscription), varargs...) +} + +// DeleteSubscription mocks base method. +func (m *MockTriggerControllerClient) DeleteSubscription(ctx context.Context, in *DeleteSubscriptionRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteSubscription", varargs...) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteSubscription indicates an expected call of DeleteSubscription. +func (mr *MockTriggerControllerClientMockRecorder) DeleteSubscription(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSubscription", reflect.TypeOf((*MockTriggerControllerClient)(nil).DeleteSubscription), varargs...) +} + +// GetSubscription mocks base method. +func (m *MockTriggerControllerClient) GetSubscription(ctx context.Context, in *GetSubscriptionRequest, opts ...grpc.CallOption) (*meta.Subscription, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetSubscription", varargs...) + ret0, _ := ret[0].(*meta.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSubscription indicates an expected call of GetSubscription. +func (mr *MockTriggerControllerClientMockRecorder) GetSubscription(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscription", reflect.TypeOf((*MockTriggerControllerClient)(nil).GetSubscription), varargs...) +} + +// ListSubscription mocks base method. +func (m *MockTriggerControllerClient) ListSubscription(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListSubscriptionResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListSubscription", varargs...) + ret0, _ := ret[0].(*ListSubscriptionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListSubscription indicates an expected call of ListSubscription. +func (mr *MockTriggerControllerClientMockRecorder) ListSubscription(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSubscription", reflect.TypeOf((*MockTriggerControllerClient)(nil).ListSubscription), varargs...) +} + +// RegisterTriggerWorker mocks base method. +func (m *MockTriggerControllerClient) RegisterTriggerWorker(ctx context.Context, in *RegisterTriggerWorkerRequest, opts ...grpc.CallOption) (*RegisterTriggerWorkerResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RegisterTriggerWorker", varargs...) + ret0, _ := ret[0].(*RegisterTriggerWorkerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RegisterTriggerWorker indicates an expected call of RegisterTriggerWorker. +func (mr *MockTriggerControllerClientMockRecorder) RegisterTriggerWorker(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterTriggerWorker", reflect.TypeOf((*MockTriggerControllerClient)(nil).RegisterTriggerWorker), varargs...) +} + +// TriggerWorkerHeartbeat mocks base method. +func (m *MockTriggerControllerClient) TriggerWorkerHeartbeat(ctx context.Context, opts ...grpc.CallOption) (TriggerController_TriggerWorkerHeartbeatClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "TriggerWorkerHeartbeat", varargs...) + ret0, _ := ret[0].(TriggerController_TriggerWorkerHeartbeatClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TriggerWorkerHeartbeat indicates an expected call of TriggerWorkerHeartbeat. +func (mr *MockTriggerControllerClientMockRecorder) TriggerWorkerHeartbeat(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerWorkerHeartbeat", reflect.TypeOf((*MockTriggerControllerClient)(nil).TriggerWorkerHeartbeat), varargs...) +} + +// UnregisterTriggerWorker mocks base method. +func (m *MockTriggerControllerClient) UnregisterTriggerWorker(ctx context.Context, in *UnregisterTriggerWorkerRequest, opts ...grpc.CallOption) (*UnregisterTriggerWorkerResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UnregisterTriggerWorker", varargs...) + ret0, _ := ret[0].(*UnregisterTriggerWorkerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UnregisterTriggerWorker indicates an expected call of UnregisterTriggerWorker. +func (mr *MockTriggerControllerClientMockRecorder) UnregisterTriggerWorker(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterTriggerWorker", reflect.TypeOf((*MockTriggerControllerClient)(nil).UnregisterTriggerWorker), varargs...) +} + +// MockTriggerController_TriggerWorkerHeartbeatClient is a mock of TriggerController_TriggerWorkerHeartbeatClient interface. +type MockTriggerController_TriggerWorkerHeartbeatClient struct { + ctrl *gomock.Controller + recorder *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder +} + +// MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder is the mock recorder for MockTriggerController_TriggerWorkerHeartbeatClient. +type MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder struct { + mock *MockTriggerController_TriggerWorkerHeartbeatClient +} + +// NewMockTriggerController_TriggerWorkerHeartbeatClient creates a new mock instance. +func NewMockTriggerController_TriggerWorkerHeartbeatClient(ctrl *gomock.Controller) *MockTriggerController_TriggerWorkerHeartbeatClient { + mock := &MockTriggerController_TriggerWorkerHeartbeatClient{ctrl: ctrl} + mock.recorder = &MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTriggerController_TriggerWorkerHeartbeatClient) EXPECT() *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder { + return m.recorder +} + +// CloseAndRecv mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatClient) CloseAndRecv() (*TriggerWorkerHeartbeatResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseAndRecv") + ret0, _ := ret[0].(*TriggerWorkerHeartbeatResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CloseAndRecv indicates an expected call of CloseAndRecv. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) CloseAndRecv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseAndRecv", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).CloseAndRecv)) +} + +// CloseSend mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).Header)) +} + +// RecvMsg mocks base method. +func (m_2 *MockTriggerController_TriggerWorkerHeartbeatClient) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).RecvMsg), m) +} + +// Send mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatClient) Send(arg0 *TriggerWorkerHeartbeatRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m_2 *MockTriggerController_TriggerWorkerHeartbeatClient) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).SendMsg), m) +} + +// Trailer mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockTriggerController_TriggerWorkerHeartbeatClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatClient)(nil).Trailer)) +} + +// MockTriggerControllerServer is a mock of TriggerControllerServer interface. +type MockTriggerControllerServer struct { + ctrl *gomock.Controller + recorder *MockTriggerControllerServerMockRecorder +} + +// MockTriggerControllerServerMockRecorder is the mock recorder for MockTriggerControllerServer. +type MockTriggerControllerServerMockRecorder struct { + mock *MockTriggerControllerServer +} + +// NewMockTriggerControllerServer creates a new mock instance. +func NewMockTriggerControllerServer(ctrl *gomock.Controller) *MockTriggerControllerServer { + mock := &MockTriggerControllerServer{ctrl: ctrl} + mock.recorder = &MockTriggerControllerServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTriggerControllerServer) EXPECT() *MockTriggerControllerServerMockRecorder { + return m.recorder +} + +// CreateSubscription mocks base method. +func (m *MockTriggerControllerServer) CreateSubscription(arg0 context.Context, arg1 *CreateSubscriptionRequest) (*meta.Subscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateSubscription", arg0, arg1) + ret0, _ := ret[0].(*meta.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateSubscription indicates an expected call of CreateSubscription. +func (mr *MockTriggerControllerServerMockRecorder) CreateSubscription(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSubscription", reflect.TypeOf((*MockTriggerControllerServer)(nil).CreateSubscription), arg0, arg1) +} + +// DeleteSubscription mocks base method. +func (m *MockTriggerControllerServer) DeleteSubscription(arg0 context.Context, arg1 *DeleteSubscriptionRequest) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteSubscription", arg0, arg1) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteSubscription indicates an expected call of DeleteSubscription. +func (mr *MockTriggerControllerServerMockRecorder) DeleteSubscription(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSubscription", reflect.TypeOf((*MockTriggerControllerServer)(nil).DeleteSubscription), arg0, arg1) +} + +// GetSubscription mocks base method. +func (m *MockTriggerControllerServer) GetSubscription(arg0 context.Context, arg1 *GetSubscriptionRequest) (*meta.Subscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSubscription", arg0, arg1) + ret0, _ := ret[0].(*meta.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSubscription indicates an expected call of GetSubscription. +func (mr *MockTriggerControllerServerMockRecorder) GetSubscription(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscription", reflect.TypeOf((*MockTriggerControllerServer)(nil).GetSubscription), arg0, arg1) +} + +// ListSubscription mocks base method. +func (m *MockTriggerControllerServer) ListSubscription(arg0 context.Context, arg1 *emptypb.Empty) (*ListSubscriptionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListSubscription", arg0, arg1) + ret0, _ := ret[0].(*ListSubscriptionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListSubscription indicates an expected call of ListSubscription. +func (mr *MockTriggerControllerServerMockRecorder) ListSubscription(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSubscription", reflect.TypeOf((*MockTriggerControllerServer)(nil).ListSubscription), arg0, arg1) +} + +// RegisterTriggerWorker mocks base method. +func (m *MockTriggerControllerServer) RegisterTriggerWorker(arg0 context.Context, arg1 *RegisterTriggerWorkerRequest) (*RegisterTriggerWorkerResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RegisterTriggerWorker", arg0, arg1) + ret0, _ := ret[0].(*RegisterTriggerWorkerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RegisterTriggerWorker indicates an expected call of RegisterTriggerWorker. +func (mr *MockTriggerControllerServerMockRecorder) RegisterTriggerWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterTriggerWorker", reflect.TypeOf((*MockTriggerControllerServer)(nil).RegisterTriggerWorker), arg0, arg1) +} + +// TriggerWorkerHeartbeat mocks base method. +func (m *MockTriggerControllerServer) TriggerWorkerHeartbeat(arg0 TriggerController_TriggerWorkerHeartbeatServer) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TriggerWorkerHeartbeat", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// TriggerWorkerHeartbeat indicates an expected call of TriggerWorkerHeartbeat. +func (mr *MockTriggerControllerServerMockRecorder) TriggerWorkerHeartbeat(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerWorkerHeartbeat", reflect.TypeOf((*MockTriggerControllerServer)(nil).TriggerWorkerHeartbeat), arg0) +} + +// UnregisterTriggerWorker mocks base method. +func (m *MockTriggerControllerServer) UnregisterTriggerWorker(arg0 context.Context, arg1 *UnregisterTriggerWorkerRequest) (*UnregisterTriggerWorkerResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UnregisterTriggerWorker", arg0, arg1) + ret0, _ := ret[0].(*UnregisterTriggerWorkerResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UnregisterTriggerWorker indicates an expected call of UnregisterTriggerWorker. +func (mr *MockTriggerControllerServerMockRecorder) UnregisterTriggerWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterTriggerWorker", reflect.TypeOf((*MockTriggerControllerServer)(nil).UnregisterTriggerWorker), arg0, arg1) +} + +// MockTriggerController_TriggerWorkerHeartbeatServer is a mock of TriggerController_TriggerWorkerHeartbeatServer interface. +type MockTriggerController_TriggerWorkerHeartbeatServer struct { + ctrl *gomock.Controller + recorder *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder +} + +// MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder is the mock recorder for MockTriggerController_TriggerWorkerHeartbeatServer. +type MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder struct { + mock *MockTriggerController_TriggerWorkerHeartbeatServer +} + +// NewMockTriggerController_TriggerWorkerHeartbeatServer creates a new mock instance. +func NewMockTriggerController_TriggerWorkerHeartbeatServer(ctrl *gomock.Controller) *MockTriggerController_TriggerWorkerHeartbeatServer { + mock := &MockTriggerController_TriggerWorkerHeartbeatServer{ctrl: ctrl} + mock.recorder = &MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTriggerController_TriggerWorkerHeartbeatServer) EXPECT() *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).Context)) +} + +// Recv mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatServer) Recv() (*TriggerWorkerHeartbeatRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*TriggerWorkerHeartbeatRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m_2 *MockTriggerController_TriggerWorkerHeartbeatServer) RecvMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "RecvMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) RecvMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).RecvMsg), m) +} + +// SendAndClose mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatServer) SendAndClose(arg0 *TriggerWorkerHeartbeatResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendAndClose", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendAndClose indicates an expected call of SendAndClose. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) SendAndClose(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAndClose", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).SendAndClose), arg0) +} + +// SendHeader mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m_2 *MockTriggerController_TriggerWorkerHeartbeatServer) SendMsg(m interface{}) error { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "SendMsg", m) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) SendMsg(m interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).SendMsg), m) +} + +// SetHeader mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *MockTriggerController_TriggerWorkerHeartbeatServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. +func (mr *MockTriggerController_TriggerWorkerHeartbeatServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockTriggerController_TriggerWorkerHeartbeatServer)(nil).SetTrailer), arg0) +} diff --git a/vsctl/command/event.go b/vsctl/command/event.go index 43aeaee4e..6415962aa 100644 --- a/vsctl/command/event.go +++ b/vsctl/command/event.go @@ -28,11 +28,9 @@ import ( ce "github.com/cloudevents/sdk-go/v2" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/fatih/color" - "github.com/golang/protobuf/ptypes/empty" "github.com/google/uuid" "github.com/jedib0t/go-pretty/v6/table" "github.com/jedib0t/go-pretty/v6/text" - ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" "github.com/spf13/cobra" ) @@ -101,20 +99,6 @@ func putEventCommand() *cobra.Command { return cmd } -func mustGetGatewayEndpoint(cmd *cobra.Command) string { - ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) - defer func() { - _ = grpcConn.Close() - }() - cli := ctrlpb.NewPingServerClient(grpcConn) - res, err := cli.Ping(ctx, &empty.Empty{}) - if err != nil { - cmdFailedf(cmd, "get Gateway endpoint from controller failed: %s", err) - } - return res.GatewayAddr -} - func sendOne(cmd *cobra.Command, ctx context.Context, ceClient ce.Client) { event := ce.NewEvent() if eventID == "" { diff --git a/vsctl/command/eventbus.go b/vsctl/command/eventbus.go index b523dc546..621cedcab 100644 --- a/vsctl/command/eventbus.go +++ b/vsctl/command/eventbus.go @@ -51,7 +51,7 @@ func createEventbusCommand() *cobra.Command { cmdFailedf(cmd, "the --name flag MUST be set") } ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() @@ -92,7 +92,7 @@ func deleteEventbusCommand() *cobra.Command { cmdFailedf(cmd, "the --name flag MUST be set") } ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() @@ -137,7 +137,7 @@ func getEventbusInfoCommand() *cobra.Command { buses = strings.Split(eventbus, ",") } ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() @@ -306,7 +306,7 @@ func listEventbusInfoCommand() *cobra.Command { Short: "list the eventbus", Run: func(cmd *cobra.Command, args []string) { ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() diff --git a/vsctl/command/global.go b/vsctl/command/global.go index 4f78a4e55..cbae5b036 100644 --- a/vsctl/command/global.go +++ b/vsctl/command/global.go @@ -21,23 +21,18 @@ import ( ) type GlobalFlags struct { - Endpoints []string + Endpoint string Debug bool ConfigFile string OutputFormat string } -func mustEndpointsFromCmd(cmd *cobra.Command) []string { - eps, err := cmd.Flags().GetStringSlice("endpoints") +func mustGetGatewayEndpoint(cmd *cobra.Command) string { + endpoint, err := cmd.Flags().GetString("endpoint") if err != nil { - cmdFailedf(cmd, "get controller endpoints failed: %s", err) + cmdFailedf(cmd, "get gateway endpoint failed: %s", err) } - if err == nil { - for i, ip := range eps { - eps[i] = strings.TrimSpace(ip) - } - } - return eps + return endpoint } func isOutputFormatJSON(cmd *cobra.Command) bool { diff --git a/vsctl/command/grpc.go b/vsctl/command/grpc.go index dc16f8a85..06adddeb0 100644 --- a/vsctl/command/grpc.go +++ b/vsctl/command/grpc.go @@ -16,56 +16,26 @@ package command import ( "context" + "fmt" + "strconv" "strings" "time" "github.com/fatih/color" - ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/types/known/emptypb" ) -func mustGetLeaderControllerGRPCConn(ctx context.Context, - cmd *cobra.Command) *grpc.ClientConn { - endpoints := mustEndpointsFromCmd(cmd) - var leaderAddr string - var leaderConn *grpc.ClientConn - tryConnectLeaderOnce := false - for idx := range endpoints { - conn := createGRPCConn(ctx, endpoints[idx]) - if conn == nil { - continue - } - pingClient := ctrlpb.NewPingServerClient(conn) - res, err := pingClient.Ping(ctx, &emptypb.Empty{}) - if err != nil { - color.Red("ping controller: %s failed", endpoints[idx]) - continue - } - leaderAddr = res.LeaderAddr - if leaderAddr == endpoints[idx] { - leaderConn = conn - tryConnectLeaderOnce = false - } else { - _ = conn.Close() - } - break - } - - if leaderAddr == "" { - cmdFailedf(cmd, "the leader controller not found") +func mustGetControllerProxyConn(ctx context.Context, cmd *cobra.Command) *grpc.ClientConn { + splits := strings.Split(mustGetGatewayEndpoint(cmd), ":") + port, err := strconv.Atoi(splits[1]) + if err != nil { + cmdFailedf(cmd, "parsing gateway port failed") } - - if leaderConn != nil { - return leaderConn - } else if !tryConnectLeaderOnce { - leaderConn = createGRPCConn(ctx, mappingLeaderAddr(leaderAddr)) - } - + leaderConn := createGRPCConn(ctx, fmt.Sprintf("%s:%d", splits[0], port+2)) if leaderConn == nil { - cmdFailedf(cmd, "connect to leader: %s failed", leaderAddr) + cmdFailedf(cmd, "failed to connect to gateway") } return leaderConn } @@ -101,11 +71,3 @@ func createGRPCConn(ctx context.Context, addr string) *grpc.ClientConn { } return conn } - -func mappingLeaderAddr(addr string) string { - m := map[string]string{ - "vanus-controller-0.vanus-controller.vanus.svc:2048": "192.168.49.2:32000", - "vanus-controller-1.vanus-controller.vanus.svc:2048": "192.168.49.2:32100", - "vanus-controller-2.vanus-controller.vanus.svc:2048": "192.168.49.2:32200"} - return m[addr] -} diff --git a/vsctl/command/meta.go b/vsctl/command/meta.go index db2988721..f86d2bf7c 100644 --- a/vsctl/command/meta.go +++ b/vsctl/command/meta.go @@ -49,7 +49,7 @@ func getControllerTopology() *cobra.Command { Short: "get topology", Run: func(cmd *cobra.Command, args []string) { ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() diff --git a/vsctl/command/subscription.go b/vsctl/command/subscription.go index cc0782c00..5c757ebe2 100644 --- a/vsctl/command/subscription.go +++ b/vsctl/command/subscription.go @@ -52,7 +52,7 @@ func createSubscriptionCommand() *cobra.Command { cmdFailedWithHelpNotice(cmd, "sink name can't be empty\n") } ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() @@ -126,7 +126,7 @@ func deleteSubscriptionCommand() *cobra.Command { cmdFailedWithHelpNotice(cmd, "subscriptionID name can't be empty\n") } ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() @@ -168,7 +168,7 @@ func getSubscriptionCommand() *cobra.Command { cmdFailedWithHelpNotice(cmd, "subscriptionID name can't be empty\n") } ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() @@ -212,7 +212,7 @@ func listSubscriptionCommand() *cobra.Command { Short: "list the subscription ", Run: func(cmd *cobra.Command, args []string) { ctx := context.Background() - grpcConn := mustGetLeaderControllerGRPCConn(ctx, cmd) + grpcConn := mustGetControllerProxyConn(ctx, cmd) defer func() { _ = grpcConn.Close() }() diff --git a/vsctl/main.go b/vsctl/main.go index f894fab4b..49cabe82e 100644 --- a/vsctl/main.go +++ b/vsctl/main.go @@ -16,12 +16,10 @@ package main import ( - "os" - "strings" - "github.com/fatih/color" "github.com/linkall-labs/vanus/vsctl/command" "github.com/spf13/cobra" + "os" ) const ( @@ -41,8 +39,8 @@ var ( func init() { cobra.EnablePrefixMatching = true cobra.EnableCommandSorting = false - rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", - []string{"127.0.0.1:2048"}, "the endpoints of vanus controller") + rootCmd.PersistentFlags().StringVar(&globalFlags.Endpoint, "endpoint", + "127.0.0.1:30001", "the endpoints of vanus controller") rootCmd.PersistentFlags().StringVarP(&globalFlags.ConfigFile, "config", "C", "~/.vanus/vanus.yml", "the config file of vsctl") rootCmd.PersistentFlags().BoolVarP(&globalFlags.Debug, "debug", "D", false, @@ -50,8 +48,8 @@ func init() { rootCmd.PersistentFlags().StringVar(&globalFlags.OutputFormat, "output-format", "table", "json or table") - if os.Getenv("VANUS_ENDPOINTS") != "" { - globalFlags.Endpoints = strings.Split(os.Getenv("VANUS_ENDPOINTS"), ",") + if os.Getenv("VANUS_GATEWAY") != "" { + globalFlags.Endpoint = os.Getenv("VANUS_GATEWAY") } rootCmd.AddCommand(