Skip to content

Commit dc8c349

Browse files
committed
Start,stop,exit the grpc server gracefully
1 parent 8fc7c83 commit dc8c349

File tree

4 files changed

+43
-23
lines changed

4 files changed

+43
-23
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ WORKDIR /root/
2525
RUN apk add gcompat
2626
COPY --from=consul-dataplane-binary /cdp/consul-dataplane /usr/local/bin/consul-dataplane
2727
COPY --from=envoy-binary /usr/local/bin/envoy /usr/local/bin/envoy
28-
ENTRYPOINT [ "./consul-dataplane" ]
28+
ENTRYPOINT [ "consul-dataplane" ]

pkg/consuldp/bootstrap.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ const (
2525

2626
// bootstrapConfig generates the Envoy bootstrap config in JSON format.
2727
func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) {
28-
cdpFullAddr := strings.Split(cdp.gRPCListener.Addr().String(), ":")
29-
cdpAddr := cdpFullAddr[0]
30-
cdpPort := cdpFullAddr[1]
28+
cdpGRPCFullAddr := strings.Split(cdp.gRPCServer.listener.Addr().String(), ":")
29+
cdpGRPCAddr := cdpGRPCFullAddr[0]
30+
cdpGRPCPort := cdpGRPCFullAddr[1]
3131
svc := cdp.cfg.Service
3232
envoy := cdp.cfg.Envoy
3333

@@ -54,9 +54,8 @@ func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error)
5454

5555
args := &bootstrap.BootstrapTplArgs{
5656
GRPC: bootstrap.GRPC{
57-
// For now we just give the server address directly.
58-
AgentAddress: cdpAddr,
59-
AgentPort: cdpPort,
57+
AgentAddress: cdpGRPCAddr,
58+
AgentPort: cdpGRPCPort,
6059
AgentTLS: false,
6160
},
6261
ProxyCluster: rsp.Service,

pkg/consuldp/consul_dataplane.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,23 @@ type consulServer struct {
2525
// supportedFeatures is a map of the dataplane features supported by the Consul server
2626
supportedFeatures map[pbdataplane.DataplaneFeatures]bool
2727

28+
// grpcClientConn is the gRPC connection to the Consul server
2829
grpcClientConn *grpc.ClientConn
2930
}
3031

32+
type gRPCServer struct {
33+
listener net.Listener
34+
server *grpc.Server
35+
exitedCh chan struct{}
36+
}
37+
3138
// ConsulDataplane represents the consul-dataplane process
3239
type ConsulDataplane struct {
3340
logger hclog.Logger
3441
cfg *Config
3542
consulServer *consulServer
3643
dpServiceClient pbdataplane.DataplaneServiceClient
37-
38-
gRPCListener net.Listener
39-
gRPCServer *grpc.Server
44+
gRPCServer *gRPCServer
4045
}
4146

4247
// NewConsulDP creates a new instance of ConsulDataplane
@@ -137,6 +142,7 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error {
137142
return err
138143
}
139144
defer grpcClientConn.Close()
145+
// TODO (NET-148): Ensure the server connection here is the one acquired via the server discovery library
140146
cdp.consulServer.grpcClientConn = grpcClientConn
141147
cdp.logger.Info("connected to consul server over grpc", "grpc-target", gRPCTarget)
142148

@@ -184,9 +190,16 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error {
184190
if err := proxy.Stop(); err != nil {
185191
cdp.logger.Error("failed to stop proxy", "error", err)
186192
}
193+
cdp.stopGRPCServer()
187194
doneCh <- nil
188195
case <-proxy.Exited():
196+
cdp.stopGRPCServer()
189197
doneCh <- errors.New("envoy proxy exited unexpectedly")
198+
case <-cdp.gRPCServerExited():
199+
if err := proxy.Stop(); err != nil {
200+
cdp.logger.Error("failed to stop proxy", "error", err)
201+
}
202+
doneCh <- errors.New("gRPC server exited unexpectedly")
190203
}
191204
}()
192205
return <-doneCh

pkg/consuldp/grpc.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,36 +23,44 @@ func (cdp *ConsulDataplane) setupGRPCServer() error {
2323
cdp.logger.Error("failed to create gRPC/TCP listener: %v", err)
2424
return err
2525
}
26-
cdp.gRPCListener = lis
2726

2827
// create gRPC server
2928
// one main role of this gRPC server in consul-dataplane is to proxy envoy ADS requests
3029
// to the connected Consul server.
3130
director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
31+
if !strings.Contains(fullMethodName, "envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources") {
32+
return ctx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName))
33+
}
34+
3235
md, _ := metadata.FromIncomingContext(ctx)
3336
mdCopy := md.Copy()
3437
// TODO (NET-148): Inject the ACL token acquired from the server discovery library
3538
mdCopy[metadataKeyToken] = []string{cdp.cfg.Consul.Credentials.Static.Token}
3639
outCtx := metadata.NewOutgoingContext(ctx, mdCopy)
37-
if !strings.Contains(fullMethodName, "envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources") {
38-
return outCtx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName))
39-
}
40-
// TODO (NET-148): Ensure the server connection here is the one acquired via the server discovery library
4140
return outCtx, cdp.consulServer.grpcClientConn, nil
4241
}
43-
gRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
44-
cdp.gRPCServer = gRPCServer
42+
newGRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
4543

46-
cdp.logger.Info("created gRPC server", "address", lis.Addr().String())
44+
cdp.gRPCServer = &gRPCServer{listener: lis, server: newGRPCServer, exitedCh: make(chan struct{})}
45+
cdp.logger.Trace("created gRPC server", "address", lis.Addr().String())
4746
return nil
4847
}
4948

5049
func (cdp *ConsulDataplane) startGRPCServer() {
51-
cdp.logger.Trace("starting gRPC server")
50+
cdp.logger.Info("starting gRPC server", "address", cdp.gRPCServer.listener.Addr().String())
5251

53-
if err := cdp.gRPCServer.Serve(cdp.gRPCListener); err != nil {
54-
cdp.logger.Error("failed to serve gRPC requests: %v", err)
55-
cdp.gRPCListener.Close()
56-
// TODO: gracefully exit
52+
if err := cdp.gRPCServer.server.Serve(cdp.gRPCServer.listener); err != nil {
53+
cdp.logger.Error("failed to serve gRPC requests", "error", err)
54+
cdp.gRPCServer.listener.Close()
55+
close(cdp.gRPCServer.exitedCh)
5756
}
5857
}
58+
59+
func (cdp *ConsulDataplane) stopGRPCServer() {
60+
if cdp.gRPCServer != nil {
61+
cdp.logger.Debug("stopping gRPC server")
62+
cdp.gRPCServer.server.Stop()
63+
}
64+
}
65+
66+
func (cdp *ConsulDataplane) gRPCServerExited() chan struct{} { return cdp.gRPCServer.exitedCh }

0 commit comments

Comments
 (0)