Skip to content

Commit 0af05b1

Browse files
author
Chao Xu
committed
Add ClientSet to handle HA proxy servers behind a proxy.
The ClientSet contains one client for each proxy server instance. When a connection breaks, the client tries to reconnect to the same server instance. The ClientSet has a sync loop that makes sure the number of clients is at least the number of proxy server instances.
1 parent edfd341 commit 0af05b1

File tree

6 files changed

+368
-51
lines changed

6 files changed

+368
-51
lines changed

cmd/agent/main.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import (
2222
"fmt"
2323
"net/http"
2424
"os"
25+
"time"
2526

27+
"github.com/google/uuid"
2628
"github.com/prometheus/client_golang/prometheus"
2729
"github.com/spf13/cobra"
2830
"github.com/spf13/pflag"
@@ -61,6 +63,9 @@ type GrpcProxyAgentOptions struct {
6163
// Configuration for connecting to the proxy-server
6264
proxyServerHost string
6365
proxyServerPort int
66+
67+
agentID string
68+
syncInterval int
6469
}
6570

6671
func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
@@ -70,6 +75,8 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
7075
flags.StringVar(&o.caCert, "ca-cert", o.caCert, "If non-empty the CAs we use to validate clients.")
7176
flags.StringVar(&o.proxyServerHost, "proxy-server-host", o.proxyServerHost, "The hostname to use to connect to the proxy-server.")
7277
flags.IntVar(&o.proxyServerPort, "proxy-server-port", o.proxyServerPort, "The port the proxy server is listening on.")
78+
flags.StringVar(&o.agentID, "agent-id", o.agentID, "The unique ID of this agent. Default to a generated uuid if not set.")
79+
flags.IntVar(&o.syncInterval, "sync-interval", o.syncInterval, "The seconds by which the agent periodically checks that it has connections to all instances of the proxy server.")
7380
return flags
7481
}
7582

@@ -79,6 +86,8 @@ func (o *GrpcProxyAgentOptions) Print() {
7986
klog.Warningf("CACert set to \"%s\".\n", o.caCert)
8087
klog.Warningf("ProxyServerHost set to \"%s\".\n", o.proxyServerHost)
8188
klog.Warningf("ProxyServerPort set to %d.\n", o.proxyServerPort)
89+
klog.Warningf("AgentID set to %s.\n", o.agentID)
90+
klog.Warningf("SyncInterval set to %d.\n", o.syncInterval)
8291
}
8392

8493
func (o *GrpcProxyAgentOptions) Validate() error {
@@ -116,6 +125,8 @@ func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
116125
caCert: "",
117126
proxyServerHost: "127.0.0.1",
118127
proxyServerPort: 8091,
128+
agentID: uuid.New().String(),
129+
syncInterval: 5,
119130
}
120131
return &o
121132
}
@@ -162,14 +173,8 @@ func (a *Agent) runProxyConnection(o *GrpcProxyAgentOptions) error {
162173
return err
163174
}
164175
dialOption := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
165-
client, err := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), dialOption)
166-
if err != nil {
167-
return err
168-
}
169-
170-
stopCh := make(chan struct{})
171-
172-
go client.Serve(stopCh)
176+
cs := agentclient.NewAgentClientSet(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), o.agentID, time.Duration(o.syncInterval)*time.Second, dialOption)
177+
cs.Serve()
173178

174179
return nil
175180
}

pkg/agent/agentclient/client.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,33 @@ type AgentClient struct {
3333
nextConnID int64
3434
connContext map[int64]*connContext
3535

36-
stream *RedialableAgentClient
36+
stream *RedialableAgentClient
37+
serverID string // the proxy server this client connects to
38+
serverCount int // number of proxy servers, 1 if server is not HA.
39+
stopCh <-chan struct{}
3740
}
3841

39-
// NewAgentClient creates an AgentClient
40-
func NewAgentClient(address string, opts ...grpc.DialOption) (*AgentClient, error) {
41-
stream, err := NewRedialableAgentClient(address, opts...)
42+
func newAgentClient(address, agentID string, cs *ClientSet, opts ...grpc.DialOption) (*AgentClient, error) {
43+
stream, err := NewRedialableAgentClient(address, agentID, cs, opts...)
4244
if err != nil {
4345
return nil, err
4446
}
47+
return newAgentClientWithRedialableAgentClient(stream), nil
48+
}
4549

46-
a := &AgentClient{
50+
func newAgentClientWithRedialableAgentClient(rac *RedialableAgentClient) *AgentClient {
51+
return &AgentClient{
4752
connContext: make(map[int64]*connContext),
48-
stream: stream,
53+
stream: rac,
54+
serverID: rac.serverID,
55+
serverCount: rac.serverCount,
56+
stopCh: rac.stopCh,
4957
}
50-
return a, nil
58+
}
59+
60+
// Close closes the underlying stream.
61+
func (c *AgentClient) Close() {
62+
c.stream.Close()
5163
}
5264

5365
// connContext tracks a connection from agent to node network.
@@ -75,10 +87,12 @@ func (c *connContext) cleanup() {
7587
// gRPC stream. Successful Connect is required before Serve. The
7688
// The requests include things like opening a connection to a server,
7789
// streaming data and close the connection.
78-
func (a *AgentClient) Serve(stopCh <-chan struct{}) {
90+
func (a *AgentClient) Serve() {
91+
klog.Infof("Start serving for serverID %s", a.serverID)
92+
go a.stream.probe()
7993
for {
8094
select {
81-
case <-stopCh:
95+
case <-a.stopCh:
8296
klog.Info("stop agent client.")
8397
return
8498
default:

pkg/agent/agentclient/clientset.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agentclient
18+
19+
import (
20+
"fmt"
21+
"math/rand"
22+
"sync"
23+
"time"
24+
25+
"google.golang.org/grpc"
26+
"k8s.io/klog"
27+
)
28+
29+
// ClientSet consists of clients connected to each instance of an HA proxy server.
30+
type ClientSet struct {
31+
mu sync.Mutex //protects the following
32+
clients map[string]*AgentClient // map between serverID and the client
33+
// connects to this server.
34+
35+
agentID string // ID of this agent
36+
address string // proxy server address. Assuming HA proxy server
37+
serverCount int // number of proxy server instances, should be 1
38+
// unless it is an HA server. Initialized when the ClientSet creates
39+
// the first client.
40+
syncInterval time.Duration // The interval by which the agent
41+
// periodically checks that it has connections to all instances of the
42+
// proxy server.
43+
dialOption grpc.DialOption
44+
}
45+
46+
func (cs *ClientSet) clientsCount() int {
47+
cs.mu.Lock()
48+
defer cs.mu.Unlock()
49+
return len(cs.clients)
50+
}
51+
52+
func (cs *ClientSet) hasIDLocked(serverID string) bool {
53+
_, ok := cs.clients[serverID]
54+
return ok
55+
}
56+
57+
func (cs *ClientSet) HasID(serverID string) bool {
58+
cs.mu.Lock()
59+
defer cs.mu.Unlock()
60+
return cs.hasIDLocked(serverID)
61+
}
62+
63+
func (cs *ClientSet) addClientLocked(serverID string, c *AgentClient) error {
64+
if cs.hasIDLocked(serverID) {
65+
return fmt.Errorf("client for proxy server %s already exists", serverID)
66+
}
67+
cs.clients[serverID] = c
68+
return nil
69+
70+
}
71+
72+
func (cs *ClientSet) AddClient(serverID string, c *AgentClient) error {
73+
cs.mu.Lock()
74+
defer cs.mu.Unlock()
75+
return cs.addClientLocked(serverID, c)
76+
}
77+
78+
func (cs *ClientSet) RemoveClient(serverID string) {
79+
cs.mu.Lock()
80+
defer cs.mu.Unlock()
81+
delete(cs.clients, serverID)
82+
}
83+
84+
func NewAgentClientSet(address, agentID string, syncInterval time.Duration, dialOption grpc.DialOption) *ClientSet {
85+
return &ClientSet{
86+
clients: make(map[string]*AgentClient),
87+
agentID: agentID,
88+
address: address,
89+
syncInterval: syncInterval,
90+
dialOption: dialOption,
91+
}
92+
}
93+
94+
func (cs *ClientSet) newAgentClient() (*AgentClient, error) {
95+
return newAgentClient(cs.address, cs.agentID, cs, cs.dialOption)
96+
}
97+
98+
// sync makes sure that #clients >= #proxy servers
99+
func (cs *ClientSet) sync() {
100+
jitter := float64(0.2)
101+
for {
102+
if cs.serverCount != 0 {
103+
sleep := cs.syncInterval + time.Duration(rand.Float64()*jitter*float64(cs.syncInterval))
104+
time.Sleep(sleep)
105+
}
106+
if cs.serverCount == 0 || cs.clientsCount() < cs.serverCount {
107+
c, err := cs.newAgentClient()
108+
if err != nil {
109+
klog.Error(err)
110+
continue
111+
}
112+
cs.serverCount = c.serverCount
113+
if err := cs.AddClient(c.serverID, c); err != nil {
114+
klog.Infof("closing connection: %v", err)
115+
c.Close()
116+
continue
117+
}
118+
klog.Infof("sync added client connecting to proxy server %s", c.serverID)
119+
go c.Serve()
120+
}
121+
}
122+
}
123+
124+
func (cs *ClientSet) Serve() {
125+
go cs.sync()
126+
}

0 commit comments

Comments
 (0)