Skip to content

Commit f7a7f0c

Browse files
committed
Agent drain: implement first half.
- Agent sends drain signal to Server.
- Server only logs it.
1 parent d302a0a commit f7a7f0c

File tree

7 files changed

+165
-10
lines changed

7 files changed

+165
-10
lines changed

cmd/agent/app/server.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ import (
2424
"net"
2525
"net/http"
2626
"net/http/pprof"
27+
"os"
28+
"os/signal"
2729
"runtime"
2830
runpprof "runtime/pprof"
2931
"strconv"
3032
"strings"
33+
"syscall"
3134
"time"
3235

3336
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -49,8 +52,8 @@ func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command
4952
Use: "agent",
5053
Long: `A gRPC agent, Connects to the proxy and then allows traffic to be forwarded to it.`,
5154
RunE: func(cmd *cobra.Command, args []string) error {
52-
stopCh := make(chan struct{})
53-
return a.Run(o, stopCh)
55+
drainCh, stopCh := SetupSignalHandler()
56+
return a.Run(o, drainCh, stopCh)
5457
},
5558
}
5659

@@ -64,13 +67,13 @@ type Agent struct {
6467
cs *agent.ClientSet
6568
}
6669

67-
func (a *Agent) Run(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) error {
70+
func (a *Agent) Run(o *options.GrpcProxyAgentOptions, drainCh, stopCh <-chan struct{}) error {
6871
o.Print()
6972
if err := o.Validate(); err != nil {
7073
return fmt.Errorf("failed to validate agent options with %v", err)
7174
}
7275

73-
cs, err := a.runProxyConnection(o, stopCh)
76+
cs, err := a.runProxyConnection(o, drainCh, stopCh)
7477
if err != nil {
7578
return fmt.Errorf("failed to run proxy connection with %v", err)
7679
}
@@ -92,7 +95,31 @@ func (a *Agent) Run(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) er
9295
return nil
9396
}
9497

95-
func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (*agent.ClientSet, error) {
98+
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
99+
100+
func SetupSignalHandler() (drainCh, stopCh <-chan struct{}) {
101+
drain := make(chan struct{})
102+
stop := make(chan struct{})
103+
c := make(chan os.Signal, 2)
104+
signal.Notify(c, shutdownSignals...)
105+
labels := runpprof.Labels(
106+
"core", "signalHandler",
107+
)
108+
go runpprof.Do(context.Background(), labels, func(context.Context) { handleSignals(c, drain, stop) })
109+
110+
return drain, stop
111+
}
112+
113+
func handleSignals(signalCh chan os.Signal, drainCh, stopCh chan struct{}) {
114+
s := <-signalCh
115+
klog.V(2).InfoS("Received first signal", "signal", s)
116+
close(drainCh)
117+
s = <-signalCh
118+
klog.V(2).InfoS("Received second signal", "signal", s)
119+
close(stopCh)
120+
}
121+
122+
func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, stopCh <-chan struct{}) (*agent.ClientSet, error) {
96123
var tlsConfig *tls.Config
97124
var err error
98125
if tlsConfig, err = util.GetClientTLSConfig(o.CaCert, o.AgentCert, o.AgentKey, o.ProxyServerHost, o.AlpnProtos); err != nil {
@@ -106,7 +133,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-ch
106133
}),
107134
}
108135
cc := o.ClientSetConfig(dialOptions...)
109-
cs := cc.NewAgentClientSet(stopCh)
136+
cs := cc.NewAgentClientSet(drainCh, stopCh)
110137
cs.Serve()
111138

112139
return cs, nil

pkg/agent/client.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@ type Client struct {
137137
address string
138138
opts []grpc.DialOption
139139
conn *grpc.ClientConn
140-
stopCh chan struct{}
140+
141+
drainCh <-chan struct{}
142+
drainOnce sync.Once
143+
stopCh chan struct{}
144+
141145
// locks
142146
sendLock sync.Mutex
143147
recvLock sync.Mutex
@@ -158,6 +162,7 @@ func newAgentClient(address, agentID, agentIdentifiers string, cs *ClientSet, op
158162
agentIdentifiers: agentIdentifiers,
159163
opts: opts,
160164
probeInterval: cs.probeInterval,
165+
drainCh: cs.drainCh,
161166
stopCh: make(chan struct{}),
162167
serviceAccountTokenPath: cs.serviceAccountTokenPath,
163168
connManager: newConnectionManager(),
@@ -325,6 +330,19 @@ func (a *Client) Serve() {
325330
case <-a.stopCh:
326331
klog.V(2).InfoS("stop agent client.")
327332
return
333+
case <-a.drainCh:
334+
a.drainOnce.Do(func() {
335+
klog.V(2).InfoS("drain agent client", "serverID", a.serverID, "agentID", a.agentID)
336+
drainPkt := &client.Packet{
337+
Type: client.PacketType_DRAIN,
338+
Payload: &client.Packet_Drain{
339+
Drain: &client.Drain{},
340+
},
341+
}
342+
if err := a.Send(drainPkt); err != nil {
343+
klog.ErrorS(err, "drain failure", "")
344+
}
345+
})
328346
default:
329347
}
330348

pkg/agent/client_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,40 @@ func TestFailedSend_DialResp_GRPC(t *testing.T) {
343343
}()
344344
}
345345

346+
func TestDrain(t *testing.T) {
347+
var stream agent.AgentService_ConnectClient
348+
drainCh := make(chan struct{})
349+
stopCh := make(chan struct{})
350+
cs := &ClientSet{
351+
clients: make(map[string]*Client),
352+
drainCh: drainCh,
353+
stopCh: stopCh,
354+
}
355+
testClient := &Client{
356+
connManager: newConnectionManager(),
357+
drainCh: drainCh,
358+
stopCh: stopCh,
359+
cs: cs,
360+
}
361+
testClient.stream, stream = pipe()
362+
363+
// Start agent
364+
go testClient.Serve()
365+
defer close(stopCh)
366+
367+
// Simulate pod first shutdown signal
368+
close(drainCh)
369+
370+
// Expect to receive DRAIN packet from (Agent) Client
371+
pkt, err := stream.Recv()
372+
if err != nil {
373+
t.Fatal(err)
374+
}
375+
if pkt.Type != client.PacketType_DRAIN {
376+
t.Errorf("expect PacketType_DRAIN; got %v", pkt.Type)
377+
}
378+
}
379+
346380
// fakeStream implements AgentService_ConnectClient
347381
type fakeStream struct {
348382
grpc.ClientStream

pkg/agent/clientset.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type ClientSet struct {
5252
dialOptions []grpc.DialOption
5353
// file path contains service account token
5454
serviceAccountTokenPath string
55+
// channel to signal that the agent is pending termination.
56+
drainCh <-chan struct{}
5557
// channel to signal shutting down the client set. Primarily for test.
5658
stopCh <-chan struct{}
5759

@@ -141,7 +143,7 @@ type ClientSetConfig struct {
141143
SyncForever bool
142144
}
143145

144-
func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet {
146+
func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
145147
return &ClientSet{
146148
clients: make(map[string]*Client),
147149
agentID: cc.AgentID,
@@ -154,6 +156,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
154156
serviceAccountTokenPath: cc.ServiceAccountTokenPath,
155157
warnOnChannelLimit: cc.WarnOnChannelLimit,
156158
syncForever: cc.SyncForever,
159+
drainCh: drainCh,
157160
stopCh: stopCh,
158161
}
159162
}

pkg/server/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -990,6 +990,8 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, agentID string, recvCh <
990990
klog.V(5).InfoS("CLOSE_RSP sent to frontend", "connectionID", resp.ConnectID)
991991
}
992992

993+
case client.PacketType_DRAIN:
994+
klog.V(2).InfoS("agent is draining", "agentID", agentID)
993995
default:
994996
klog.V(5).InfoS("Ignoring unrecognized packet from backend", "packet", pkt, "agentID", agentID)
995997
}

tests/framework/agent.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"path/filepath"
2828
"strconv"
2929
"sync"
30+
"syscall"
3031
"testing"
3132
"time"
3233

@@ -53,6 +54,7 @@ type AgentRunner interface {
5354
// Agent is the handle returned by an agent runner's Start for a running
// agent under test.
type Agent interface {
5455
// GetConnectedServerCount presumably reports how many proxy servers the
// agent is currently connected to — confirm against the implementations.
GetConnectedServerCount() (int, error)
5556
// Ready presumably reports whether the agent is ready to serve — confirm
// against the implementations.
Ready() bool
57+
// Drain signals the agent to begin draining. The in-process
// implementation closes its drain channel and the external one sends
// SIGTERM to the agent process; both guard the action with a sync.Once.
Drain()
5658
// Stop shuts the agent down.
Stop()
5759
// Metrics returns the metricstest.AgentTester associated with this agent.
Metrics() metricstest.AgentTester
5860
}
@@ -66,9 +68,10 @@ func (*InProcessAgentRunner) Start(t testing.TB, opts AgentOpts) (Agent, error)
6668
}
6769

6870
ctx, cancel := context.WithCancel(context.Background())
71+
drainCh := make(chan struct{})
6972
stopCh := make(chan struct{})
7073
go func() {
71-
if err := a.Run(o, stopCh); err != nil {
74+
if err := a.Run(o, drainCh, stopCh); err != nil {
7275
log.Printf("ERROR running agent: %v", err)
7376
cancel()
7477
}
@@ -84,6 +87,7 @@ func (*InProcessAgentRunner) Start(t testing.TB, opts AgentOpts) (Agent, error)
8487

8588
pa := &inProcessAgent{
8689
client: a.ClientSet(),
90+
drainCh: drainCh,
8791
stopCh: stopCh,
8892
healthAddr: healthAddr,
8993
}
@@ -94,12 +98,21 @@ func (*InProcessAgentRunner) Start(t testing.TB, opts AgentOpts) (Agent, error)
9498
// inProcessAgent is the Agent implementation for an agent that runs inside
// the test process; it is driven through the drain and stop channels that
// were handed to the agent's Run.
type inProcessAgent struct {
95 99
// client is the agent's ClientSet, obtained from the running agent.
client *agent.ClientSet
96100

101+
// drainOnce ensures drainCh is closed at most once by Drain.
drainOnce sync.Once
102+
// drainCh is closed to signal the agent to drain.
drainCh chan struct{}
103+
97104
// stopOnce guards the close of stopCh performed in Stop.
stopOnce sync.Once
98105
// stopCh is closed to signal the agent to stop.
stopCh chan struct{}
99106

100107
// healthAddr is presumably the address of the agent's health endpoint —
// confirm against Start.
healthAddr string
101108
}
102109

110+
func (a *inProcessAgent) Drain() {
111+
a.drainOnce.Do(func() {
112+
close(a.drainCh)
113+
})
114+
}
115+
103116
func (a *inProcessAgent) Stop() {
104117
a.stopOnce.Do(func() {
105118
close(a.stopCh)
@@ -160,7 +173,16 @@ type externalAgent struct {
160173
cmd *exec.Cmd
161174
metrics *metricstest.Tester
162175

163-
stopOnce sync.Once
176+
drainOnce sync.Once
177+
stopOnce sync.Once
178+
}
179+
180+
func (a *externalAgent) Drain() {
181+
a.drainOnce.Do(func() {
182+
if err := a.cmd.Process.Signal(syscall.SIGTERM); err != nil {
183+
log.Fatalf("Error draining agent process: %v", err)
184+
}
185+
})
164186
}
165187

166188
func (a *externalAgent) Stop() {

tests/proxy_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,55 @@ func TestFailedDial_HTTPCONN(t *testing.T) {
789789
resetAllMetrics() // For clean shutdown.
790790
}
791791

792+
func TestProxyHandle_AfterDrain(t *testing.T) {
793+
expectCleanShutdown(t)
794+
795+
server := httptest.NewServer(newEchoServer("hello"))
796+
defer server.Close()
797+
798+
ps := runGRPCProxyServer(t)
799+
defer ps.Stop()
800+
801+
a := runAgent(t, ps.AgentAddr())
802+
defer a.Stop()
803+
waitForConnectedServerCount(t, 1, a)
804+
805+
// Drain agent
806+
a.Drain()
807+
808+
ctx, cancel := context.WithCancel(context.Background())
809+
defer cancel()
810+
tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr())
811+
if err != nil {
812+
t.Fatal(err)
813+
}
814+
815+
c := &http.Client{
816+
Transport: &http.Transport{
817+
DialContext: tunnel.DialContext,
818+
},
819+
}
820+
821+
req, err := http.NewRequest("GET", server.URL, nil)
822+
if err != nil {
823+
t.Fatal(err)
824+
}
825+
826+
r, err := c.Do(req)
827+
if err != nil {
828+
t.Fatal(err)
829+
}
830+
defer r.Body.Close()
831+
832+
data, err := io.ReadAll(r.Body)
833+
if err != nil {
834+
t.Fatal(err)
835+
}
836+
if string(data) != "hello" {
837+
t.Errorf("expect %v; got %v", "hello", string(data))
838+
}
839+
}
840+
792841
func runGRPCProxyServer(t testing.TB) framework.ProxyServer {
793842
return runGRPCProxyServerWithServerCount(t, 1)
794843
}

0 commit comments

Comments
 (0)