Skip to content

Commit 26b4ab0

Browse files
author
Chao Xu
committed
Add a unit test for ha proxy server
1 parent f330515 commit 26b4ab0

File tree

7 files changed

+318
-54
lines changed

7 files changed

+318
-54
lines changed

cmd/agent/main.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,21 @@ type GrpcProxyAgentOptions struct {
6464
proxyServerHost string
6565
proxyServerPort int
6666

67-
agentID string
68-
syncInterval int
67+
agentID string
68+
syncInterval int
69+
probeInterval int
70+
reconnectInterval int
71+
}
72+
73+
func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOption grpc.DialOption) *agentclient.ClientSetConfig {
74+
return &agentclient.ClientSetConfig{
75+
Address: fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort),
76+
AgentID: o.agentID,
77+
SyncInterval: time.Duration(o.syncInterval) * time.Second,
78+
ProbeInterval: time.Duration(o.probeInterval) * time.Second,
79+
ReconnectInterval: time.Duration(o.reconnectInterval) * time.Second,
80+
DialOption: dialOption,
81+
}
6982
}
7083

7184
func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
@@ -77,6 +90,8 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
7790
flags.IntVar(&o.proxyServerPort, "proxy-server-port", o.proxyServerPort, "The port the proxy server is listening on.")
7891
flags.StringVar(&o.agentID, "agent-id", o.agentID, "The unique ID of this agent. Default to a generated uuid if not set.")
7992
flags.IntVar(&o.syncInterval, "sync-interval", o.syncInterval, "The seconds by which the agent periodically checks that it has connections to all instances of the proxy server.")
93+
flags.IntVar(&o.probeInterval, "probe-interval", o.probeInterval, "The seconds by which the agent periodically checks if its connections to the proxy server are ready.")
94+
flags.IntVar(&o.reconnectInterval, "reconnect-interval", o.reconnectInterval, "The seconds by which the agent tries to reconnect.")
8095
return flags
8196
}
8297

@@ -88,6 +103,8 @@ func (o *GrpcProxyAgentOptions) Print() {
88103
klog.Warningf("ProxyServerPort set to %d.\n", o.proxyServerPort)
89104
klog.Warningf("AgentID set to %s.\n", o.agentID)
90105
klog.Warningf("SyncInterval set to %d.\n", o.syncInterval)
106+
klog.Warningf("ProbeInterval set to %d.\n", o.probeInterval)
107+
klog.Warningf("ReconnectInterval set to %d.\n", o.reconnectInterval)
91108
}
92109

93110
func (o *GrpcProxyAgentOptions) Validate() error {
@@ -120,13 +137,15 @@ func (o *GrpcProxyAgentOptions) Validate() error {
120137

121138
func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
122139
o := GrpcProxyAgentOptions{
123-
agentCert: "",
124-
agentKey: "",
125-
caCert: "",
126-
proxyServerHost: "127.0.0.1",
127-
proxyServerPort: 8091,
128-
agentID: uuid.New().String(),
129-
syncInterval: 5,
140+
agentCert: "",
141+
agentKey: "",
142+
caCert: "",
143+
proxyServerHost: "127.0.0.1",
144+
proxyServerPort: 8091,
145+
agentID: uuid.New().String(),
146+
syncInterval: 5,
147+
probeInterval: 5,
148+
reconnectInterval: 5,
130149
}
131150
return &o
132151
}
@@ -173,7 +192,8 @@ func (a *Agent) runProxyConnection(o *GrpcProxyAgentOptions) error {
173192
return err
174193
}
175194
dialOption := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
176-
cs := agentclient.NewAgentClientSet(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), o.agentID, time.Duration(o.syncInterval)*time.Second, dialOption)
195+
cc := o.ClientSetConfig(dialOption)
196+
cs := cc.NewAgentClientSet()
177197
cs.Serve()
178198

179199
return nil

pkg/agent/agentclient/clientset.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
"google.golang.org/grpc"
26+
"google.golang.org/grpc/connectivity"
2627
"k8s.io/klog"
2728
)
2829

@@ -40,14 +41,30 @@ type ClientSet struct {
4041
syncInterval time.Duration // The interval by which the agent
4142
// periodically checks that it has connections to all instances of the
4243
// proxy server.
44+
probeInterval time.Duration // The interval by which the agent
45+
// periodically checks if its connections to the proxy server is ready.
46+
reconnectInterval time.Duration // The interval by which the agent
47+
// tries to reconnect.
4348
dialOption grpc.DialOption
4449
}
4550

46-
func (cs *ClientSet) clientsCount() int {
51+
func (cs *ClientSet) ClientsCount() int {
4752
cs.mu.Lock()
4853
defer cs.mu.Unlock()
4954
return len(cs.clients)
5055
}
56+
func (cs *ClientSet) HealthyClientsCount() int {
57+
cs.mu.Lock()
58+
defer cs.mu.Unlock()
59+
var count int
60+
for _, c := range cs.clients {
61+
if c.stream.conn.GetState() == connectivity.Ready {
62+
count++
63+
}
64+
}
65+
return count
66+
67+
}
5168

5269
func (cs *ClientSet) hasIDLocked(serverID string) bool {
5370
_, ok := cs.clients[serverID]
@@ -81,14 +98,26 @@ func (cs *ClientSet) RemoveClient(serverID string) {
8198
delete(cs.clients, serverID)
8299
}
83100

84-
func NewAgentClientSet(address, agentID string, syncInterval time.Duration, dialOption grpc.DialOption) *ClientSet {
101+
type ClientSetConfig struct {
102+
Address string
103+
AgentID string
104+
SyncInterval time.Duration
105+
ProbeInterval time.Duration
106+
ReconnectInterval time.Duration
107+
DialOption grpc.DialOption
108+
}
109+
110+
func (cc *ClientSetConfig) NewAgentClientSet() *ClientSet {
85111
return &ClientSet{
86-
clients: make(map[string]*AgentClient),
87-
agentID: agentID,
88-
address: address,
89-
syncInterval: syncInterval,
90-
dialOption: dialOption,
112+
clients: make(map[string]*AgentClient),
113+
agentID: cc.AgentID,
114+
address: cc.Address,
115+
syncInterval: cc.SyncInterval,
116+
probeInterval: cc.ProbeInterval,
117+
reconnectInterval: cc.ReconnectInterval,
118+
dialOption: cc.DialOption,
91119
}
120+
92121
}
93122

94123
func (cs *ClientSet) newAgentClient() (*AgentClient, error) {
@@ -103,7 +132,7 @@ func (cs *ClientSet) sync() {
103132
sleep := cs.syncInterval + time.Duration(rand.Float64()*jitter*float64(cs.syncInterval))
104133
time.Sleep(sleep)
105134
}
106-
if cs.serverCount == 0 || cs.clientsCount() < cs.serverCount {
135+
if cs.serverCount == 0 || cs.ClientsCount() < cs.serverCount {
107136
c, err := cs.newAgentClient()
108137
if err != nil {
109138
klog.Error(err)

pkg/agent/agentclient/stream.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
)
3434

3535
const (
36-
defaultInterval = 5 * time.Second
36+
defaultReconnectInterval = 5 * time.Second
3737
)
3838

3939
type ReconnectError struct {
@@ -71,8 +71,8 @@ type RedialableAgentClient struct {
7171
recvLock sync.Mutex
7272
reconnLock sync.Mutex
7373

74-
// Interval between every reconnect
75-
Interval time.Duration
74+
reconnectInterval time.Duration // interval between recoonects
75+
probeInterval time.Duration // interval between probe pings
7676
}
7777

7878
func copyRedialableAgentClient(in RedialableAgentClient) RedialableAgentClient {
@@ -88,12 +88,13 @@ func copyRedialableAgentClient(in RedialableAgentClient) RedialableAgentClient {
8888

8989
func NewRedialableAgentClient(address, agentID string, cs *ClientSet, opts ...grpc.DialOption) (*RedialableAgentClient, error) {
9090
c := &RedialableAgentClient{
91-
cs: cs,
92-
address: address,
93-
agentID: agentID,
94-
opts: opts,
95-
Interval: defaultInterval,
96-
stopCh: make(chan struct{}),
91+
cs: cs,
92+
address: address,
93+
agentID: agentID,
94+
opts: opts,
95+
probeInterval: cs.probeInterval,
96+
reconnectInterval: cs.reconnectInterval,
97+
stopCh: make(chan struct{}),
9798
}
9899
serverID, err := c.Connect()
99100
if err != nil {
@@ -108,7 +109,7 @@ func (c *RedialableAgentClient) probe() {
108109
select {
109110
case <-c.stopCh:
110111
return
111-
case <-time.After(c.Interval):
112+
case <-time.After(c.probeInterval):
112113
if c.conn == nil {
113114
continue
114115
}
@@ -253,8 +254,8 @@ func (c *RedialableAgentClient) reconnect() {
253254
r, err := c.tryConnect()
254255
if err != nil {
255256
retry++
256-
klog.Infof("Failed to connect to proxy server, retry %d in %v: %v", retry, c.Interval, err)
257-
time.Sleep(c.Interval)
257+
klog.Infof("Failed to connect to proxy server, retry %d in %v: %v", retry, c.reconnectInterval, err)
258+
time.Sleep(c.reconnectInterval)
258259
continue
259260
}
260261
switch {
@@ -272,8 +273,8 @@ func (c *RedialableAgentClient) reconnect() {
272273
klog.Infof("failed to close connection to %s: %v", r.serverID, err)
273274
}
274275
retry++
275-
klog.Infof("Trying to reconnect to proxy server %s, got connected to proxy server %s, for which there is already a connection, retry %d in %v", c.serverID, r.serverID, retry, c.Interval)
276-
time.Sleep(c.Interval)
276+
klog.Infof("Trying to reconnect to proxy server %s, got connected to proxy server %s, for which there is already a connection, retry %d in %v", c.serverID, r.serverID, retry, c.reconnectInterval)
277+
time.Sleep(c.reconnectInterval)
277278
case r.serverID != c.serverID && !c.cs.HasID(r.serverID):
278279
// create a new client
279280
cc := copyRedialableAgentClient(*c)
@@ -287,8 +288,8 @@ func (c *RedialableAgentClient) reconnect() {
287288
}
288289
go ac.Serve()
289290
retry++
290-
klog.Infof("Trying to reconnect to proxy server %s, got connected to proxy server %s. We will add this connection to the client set, but keep retrying connecting to proxy server %s, retry %d in %v", c.serverID, r.serverID, c.serverID, retry, c.Interval)
291-
time.Sleep(c.Interval)
291+
klog.Infof("Trying to reconnect to proxy server %s, got connected to proxy server %s. We will add this connection to the client set, but keep retrying connecting to proxy server %s, retry %d in %v", c.serverID, r.serverID, c.serverID, retry, c.reconnectInterval)
292+
time.Sleep(c.reconnectInterval)
292293
}
293294
}
294295

tests/concurrent_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ func TestProxy_Concurrency(t *testing.T) {
2727
}
2828
defer cleanup()
2929

30-
if err := runAgent(proxy.agent, stopCh); err != nil {
31-
t.Fatal(err)
32-
}
30+
runAgent(proxy.agent, stopCh)
3331

3432
// Wait for agent to register on proxy server
3533
time.Sleep(time.Second)

0 commit comments

Comments
 (0)