@@ -51,8 +51,8 @@ type ClientSet struct {
5151 // Address is the proxy server address. Assuming HA proxy server
5252 address string
5353
54- // leaseCounter counts number of proxy server leases
55- leaseCounter ServerCounter
54+ // serverCounter is the strategy used to determine the number of proxy servers (installed via SetServerCounter)
55+ serverCounter ServerCounter
5656
5757 // lastReceivedServerCount is the last serverCount value received when connecting to a proxy server
5858 lastReceivedServerCount int
@@ -108,6 +108,11 @@ func (cs *ClientSet) ClientsCount() int {
108108 return len (cs .clients )
109109}
110110
111+ // SetServerCounter sets the strategy for determining the server count.
112+ func (cs * ClientSet ) SetServerCounter (counter ServerCounter ) {
113+ cs .serverCounter = counter
114+ }
115+
111116func (cs * ClientSet ) HealthyClientsCount () int {
112117 cs .mu .Lock ()
113118 defer cs .mu .Unlock ()
@@ -175,7 +180,6 @@ type ClientSetConfig struct {
175180 WarnOnChannelLimit bool
176181 SyncForever bool
177182 XfrChannelSize int
178- ServerLeaseCounter ServerCounter
179183 ServerCountSource string
180184}
181185
@@ -195,7 +199,6 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
195199 drainCh : drainCh ,
196200 xfrChannelSize : cc .XfrChannelSize ,
197201 stopCh : stopCh ,
198- leaseCounter : cc .ServerLeaseCounter ,
199202 serverCountSource : cc .ServerCountSource ,
200203 }
201204}
@@ -214,30 +217,40 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff {
214217 }
215218}
216219
217- // sync makes sure that #clients >= #proxy servers
220+ // determineServerCount determines the number of proxy servers by delegating to its configured counter strategy.
221+ func (cs * ClientSet ) determineServerCount () int {
222+ serverCount := cs .serverCounter .Count ()
223+ metrics .Metrics .SetServerCount (serverCount )
224+ return serverCount
225+ }
226+
227+ // sync repeatedly calls connectOnce and sleeps between attempts for a duration derived from the backoff state.
228+ // sync runs until stopCh is closed; cs.shutdown is deferred so it runs when sync returns.
218229func (cs * ClientSet ) sync () {
219230 defer cs .shutdown ()
220231 backoff := cs .resetBackoff ()
221232 var duration time.Duration
222233 for {
223- if serverCount , err := cs .connectOnce (); err != nil {
234+ if err := cs .connectOnce (); err != nil {
224235 if dse , ok := err .(* DuplicateServerError ); ok {
225- clientsCount := cs .ClientsCount ()
226- klog .V (4 ).InfoS ("duplicate server" , "serverID" , dse .ServerID , "serverCount" , serverCount , "clientsCount" , clientsCount )
227- if serverCount != 0 && clientsCount >= serverCount {
228- duration = backoff .Step ()
229- } else {
230- backoff = cs .resetBackoff ()
231- duration = wait .Jitter (backoff .Duration , backoff .Jitter )
232- }
236+ klog .V (4 ).InfoS ("duplicate server connection attempt" , "serverID" , dse .ServerID )
237+ // connectOnce reported a *DuplicateServerError: the attempt reached a server (dse.ServerID) we already have a connection to.
238+ // This is expected in syncForever mode, so it is not treated as a failure: the backoff is reset and we
239+ // wait one jittered initial backoff duration before trying again, rather than stepping the backoff.
240+ backoff = cs .resetBackoff ()
241+ duration = wait .Jitter (backoff .Duration , backoff .Jitter )
233242 } else {
243+ // Any other error is a 'real' failure: log it and take the next wait duration from backoff.Step.
234244 klog .ErrorS (err , "cannot connect once" )
235245 duration = backoff .Step ()
236246 }
237247 } else {
248+ // connectOnce returned nil: either a new client was added, or no new connection was needed.
249+ // Reset the backoff and wait one jittered initial backoff duration before the next attempt.
238250 backoff = cs .resetBackoff ()
239251 duration = wait .Jitter (backoff .Duration , backoff .Jitter )
240252 }
253+
241254 time .Sleep (duration )
242255 select {
243256 case <- cs .stopCh :
@@ -247,76 +260,37 @@ func (cs *ClientSet) sync() {
247260 }
248261}
249262
250- func (cs * ClientSet ) ServerCount () int {
251-
252- var serverCount int
253- var countSourceLabel string
254-
255- switch cs .serverCountSource {
256- case "" , "default" :
257- if cs .leaseCounter != nil {
258- serverCount = cs .leaseCounter .Count ()
259- countSourceLabel = fromLeases
260- } else {
261- serverCount = cs .lastReceivedServerCount
262- countSourceLabel = fromResponses
263- }
264- case "max" :
265- countFromLeases := 0
266- if cs .leaseCounter != nil {
267- countFromLeases = cs .leaseCounter .Count ()
268- }
269- countFromResponses := cs .lastReceivedServerCount
270-
271- serverCount = countFromLeases
272- countSourceLabel = fromLeases
273- if countFromResponses > serverCount {
274- serverCount = countFromResponses
275- countSourceLabel = fromResponses
276- }
277- if serverCount == 0 {
278- serverCount = 1
279- countSourceLabel = fromFallback
280- }
281-
282- }
263+ func (cs * ClientSet ) connectOnce () error { // connectOnce makes at most one connection attempt; it returns nil when a client was added or no attempt was needed.
264+ serverCount := cs .determineServerCount ()
283265
284- if serverCount != cs . lastServerCount {
285- klog . Warningf ( "change detected in proxy server count (was: %d, now: %d, source: %q)" , cs . lastServerCount , serverCount , countSourceLabel )
286- cs . lastServerCount = serverCount
266+ // Outside syncForever mode, skip the attempt once we hold at least serverCount clients; a serverCount of 0 never skips.
267+ if ! cs . syncForever && cs . ClientsCount () >= serverCount && serverCount > 0 {
268+ return nil // Nothing to do.
287269 }
288270
289- metrics .Metrics .SetServerCount (serverCount )
290- return serverCount
291- }
292-
293- func (cs * ClientSet ) connectOnce () (int , error ) {
294- serverCount := cs .ServerCount ()
295-
296- if ! cs .syncForever && serverCount != 0 && cs .ClientsCount () >= serverCount {
297- return serverCount , nil
298- }
271+ // Otherwise, and always in syncForever mode, try a new connection, to discover new servers.
299272 c , receivedServerCount , err := cs .newAgentClient ()
300273 if err != nil {
301- return serverCount , err
274+ return err
302275 }
276+
303277 if err := cs .AddClient (c .serverID , c ); err != nil {
304278 c .Close ()
305- return serverCount , err
279+ return err // likely *DuplicateServerError; the rejected client has been closed above
306280 }
307- // By moving the update to here, we only accept the server count from a server
308- // that we have successfully added to our active client set, implicitly ignoring
309- // stale data from duplicate connection attempts.
281+ // SUCCESS: AddClient accepted the client, so we connected to a new, unique server.
282+ // Only now do we record the server count it reported; counts from rejected attempts are never stored.
310283 cs .lastReceivedServerCount = receivedServerCount
311- klog .V (2 ).InfoS ("sync added client connecting to proxy server" , "serverID" , c .serverID )
284+ klog .V (2 ).InfoS ("successfully connected to new proxy server" , "serverID" , c .serverID , "newServerCount" , receivedServerCount )
312285
313286 labels := runpprof .Labels (
314287 "agentIdentifiers" , cs .agentIdentifiers ,
315288 "serverAddress" , cs .address ,
316289 "serverID" , c .serverID ,
317290 )
318291 go runpprof .Do (context .Background (), labels , func (context.Context ) { c .Serve () })
319- return serverCount , nil
292+
293+ return nil
320294}
321295
322296func (cs * ClientSet ) Serve () {
0 commit comments