Skip to content

Commit 2336254

Browse files
authored
Merge pull request #46 from caesarxuchao/agent-connect-to-all-servers
Make proxy work even if proxy server is HA and is behind a LB
2 parents edfd341 + 1aa2e5f commit 2336254

File tree

17 files changed

+918
-132
lines changed

17 files changed

+918
-132
lines changed

cmd/agent/main.go

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import (
2222
"fmt"
2323
"net/http"
2424
"os"
25+
"time"
2526

27+
"github.com/google/uuid"
2628
"github.com/prometheus/client_golang/prometheus"
2729
"github.com/spf13/cobra"
2830
"github.com/spf13/pflag"
@@ -61,6 +63,22 @@ type GrpcProxyAgentOptions struct {
6163
// Configuration for connecting to the proxy-server
6264
proxyServerHost string
6365
proxyServerPort int
66+
67+
agentID string
68+
syncInterval int
69+
probeInterval int
70+
reconnectInterval int
71+
}
72+
73+
func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOption grpc.DialOption) *agentclient.ClientSetConfig {
74+
return &agentclient.ClientSetConfig{
75+
Address: fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort),
76+
AgentID: o.agentID,
77+
SyncInterval: time.Duration(o.syncInterval) * time.Second,
78+
ProbeInterval: time.Duration(o.probeInterval) * time.Second,
79+
ReconnectInterval: time.Duration(o.reconnectInterval) * time.Second,
80+
DialOption: dialOption,
81+
}
6482
}
6583

6684
func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
@@ -70,6 +88,10 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
7088
flags.StringVar(&o.caCert, "ca-cert", o.caCert, "If non-empty the CAs we use to validate clients.")
7189
flags.StringVar(&o.proxyServerHost, "proxy-server-host", o.proxyServerHost, "The hostname to use to connect to the proxy-server.")
7290
flags.IntVar(&o.proxyServerPort, "proxy-server-port", o.proxyServerPort, "The port the proxy server is listening on.")
91+
flags.StringVar(&o.agentID, "agent-id", o.agentID, "The unique ID of this agent. Default to a generated uuid if not set.")
92+
flags.IntVar(&o.syncInterval, "sync-interval", o.syncInterval, "The seconds by which the agent periodically checks that it has connections to all instances of the proxy server.")
93+
flags.IntVar(&o.probeInterval, "probe-interval", o.probeInterval, "The seconds by which the agent periodically checks if its connections to the proxy server are ready.")
94+
flags.IntVar(&o.reconnectInterval, "reconnect-interval", o.reconnectInterval, "The seconds by which the agent tries to reconnect.")
7395
return flags
7496
}
7597

@@ -79,6 +101,10 @@ func (o *GrpcProxyAgentOptions) Print() {
79101
klog.Warningf("CACert set to \"%s\".\n", o.caCert)
80102
klog.Warningf("ProxyServerHost set to \"%s\".\n", o.proxyServerHost)
81103
klog.Warningf("ProxyServerPort set to %d.\n", o.proxyServerPort)
104+
klog.Warningf("AgentID set to %s.\n", o.agentID)
105+
klog.Warningf("SyncInterval set to %d seconds.\n", o.syncInterval)
106+
klog.Warningf("ProbeInterval set to %d.\n", o.probeInterval)
107+
klog.Warningf("ReconnectInterval set to %d.\n", o.reconnectInterval)
82108
}
83109

84110
func (o *GrpcProxyAgentOptions) Validate() error {
@@ -111,11 +137,15 @@ func (o *GrpcProxyAgentOptions) Validate() error {
111137

112138
func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
113139
o := GrpcProxyAgentOptions{
114-
agentCert: "",
115-
agentKey: "",
116-
caCert: "",
117-
proxyServerHost: "127.0.0.1",
118-
proxyServerPort: 8091,
140+
agentCert: "",
141+
agentKey: "",
142+
caCert: "",
143+
proxyServerHost: "127.0.0.1",
144+
proxyServerPort: 8091,
145+
agentID: uuid.New().String(),
146+
syncInterval: 5,
147+
probeInterval: 5,
148+
reconnectInterval: 5,
119149
}
120150
return &o
121151
}
@@ -162,14 +192,9 @@ func (a *Agent) runProxyConnection(o *GrpcProxyAgentOptions) error {
162192
return err
163193
}
164194
dialOption := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
165-
client, err := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), dialOption)
166-
if err != nil {
167-
return err
168-
}
169-
170-
stopCh := make(chan struct{})
171-
172-
go client.Serve(stopCh)
195+
cc := o.ClientSetConfig(dialOption)
196+
cs := cc.NewAgentClientSet()
197+
cs.Serve()
173198

174199
return nil
175200
}

cmd/proxy/main.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ import (
2323
"flag"
2424
"fmt"
2525
"io/ioutil"
26-
"k8s.io/klog"
2726
"net"
2827
"net/http"
2928
"os"
3029
"os/signal"
3130
"syscall"
3231

32+
"k8s.io/klog"
33+
34+
"github.com/google/uuid"
3335
"github.com/prometheus/client_golang/prometheus"
3436
"github.com/spf13/cobra"
3537
"github.com/spf13/pflag"
@@ -79,6 +81,11 @@ type ProxyRunOptions struct {
7981
agentPort uint
8082
// Port we listen for admin connections on.
8183
adminPort uint
84+
85+
// ID of this server.
86+
serverID string
87+
// Number of proxy server instances, should be 1 unless it is a HA server.
88+
serverCount uint
8289
}
8390

8491
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -94,6 +101,8 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
94101
flags.UintVar(&o.serverPort, "server-port", o.serverPort, "Port we listen for server connections on. Set to 0 for UDS.")
95102
flags.UintVar(&o.agentPort, "agent-port", o.agentPort, "Port we listen for agent connections on.")
96103
flags.UintVar(&o.adminPort, "admin-port", o.adminPort, "Port we listen for admin connections on.")
104+
flags.StringVar(&o.serverID, "server-id", o.serverID, "The unique ID of this server.")
105+
flags.UintVar(&o.serverCount, "server-count", o.serverCount, "The number of proxy server instances, should be 1 unless it is an HA server.")
97106
return flags
98107
}
99108

@@ -109,6 +118,8 @@ func (o *ProxyRunOptions) Print() {
109118
klog.Warningf("Server port set to %d.\n", o.serverPort)
110119
klog.Warningf("Agent port set to %d.\n", o.agentPort)
111120
klog.Warningf("Admin port set to %d.\n", o.adminPort)
121+
klog.Warningf("ServerID set to %s.\n", o.serverID)
122+
klog.Warningf("ServerCount set to %d.\n", o.serverCount)
112123
}
113124

114125
func (o *ProxyRunOptions) Validate() error {
@@ -207,6 +218,8 @@ func newProxyRunOptions() *ProxyRunOptions {
207218
serverPort: 8090,
208219
agentPort: 8091,
209220
adminPort: 8092,
221+
serverID: uuid.New().String(),
222+
serverCount: 1,
210223
}
211224
return &o
212225
}
@@ -234,7 +247,7 @@ func (p *Proxy) run(o *ProxyRunOptions) error {
234247
return fmt.Errorf("failed to validate server options with %v", err)
235248
}
236249
ctx, cancel := context.WithCancel(context.Background())
237-
server := agentserver.NewProxyServer()
250+
server := agentserver.NewProxyServer(o.serverID, int(o.serverCount))
238251

239252
klog.Info("Starting master server for client connections.")
240253
masterStop, err := p.runMasterServer(ctx, o, server)

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@ go 1.12
55
require (
66
github.com/beorn7/perks v1.0.0 // indirect
77
github.com/golang/protobuf v1.3.2
8+
github.com/google/uuid v1.1.1
89
github.com/inconshreveable/mousetrap v1.0.0 // indirect
910
github.com/prometheus/client_golang v0.9.2
1011
github.com/prometheus/common v0.4.0 // indirect
1112
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 // indirect
1213
github.com/spf13/cobra v0.0.3
1314
github.com/spf13/pflag v1.0.3
15+
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
1416
golang.org/x/sys v0.0.0-20190225065934-cc5685c2db12 // indirect
17+
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect
1518
google.golang.org/grpc v1.26.0
19+
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
1620
k8s.io/klog v0.3.0
1721
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
2121
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
2222
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
2323
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
24-
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
2524
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
26-
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
25+
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
26+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
2727
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
2828
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
2929
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=

pkg/agent/agentclient/client.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,32 @@ type AgentClient struct {
3434
connContext map[int64]*connContext
3535

3636
stream *RedialableAgentClient
37+
stopCh <-chan struct{}
3738
}
3839

39-
// NewAgentClient creates an AgentClient
40-
func NewAgentClient(address string, opts ...grpc.DialOption) (*AgentClient, error) {
41-
stream, err := NewRedialableAgentClient(address, opts...)
40+
func newAgentClient(address, agentID string, cs *ClientSet, opts ...grpc.DialOption) (*AgentClient, error) {
41+
stream, err := NewRedialableAgentClient(address, agentID, cs, opts...)
4242
if err != nil {
4343
return nil, err
4444
}
45+
return newAgentClientWithRedialableAgentClient(stream), nil
46+
}
4547

46-
a := &AgentClient{
48+
func newAgentClientWithRedialableAgentClient(rac *RedialableAgentClient) *AgentClient {
49+
return &AgentClient{
4750
connContext: make(map[int64]*connContext),
48-
stream: stream,
51+
stream: rac,
52+
stopCh: rac.stopCh,
53+
}
54+
}
55+
56+
// Close closes the underlying stream.
57+
func (c *AgentClient) Close() {
58+
if c.stream == nil {
59+
klog.Warning("Unexpected empty AgentClient.stream")
60+
return
4961
}
50-
return a, nil
62+
c.stream.Close()
5163
}
5264

5365
// connContext tracks a connection from agent to node network.
@@ -75,10 +87,12 @@ func (c *connContext) cleanup() {
7587
// gRPC stream. Successful Connect is required before Serve. The
7688
// The requests include things like opening a connection to a server,
7789
// streaming data and close the connection.
78-
func (a *AgentClient) Serve(stopCh <-chan struct{}) {
90+
func (a *AgentClient) Serve() {
91+
klog.Infof("Start serving for serverID %s", a.stream.serverID)
92+
go a.stream.probe()
7993
for {
8094
select {
81-
case <-stopCh:
95+
case <-a.stopCh:
8296
klog.Info("stop agent client.")
8397
return
8498
default:

pkg/agent/agentclient/client_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@ import (
1717
func TestServeData_HTTP(t *testing.T) {
1818
var err error
1919
var stream agent.AgentService_ConnectClient
20+
stopCh := make(chan struct{})
2021
client := &AgentClient{
2122
connContext: make(map[int64]*connContext),
23+
stopCh: stopCh,
2224
}
2325
client.stream, stream = pipe2()
24-
stopCh := make(chan struct{})
2526

2627
// Start agent
27-
go client.Serve(stopCh)
28+
go client.Serve()
2829
defer close(stopCh)
2930

3031
// Start test http server as remote service
@@ -112,14 +113,15 @@ func TestServeData_HTTP(t *testing.T) {
112113

113114
func TestClose_Client(t *testing.T) {
114115
var stream agent.AgentService_ConnectClient
116+
stopCh := make(chan struct{})
115117
client := &AgentClient{
116118
connContext: make(map[int64]*connContext),
119+
stopCh: stopCh,
117120
}
118121
client.stream, stream = pipe2()
119-
stopCh := make(chan struct{})
120122

121123
// Start agent
122-
go client.Serve(stopCh)
124+
go client.Serve()
123125
defer close(stopCh)
124126

125127
// Start test http server as remote service

0 commit comments

Comments
 (0)