Skip to content

Commit 94bd4ac

Browse files
authored
Merge pull request #584 from jkh52/agent-drain
Agent drain: implement agent side. (Remaining: server)
2 parents 13c2a46 + f7a7f0c commit 94bd4ac

File tree

11 files changed

+397
-82
lines changed

11 files changed

+397
-82
lines changed

cmd/agent/app/server.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ import (
2424
"net"
2525
"net/http"
2626
"net/http/pprof"
27+
"os"
28+
"os/signal"
2729
"runtime"
2830
runpprof "runtime/pprof"
2931
"strconv"
3032
"strings"
33+
"syscall"
3134
"time"
3235

3336
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -49,8 +52,8 @@ func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command
4952
Use: "agent",
5053
Long: `A gRPC agent, Connects to the proxy and then allows traffic to be forwarded to it.`,
5154
RunE: func(cmd *cobra.Command, args []string) error {
52-
stopCh := make(chan struct{})
53-
return a.Run(o, stopCh)
55+
drainCh, stopCh := SetupSignalHandler()
56+
return a.Run(o, drainCh, stopCh)
5457
},
5558
}
5659

@@ -64,13 +67,13 @@ type Agent struct {
6467
cs *agent.ClientSet
6568
}
6669

67-
func (a *Agent) Run(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) error {
70+
func (a *Agent) Run(o *options.GrpcProxyAgentOptions, drainCh, stopCh <-chan struct{}) error {
6871
o.Print()
6972
if err := o.Validate(); err != nil {
7073
return fmt.Errorf("failed to validate agent options with %v", err)
7174
}
7275

73-
cs, err := a.runProxyConnection(o, stopCh)
76+
cs, err := a.runProxyConnection(o, drainCh, stopCh)
7477
if err != nil {
7578
return fmt.Errorf("failed to run proxy connection with %v", err)
7679
}
@@ -92,7 +95,31 @@ func (a *Agent) Run(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) er
9295
return nil
9396
}
9497

95-
func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (*agent.ClientSet, error) {
98+
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
99+
100+
func SetupSignalHandler() (drainCh, stopCh <-chan struct{}) {
101+
drain := make(chan struct{})
102+
stop := make(chan struct{})
103+
c := make(chan os.Signal, 2)
104+
signal.Notify(c, shutdownSignals...)
105+
labels := runpprof.Labels(
106+
"core", "signalHandler",
107+
)
108+
go runpprof.Do(context.Background(), labels, func(context.Context) { handleSignals(c, drain, stop) })
109+
110+
return drain, stop
111+
}
112+
113+
func handleSignals(signalCh chan os.Signal, drainCh, stopCh chan struct{}) {
114+
s := <-signalCh
115+
klog.V(2).InfoS("Received first signal", "signal", s)
116+
close(drainCh)
117+
s = <-signalCh
118+
klog.V(2).InfoS("Received second signal", "signal", s)
119+
close(stopCh)
120+
}
121+
122+
func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, stopCh <-chan struct{}) (*agent.ClientSet, error) {
96123
var tlsConfig *tls.Config
97124
var err error
98125
if tlsConfig, err = util.GetClientTLSConfig(o.CaCert, o.AgentCert, o.AgentKey, o.ProxyServerHost, o.AlpnProtos); err != nil {
@@ -106,7 +133,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-ch
106133
}),
107134
}
108135
cc := o.ClientSetConfig(dialOptions...)
109-
cs := cc.NewAgentClientSet(stopCh)
136+
cs := cc.NewAgentClientSet(drainCh, stopCh)
110137
cs.Serve()
111138

112139
return cs, nil

konnectivity-client/proto/client/client.pb.go

Lines changed: 109 additions & 36 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

konnectivity-client/proto/client/client.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ enum PacketType {
3030
CLOSE_RSP = 3;
3131
DATA = 4;
3232
DIAL_CLS = 5;
33+
DRAIN = 6;
3334
}
3435

3536
message Packet {
@@ -42,6 +43,7 @@ message Packet {
4243
CloseRequest closeRequest = 5;
4344
CloseResponse closeResponse = 6;
4445
CloseDial closeDial = 7;
46+
Drain drain = 8;
4547
}
4648
}
4749

@@ -85,6 +87,11 @@ message CloseDial {
8587
int64 random = 1;
8688
}
8789

90+
message Drain {
91+
// A hint from an Agent to Server that it is pending termination.
92+
// A Server should prefer non-draining agents for new dials.
93+
}
94+
8895
message Data {
8996
// connectID to connect to
9097
int64 connectID = 1;

0 commit comments

Comments
 (0)