Skip to content

Commit dfa7cd1

Browse files
committed
Agent supports reconnect
When the dialback is interrupted, the agent connects back to the proxy server and continue to serve the proxy requests. Fixes #21
1 parent 8ebdad5 commit dfa7cd1

File tree

3 files changed

+162
-29
lines changed

3 files changed

+162
-29
lines changed

cmd/agent/main.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ import (
3030
"github.com/spf13/pflag"
3131
"google.golang.org/grpc"
3232
"google.golang.org/grpc/credentials"
33+
"k8s.io/klog"
3334
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/agentclient"
3435
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
35-
"k8s.io/klog"
3636
)
3737

3838
func main() {
@@ -113,9 +113,9 @@ func (o *GrpcProxyAgentOptions) Validate() error {
113113

114114
func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
115115
o := GrpcProxyAgentOptions{
116-
agentCert: "",
117-
agentKey: "",
118-
caCert: "",
116+
agentCert: "",
117+
agentKey: "",
118+
caCert: "",
119119
proxyServerHost: "127.0.0.1",
120120
proxyServerPort: 8091,
121121
}
@@ -178,11 +178,7 @@ func (p *Agent) runProxyConnection(o *GrpcProxyAgentOptions) error {
178178
RootCAs: certPool,
179179
})
180180
dialOption := grpc.WithTransportCredentials(transportCreds)
181-
client := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort))
182-
183-
if err := client.Connect(dialOption); err != nil {
184-
return fmt.Errorf("failed to connect to proxy-server: %v", err)
185-
}
181+
client := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), dialOption)
186182

187183
stopCh := make(chan struct{})
188184

pkg/agent/agentclient/client.go

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package agentclient
1818

1919
import (
20-
"context"
2120
"io"
2221
"net"
2322
"sync"
@@ -33,16 +32,15 @@ import (
3332
type AgentClient struct {
3433
nextConnID int64
3534
connContext map[int64]*connContext
36-
address string
3735

38-
stream agent.AgentService_ConnectClient
36+
stream *RedialableAgentClient
3937
}
4038

4139
// NewAgentClient creates an AgentClient
42-
func NewAgentClient(address string) *AgentClient {
40+
func NewAgentClient(address string, opts ...grpc.DialOption) *AgentClient {
4341
a := &AgentClient{
4442
connContext: make(map[int64]*connContext),
45-
address: address,
43+
stream: NewRedialableAgentClient(address, opts...),
4644
}
4745

4846
return a
@@ -68,21 +66,6 @@ func (c *connContext) cleanup() {
6866
//
6967
// The caller needs to call Serve to start serving proxy requests
7068
// coming from proxy server.
71-
func (a *AgentClient) Connect(opts ...grpc.DialOption) error {
72-
c, err := grpc.Dial(a.address, opts...)
73-
if err != nil {
74-
return err
75-
}
76-
77-
client := agent.NewAgentServiceClient(c)
78-
79-
a.stream, err = client.Connect(context.Background())
80-
if err != nil {
81-
return err
82-
}
83-
84-
return nil
85-
}
8669

8770
// Serve starts to serve proxied requests from proxy server over the
8871
// gRPC stream. Successful Connect is required before Serve. The

pkg/agent/agentclient/stream.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package agentclient
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/connectivity"
11+
"k8s.io/klog"
12+
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
13+
)
14+
15+
const (
16+
defaultRetry = 20
17+
defaultInterval = 5 * time.Second
18+
)
19+
20+
type RedialableAgentClient struct {
21+
stream agent.AgentService_ConnectClient
22+
23+
// connect opts
24+
address string
25+
opts []grpc.DialOption
26+
conn *grpc.ClientConn
27+
stopCh chan struct{}
28+
reconnTrigger chan struct{}
29+
30+
// locks
31+
sendLock sync.Mutex
32+
recvLock sync.Mutex
33+
reconnLock sync.Mutex
34+
35+
// Retry times to reconnect to proxy server
36+
Retry int
37+
38+
// Interval between every reconnect
39+
Interval time.Duration
40+
}
41+
42+
func NewRedialableAgentClient(address string, opts ...grpc.DialOption) *RedialableAgentClient {
43+
c := &RedialableAgentClient{
44+
address: address,
45+
opts: opts,
46+
Retry: defaultRetry,
47+
Interval: defaultInterval,
48+
stopCh: make(chan struct{}),
49+
}
50+
51+
c.reconnect()
52+
53+
go c.probe()
54+
55+
return c
56+
}
57+
58+
func (c *RedialableAgentClient) probe() {
59+
for {
60+
select {
61+
case <-c.stopCh:
62+
return
63+
case <-time.After(c.Interval):
64+
// health check
65+
if c.conn != nil && c.conn.GetState() == connectivity.Ready {
66+
continue
67+
} else {
68+
klog.Infof("Connection state %v", c.conn.GetState())
69+
}
70+
}
71+
72+
c.reconnect()
73+
}
74+
}
75+
76+
func (c *RedialableAgentClient) Send(pkt *agent.Packet) error {
77+
c.sendLock.Lock()
78+
defer c.sendLock.Unlock()
79+
80+
if err := c.stream.Send(pkt); err != nil {
81+
if c.conn.GetState() == connectivity.Ready {
82+
return err
83+
}
84+
85+
if err2 := c.reconnect(); err2 != nil {
86+
return err2
87+
}
88+
}
89+
90+
return c.stream.Send(pkt)
91+
}
92+
93+
func (c *RedialableAgentClient) Recv() (*agent.Packet, error) {
94+
c.recvLock.Lock()
95+
defer c.recvLock.Unlock()
96+
97+
// this just get block..
98+
if pkt, err := c.stream.Recv(); err != nil {
99+
klog.Infof("error recving: %v", err)
100+
klog.Info("start reconnecting")
101+
102+
if err2 := c.reconnect(); err2 != nil {
103+
klog.Infof("reconnect failed: %v", err2)
104+
return pkt, err2
105+
}
106+
107+
return c.stream.Recv()
108+
} else {
109+
return pkt, err
110+
}
111+
}
112+
113+
func (c *RedialableAgentClient) reconnect() error {
114+
c.reconnLock.Lock()
115+
defer c.reconnLock.Unlock()
116+
117+
if c.conn != nil && c.conn.GetState() == connectivity.Ready {
118+
return nil
119+
}
120+
121+
klog.Info("start to connect...")
122+
123+
var err error
124+
var retry int
125+
126+
for retry < c.Retry {
127+
if err = c.tryConnect(); err == nil {
128+
klog.Info("connected")
129+
return nil
130+
}
131+
retry++
132+
klog.V(5).Infof("Failed to connect to proxy server, retry %d in %v: %v", retry, c.Interval, err)
133+
time.Sleep(c.Interval)
134+
}
135+
136+
return fmt.Errorf("Failed to connect to proxy server: %v", err)
137+
}
138+
139+
func (c *RedialableAgentClient) tryConnect() error {
140+
var err error
141+
142+
c.conn, err = grpc.Dial(c.address, c.opts...)
143+
if err != nil {
144+
return err
145+
}
146+
147+
c.stream, err = agent.NewAgentServiceClient(c.conn).Connect(context.Background())
148+
return err
149+
}
150+
151+
// interrupt interrupt the stream connection. (For testing purpose)
152+
func (c *RedialableAgentClient) interrupt() {
153+
c.conn.Close()
154+
}

0 commit comments

Comments
 (0)