Skip to content

Commit 3c63f04

Browse files
committed
fixes server count logic
Signed-off-by: Imran Pochi <[email protected]>
1 parent dac431f commit 3c63f04

File tree

5 files changed

+201
-120
lines changed

5 files changed

+201
-120
lines changed

cmd/agent/app/server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
143143
}
144144
cc := o.ClientSetConfig(dialOptions...)
145145

146+
var leaseCounter agent.ServerCounter
146147
if o.CountServerLeases {
147148
var config *rest.Config
148149
if o.KubeconfigPath != "" {
@@ -167,15 +168,20 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
167168
cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
168169
leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
169170
serverLeaseSelector, _ := labels.Parse(o.LeaseLabel)
170-
serverLeaseCounter := agent.NewServerLeaseCounter(
171+
leaseCounter = agent.NewServerLeaseCounter(
171172
clock.RealClock{},
172173
leaseLister,
173174
serverLeaseSelector,
174175
)
175-
cc.ServerLeaseCounter = serverLeaseCounter
176176
}
177177

178178
cs := cc.NewAgentClientSet(drainCh, stopCh)
179+
// Always create the response-based counter.
180+
responseCounter := agent.NewResponseBasedCounter(cs)
181+
// Create the aggregate counter which acts as the master strategy.
182+
aggregateCounter := agent.NewAggregateServerCounter(leaseCounter, responseCounter, o.ServerCountSource)
183+
cs.SetServerCounter(aggregateCounter)
184+
179185
cs.Serve()
180186

181187
return cs, nil

pkg/agent/clientset.go

Lines changed: 41 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,12 @@ type ClientSet struct {
5151
// Address is the proxy server address. Assuming HA proxy server
5252
address string
5353

54-
// leaseCounter counts number of proxy server leases
55-
leaseCounter ServerCounter
54+
// serverCounter counts number of proxy server leases
55+
serverCounter ServerCounter
5656

5757
// lastReceivedServerCount is the last serverCount value received when connecting to a proxy server
5858
lastReceivedServerCount int
5959

60-
// lastServerCount is the most-recently observed serverCount value from either lease system or proxy server,
61-
// former takes priority unless it is an HA server.
62-
// Initialized when the ClientSet creates the first client.
63-
// When syncForever is set, it will be the most recently seen.
64-
lastServerCount int
65-
6660
// syncInterval is the interval at which the agent periodically checks
6761
// that it has connections to all instances of the proxy server.
6862
syncInterval time.Duration
@@ -108,6 +102,11 @@ func (cs *ClientSet) ClientsCount() int {
108102
return len(cs.clients)
109103
}
110104

105+
// SetServerCounter sets the strategy for determining the server count.
106+
func (cs *ClientSet) SetServerCounter(counter ServerCounter) {
107+
cs.serverCounter = counter
108+
}
109+
111110
func (cs *ClientSet) HealthyClientsCount() int {
112111
cs.mu.Lock()
113112
defer cs.mu.Unlock()
@@ -175,7 +174,6 @@ type ClientSetConfig struct {
175174
WarnOnChannelLimit bool
176175
SyncForever bool
177176
XfrChannelSize int
178-
ServerLeaseCounter ServerCounter
179177
ServerCountSource string
180178
}
181179

@@ -195,7 +193,6 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
195193
drainCh: drainCh,
196194
xfrChannelSize: cc.XfrChannelSize,
197195
stopCh: stopCh,
198-
leaseCounter: cc.ServerLeaseCounter,
199196
serverCountSource: cc.ServerCountSource,
200197
}
201198
}
@@ -214,30 +211,40 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff {
214211
}
215212
}
216213

217-
// sync makes sure that #clients >= #proxy servers
214+
// determineServerCount determines the number of proxy servers by delegating to its configured counter strategy.
215+
func (cs *ClientSet) determineServerCount() int {
216+
serverCount := cs.serverCounter.Count()
217+
metrics.Metrics.SetServerCount(serverCount)
218+
return serverCount
219+
}
220+
221+
// sync manages the backoff and the connection attempts to the proxy server.
222+
// sync runs until stopCh is closed
218223
func (cs *ClientSet) sync() {
219224
defer cs.shutdown()
220225
backoff := cs.resetBackoff()
221226
var duration time.Duration
222227
for {
223-
if serverCount, err := cs.connectOnce(); err != nil {
228+
if err := cs.connectOnce(); err != nil {
224229
if dse, ok := err.(*DuplicateServerError); ok {
225-
clientsCount := cs.ClientsCount()
226-
klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount)
227-
if serverCount != 0 && clientsCount >= serverCount {
228-
duration = backoff.Step()
229-
} else {
230-
backoff = cs.resetBackoff()
231-
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
232-
}
230+
klog.V(4).InfoS("duplicate server connection attempt", "serverID", dse.ServerID)
231+
// We connected to a server we already have a connection to.
232+
// This is expected in syncForever mode. We just wait for the
233+
// next sync period to try again. No need for backoff.
234+
backoff = cs.resetBackoff()
235+
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
233236
} else {
237+
// A 'real' error, so we backoff.
234238
klog.ErrorS(err, "cannot connect once")
235239
duration = backoff.Step()
236240
}
237241
} else {
242+
// A successful connection was made, or no new connection was needed.
243+
// Reset the backoff and wait for the next sync period.
238244
backoff = cs.resetBackoff()
239245
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
240246
}
247+
241248
time.Sleep(duration)
242249
select {
243250
case <-cs.stopCh:
@@ -247,76 +254,37 @@ func (cs *ClientSet) sync() {
247254
}
248255
}
249256

250-
func (cs *ClientSet) ServerCount() int {
251-
252-
var serverCount int
253-
var countSourceLabel string
257+
func (cs *ClientSet) connectOnce() error {
258+
serverCount := cs.determineServerCount()
254259

255-
switch cs.serverCountSource {
256-
case "", "default":
257-
if cs.leaseCounter != nil {
258-
serverCount = cs.leaseCounter.Count()
259-
countSourceLabel = fromLeases
260-
} else {
261-
serverCount = cs.lastReceivedServerCount
262-
countSourceLabel = fromResponses
263-
}
264-
case "max":
265-
countFromLeases := 0
266-
if cs.leaseCounter != nil {
267-
countFromLeases = cs.leaseCounter.Count()
268-
}
269-
countFromResponses := cs.lastReceivedServerCount
270-
271-
serverCount = countFromLeases
272-
countSourceLabel = fromLeases
273-
if countFromResponses > serverCount {
274-
serverCount = countFromResponses
275-
countSourceLabel = fromResponses
276-
}
277-
if serverCount == 0 {
278-
serverCount = 1
279-
countSourceLabel = fromFallback
280-
}
281-
282-
}
283-
284-
if serverCount != cs.lastServerCount {
285-
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
286-
cs.lastServerCount = serverCount
260+
// If not in syncForever mode, we only connect if we have fewer connections than the server count.
261+
if !cs.syncForever && cs.ClientsCount() >= serverCount && serverCount > 0 {
262+
return nil // Nothing to do.
287263
}
288264

289-
metrics.Metrics.SetServerCount(serverCount)
290-
return serverCount
291-
}
292-
293-
func (cs *ClientSet) connectOnce() (int, error) {
294-
serverCount := cs.ServerCount()
295-
296-
if !cs.syncForever && serverCount != 0 && cs.ClientsCount() >= serverCount {
297-
return serverCount, nil
298-
}
265+
// In syncForever mode, we always try to connect, to discover new servers.
299266
c, receivedServerCount, err := cs.newAgentClient()
300267
if err != nil {
301-
return serverCount, err
268+
return err
302269
}
270+
303271
if err := cs.AddClient(c.serverID, c); err != nil {
304272
c.Close()
305-
return serverCount, err
273+
return err // likely *DuplicateServerError
306274
}
307-
// By moving the update to here, we only accept the server count from a server
308-
// that we have successfully added to our active client set, implicitly ignoring
309-
// stale data from duplicate connection attempts.
275+
// SUCCESS: We connected to a new, unique server.
276+
// Only now do we update our view of the server count.
310277
cs.lastReceivedServerCount = receivedServerCount
311-
klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
278+
klog.V(2).InfoS("successfully connected to new proxy server", "serverID", c.serverID, "newServerCount", receivedServerCount)
312279

313280
labels := runpprof.Labels(
314281
"agentIdentifiers", cs.agentIdentifiers,
315282
"serverAddress", cs.address,
316283
"serverID", c.serverID,
317284
)
318285
go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() })
319-
return serverCount, nil
286+
287+
return nil
320288
}
321289

322290
func (cs *ClientSet) Serve() {

pkg/agent/clientset_test.go

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,65 +28,93 @@ func (f *FakeServerCounter) Count() int {
2828
return f.count
2929
}
3030

31-
func TestServerCount(t *testing.T) {
31+
func TestAggregateServerCounter(t *testing.T) {
3232
testCases := []struct {
33-
name string
34-
serverCountSource string
35-
leaseCounter ServerCounter
36-
responseCount int
37-
want int
33+
name string
34+
source string
35+
leaseCounter ServerCounter
36+
responseCounter ServerCounter
37+
want int
3838
}{
3939
{
40-
name: "higher from response",
41-
serverCountSource: "max",
42-
responseCount: 42,
43-
leaseCounter: &FakeServerCounter{24},
44-
want: 42,
40+
name: "max: higher from response",
41+
source: "max",
42+
leaseCounter: &FakeServerCounter{count: 24},
43+
responseCounter: &FakeServerCounter{count: 42},
44+
want: 42,
4545
},
4646
{
47-
name: "higher from leases",
48-
serverCountSource: "max",
49-
responseCount: 3,
50-
leaseCounter: &FakeServerCounter{6},
51-
want: 6,
47+
name: "max: higher from leases",
48+
source: "max",
49+
leaseCounter: &FakeServerCounter{count: 6},
50+
responseCounter: &FakeServerCounter{count: 3},
51+
want: 6,
5252
},
5353
{
54-
name: "both zero",
55-
serverCountSource: "max",
56-
responseCount: 0,
57-
leaseCounter: &FakeServerCounter{0},
58-
want: 1,
54+
name: "max: both zero",
55+
source: "max",
56+
leaseCounter: &FakeServerCounter{count: 0},
57+
responseCounter: &FakeServerCounter{count: 0},
58+
want: 1, // fallback
59+
},
60+
{
61+
name: "default: lease counter is nil",
62+
source: "default",
63+
leaseCounter: nil,
64+
responseCounter: &FakeServerCounter{count: 3},
65+
want: 3,
5966
},
60-
6167
{
62-
name: "response picked by default when no lease counter",
63-
serverCountSource: "default",
64-
responseCount: 3,
65-
leaseCounter: nil,
66-
want: 3,
68+
name: "default: lease counter is present",
69+
source: "default",
70+
leaseCounter: &FakeServerCounter{count: 3},
71+
responseCounter: &FakeServerCounter{count: 6},
72+
want: 3, // lease count is preferred
6773
},
6874
{
69-
name: "lease counter always picked when present",
70-
serverCountSource: "default",
71-
responseCount: 6,
72-
leaseCounter: &FakeServerCounter{3},
73-
want: 3,
75+
name: "default: lease count is zero",
76+
source: "default",
77+
leaseCounter: &FakeServerCounter{count: 0},
78+
responseCounter: &FakeServerCounter{count: 6},
79+
want: 1, // fallback
7480
},
7581
}
7682

7783
for _, tc := range testCases {
7884
t.Run(tc.name, func(t *testing.T) {
79-
80-
cs := &ClientSet{
81-
clients: make(map[string]*Client),
82-
leaseCounter: tc.leaseCounter,
83-
serverCountSource: tc.serverCountSource,
84-
}
85-
cs.lastReceivedServerCount = tc.responseCount
86-
if got := cs.ServerCount(); got != tc.want {
87-
t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
85+
agg := NewAggregateServerCounter(tc.leaseCounter, tc.responseCounter, tc.source)
86+
if got := agg.Count(); got != tc.want {
87+
t.Errorf("agg.Count() = %v, want: %v", got, tc.want)
8888
}
8989
})
9090
}
91+
}
9192

93+
func TestResponseBasedCounter(t *testing.T) {
94+
testCases := []struct {
95+
name string
96+
responseCount int
97+
want int
98+
}{
99+
{
100+
name: "non-zero count",
101+
responseCount: 5,
102+
want: 5,
103+
},
104+
{
105+
name: "zero count",
106+
responseCount: 0,
107+
want: 1, // fallback
108+
},
109+
}
110+
111+
for _, tc := range testCases {
112+
t.Run(tc.name, func(t *testing.T) {
113+
cs := &ClientSet{lastReceivedServerCount: tc.responseCount}
114+
rbc := NewResponseBasedCounter(cs)
115+
if got := rbc.Count(); got != tc.want {
116+
t.Errorf("rbc.Count() = %v, want: %v", got, tc.want)
117+
}
118+
})
119+
}
92120
}

pkg/agent/lease_counter.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@ import (
3636
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
3737
)
3838

39-
type ServerCounter interface {
40-
Count() int
41-
}
42-
4339
// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
4440
// current proxy server count.
4541
type ServerLeaseCounter struct {

0 commit comments

Comments
 (0)