diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go
index 83ee3afbb..2aaa3a828 100644
--- a/cmd/agent/app/server.go
+++ b/cmd/agent/app/server.go
@@ -143,6 +143,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
 	}
 
 	cc := o.ClientSetConfig(dialOptions...)
+	var leaseCounter agent.ServerCounter
 	if o.CountServerLeases {
 		var config *rest.Config
 		if o.KubeconfigPath != "" {
@@ -167,15 +168,20 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
 		cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
 		leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
 		serverLeaseSelector, _ := labels.Parse(o.LeaseLabel)
-		serverLeaseCounter := agent.NewServerLeaseCounter(
+		leaseCounter = agent.NewServerLeaseCounter(
 			clock.RealClock{},
 			leaseLister,
 			serverLeaseSelector,
 		)
-		cc.ServerLeaseCounter = serverLeaseCounter
 	}
 
 	cs := cc.NewAgentClientSet(drainCh, stopCh)
+	// Always create the response-based counter.
+	responseCounter := agent.NewResponseBasedCounter(cs)
+	// Create the aggregate counter which acts as the master strategy.
+	aggregateCounter := agent.NewAggregateServerCounter(leaseCounter, responseCounter, o.ServerCountSource)
+	cs.SetServerCounter(aggregateCounter)
+
 	cs.Serve()
 
 	return cs, nil
diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go
index 886bf85b7..db44b5e4a 100644
--- a/pkg/agent/clientset.go
+++ b/pkg/agent/clientset.go
@@ -51,18 +51,12 @@ type ClientSet struct {
 	// Address is the proxy server address. Assuming HA proxy server
 	address string
 
-	// leaseCounter counts number of proxy server leases
-	leaseCounter ServerCounter
+	// serverCounter determines the number of proxy servers.
+	serverCounter ServerCounter
 
 	// lastReceivedServerCount is the last serverCount value received when connecting to a proxy server
 	lastReceivedServerCount int
 
-	// lastServerCount is the most-recently observed serverCount value from either lease system or proxy server,
-	// former takes priority unless it is an HA server.
-	// Initialized when the ClientSet creates the first client.
-	// When syncForever is set, it will be the most recently seen.
-	lastServerCount int
-
 	// syncInterval is the interval at which the agent periodically checks
 	// that it has connections to all instances of the proxy server.
 	syncInterval time.Duration
@@ -108,6 +102,11 @@ func (cs *ClientSet) ClientsCount() int {
 	return len(cs.clients)
 }
 
+// SetServerCounter sets the strategy for determining the server count.
+func (cs *ClientSet) SetServerCounter(counter ServerCounter) {
+	cs.serverCounter = counter
+}
+
 func (cs *ClientSet) HealthyClientsCount() int {
 	cs.mu.Lock()
 	defer cs.mu.Unlock()
@@ -175,7 +174,6 @@ type ClientSetConfig struct {
 	WarnOnChannelLimit bool
 	SyncForever        bool
 	XfrChannelSize     int
-	ServerLeaseCounter ServerCounter
 	ServerCountSource  string
 }
 
@@ -195,7 +193,6 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
 		drainCh:           drainCh,
 		xfrChannelSize:    cc.XfrChannelSize,
 		stopCh:            stopCh,
-		leaseCounter:      cc.ServerLeaseCounter,
 		serverCountSource: cc.ServerCountSource,
 	}
 }
@@ -214,30 +211,40 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff {
 	}
 }
 
-// sync makes sure that #clients >= #proxy servers
+// determineServerCount determines the number of proxy servers by delegating to its configured counter strategy.
+func (cs *ClientSet) determineServerCount() int {
+	serverCount := cs.serverCounter.Count()
+	metrics.Metrics.SetServerCount(serverCount)
+	return serverCount
+}
+
+// sync manages the backoff and the connection attempts to the proxy server.
+// sync runs until stopCh is closed.
 func (cs *ClientSet) sync() {
 	defer cs.shutdown()
 	backoff := cs.resetBackoff()
 	var duration time.Duration
 	for {
-		if serverCount, err := cs.connectOnce(); err != nil {
+		if err := cs.connectOnce(); err != nil {
 			if dse, ok := err.(*DuplicateServerError); ok {
-				clientsCount := cs.ClientsCount()
-				klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount)
-				if serverCount != 0 && clientsCount >= serverCount {
-					duration = backoff.Step()
-				} else {
-					backoff = cs.resetBackoff()
-					duration = wait.Jitter(backoff.Duration, backoff.Jitter)
-				}
+				klog.V(4).InfoS("duplicate server connection attempt", "serverID", dse.ServerID)
+				// We connected to a server we already have a connection to.
+				// This is expected in syncForever mode. We just wait for the
+				// next sync period to try again. No need for backoff.
+				backoff = cs.resetBackoff()
+				duration = wait.Jitter(backoff.Duration, backoff.Jitter)
 			} else {
+				// A 'real' error, so we backoff.
 				klog.ErrorS(err, "cannot connect once")
 				duration = backoff.Step()
 			}
 		} else {
+			// A successful connection was made, or no new connection was needed.
+			// Reset the backoff and wait for the next sync period.
 			backoff = cs.resetBackoff()
 			duration = wait.Jitter(backoff.Duration, backoff.Jitter)
 		}
+
 		time.Sleep(duration)
 		select {
 		case <-cs.stopCh:
@@ -247,68 +254,28 @@
 	}
 }
 
-func (cs *ClientSet) ServerCount() int {
-
-	var serverCount int
-	var countSourceLabel string
+func (cs *ClientSet) connectOnce() error {
+	serverCount := cs.determineServerCount()
 
-	switch cs.serverCountSource {
-	case "", "default":
-		if cs.leaseCounter != nil {
-			serverCount = cs.leaseCounter.Count()
-			countSourceLabel = fromLeases
-		} else {
-			serverCount = cs.lastReceivedServerCount
-			countSourceLabel = fromResponses
-		}
-	case "max":
-		countFromLeases := 0
-		if cs.leaseCounter != nil {
-			countFromLeases = cs.leaseCounter.Count()
-		}
-		countFromResponses := cs.lastReceivedServerCount
-
-		serverCount = countFromLeases
-		countSourceLabel = fromLeases
-		if countFromResponses > serverCount {
-			serverCount = countFromResponses
-			countSourceLabel = fromResponses
-		}
-		if serverCount == 0 {
-			serverCount = 1
-			countSourceLabel = fromFallback
-		}
-
-	}
-
-	if serverCount != cs.lastServerCount {
-		klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
-		cs.lastServerCount = serverCount
+	// If not in syncForever mode, we only connect if we have fewer connections than the server count.
+	if !cs.syncForever && cs.ClientsCount() >= serverCount && serverCount > 0 {
+		return nil // Nothing to do.
 	}
 
-	metrics.Metrics.SetServerCount(serverCount)
-	return serverCount
-}
-
-func (cs *ClientSet) connectOnce() (int, error) {
-	serverCount := cs.ServerCount()
-
-	if !cs.syncForever && serverCount != 0 && cs.ClientsCount() >= serverCount {
-		return serverCount, nil
-	}
+	// In syncForever mode, we always try to connect, to discover new servers.
 	c, receivedServerCount, err := cs.newAgentClient()
 	if err != nil {
-		return serverCount, err
+		return err
 	}
+
 	if err := cs.AddClient(c.serverID, c); err != nil {
 		c.Close()
-		return serverCount, err
+		return err // likely *DuplicateServerError
 	}
-	// By moving the update to here, we only accept the server count from a server
-	// that we have successfully added to our active client set, implicitly ignoring
-	// stale data from duplicate connection attempts.
+	// SUCCESS: We connected to a new, unique server.
+	// Only now do we update our view of the server count.
 	cs.lastReceivedServerCount = receivedServerCount
-	klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
+	klog.V(2).InfoS("successfully connected to new proxy server", "serverID", c.serverID, "newServerCount", receivedServerCount)
 
 	labels := runpprof.Labels(
 		"agentIdentifiers", cs.agentIdentifiers,
@@ -316,7 +283,8 @@ func (cs *ClientSet) connectOnce() (int, error) {
 		"serverID", c.serverID,
 	)
 	go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() })
-	return serverCount, nil
+
+	return nil
 }
 
 func (cs *ClientSet) Serve() {
diff --git a/pkg/agent/clientset_test.go b/pkg/agent/clientset_test.go
index ed7c25814..f35254323 100644
--- a/pkg/agent/clientset_test.go
+++ b/pkg/agent/clientset_test.go
@@ -28,62 +28,93 @@ func (f *FakeServerCounter) Count() int {
 	return f.count
 }
 
-func TestServerCount(t *testing.T) {
+func TestAggregateServerCounter(t *testing.T) {
 	testCases := []struct {
-		name              string
-		serverCountSource string
-		leaseCounter      ServerCounter
-		responseCount     int
-		want              int
+		name            string
+		source          string
+		leaseCounter    ServerCounter
+		responseCounter ServerCounter
+		want            int
 	}{
 		{
-			name:              "higher from response",
-			serverCountSource: "max",
-			responseCount:     42,
-			leaseCounter:      &FakeServerCounter{24},
-			want:              42,
+			name:            "max: higher from response",
+			source:          "max",
+			leaseCounter:    &FakeServerCounter{count: 24},
+			responseCounter: &FakeServerCounter{count: 42},
+			want:            42,
 		},
 		{
-			name:              "higher from leases",
-			serverCountSource: "max",
-			responseCount:     3,
-			leaseCounter:      &FakeServerCounter{6},
-			want:              6,
+			name:            "max: higher from leases",
+			source:          "max",
+			leaseCounter:    &FakeServerCounter{count: 6},
+			responseCounter: &FakeServerCounter{count: 3},
+			want:            6,
 		},
 		{
-			name:              "both zero",
-			serverCountSource: "max",
-			responseCount:     0,
-			leaseCounter:      &FakeServerCounter{0},
-			want:              1,
+			name:            "max: both zero",
+			source:          "max",
+			leaseCounter:    &FakeServerCounter{count: 0},
+			responseCounter: &FakeServerCounter{count: 0},
+			want:            1, // fallback
+		},
+		{
+			name:            "default: lease counter is nil",
+			source:          "default",
+			leaseCounter:    nil,
+			responseCounter: &FakeServerCounter{count: 3},
+			want:            3,
 		},
 		{
-			name:              "response picked by default when no lease counter",
-			responseCount:     3,
-			leaseCounter:      nil,
-			want:              3,
+			name:            "default: lease counter is present",
+			source:          "default",
+			leaseCounter:    &FakeServerCounter{count: 3},
+			responseCounter: &FakeServerCounter{count: 6},
+			want:            3, // lease count is preferred
 		},
 		{
-			name:              "lease counter always picked when present",
-			serverCountSource: "default",
-			responseCount:     6,
-			leaseCounter:      &FakeServerCounter{3},
-			want:              3,
+			name:            "default: lease count is zero",
+			source:          "default",
+			leaseCounter:    &FakeServerCounter{count: 0},
+			responseCounter: &FakeServerCounter{count: 6},
+			want:            1, // fallback
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-
-			cs := &ClientSet{
-				clients:           make(map[string]*Client),
-				leaseCounter:      tc.leaseCounter,
-				serverCountSource: tc.serverCountSource,
-			}
-			cs.lastReceivedServerCount = tc.responseCount
-			if got := cs.ServerCount(); got != tc.want {
-				t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
+			agg := NewAggregateServerCounter(tc.leaseCounter, tc.responseCounter, tc.source)
+			if got := agg.Count(); got != tc.want {
+				t.Errorf("agg.Count() = %v, want: %v", got, tc.want)
 			}
 		})
 	}
+}
+
+func TestResponseBasedCounter(t *testing.T) {
+	testCases := []struct {
+		name          string
+		responseCount int
+		want          int
+	}{
+		{
+			name:          "non-zero count",
+			responseCount: 5,
+			want:          5,
+		},
+		{
+			name:          "zero count",
+			responseCount: 0,
+			want:          1, // fallback
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			cs := &ClientSet{lastReceivedServerCount: tc.responseCount}
+			rbc := NewResponseBasedCounter(cs)
+			if got := rbc.Count(); got != tc.want {
+				t.Errorf("rbc.Count() = %v, want: %v", got, tc.want)
+			}
+		})
+	}
 }
diff --git a/pkg/agent/lease_counter.go b/pkg/agent/lease_counter.go
index cdc74ce70..4175d4ffe 100644
--- a/pkg/agent/lease_counter.go
+++ b/pkg/agent/lease_counter.go
@@ -36,10 +36,6 @@ import (
 	coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
 )
 
-type ServerCounter interface {
-	Count() int
-}
-
 // A ServerLeaseCounter counts leases in the k8s apiserver to determine the
 // current proxy server count.
 type ServerLeaseCounter struct {
diff --git a/pkg/agent/server_counter.go b/pkg/agent/server_counter.go
new file mode 100644
index 000000000..3fcec7266
--- /dev/null
+++ b/pkg/agent/server_counter.go
@@ -0,0 +1,83 @@
+package agent
+
+import "k8s.io/klog/v2"
+
+// ServerCounter is the interface for determining the server count.
+type ServerCounter interface {
+	Count() int
+}
+
+// responseBasedCounter determines server count based on server responses only.
+type responseBasedCounter struct {
+	cs *ClientSet
+}
+
+// NewResponseBasedCounter creates a new responseBasedCounter.
+func NewResponseBasedCounter(cs *ClientSet) ServerCounter {
+	return &responseBasedCounter{cs: cs}
+}
+
+func (rbc *responseBasedCounter) Count() int {
+	serverCount := rbc.cs.lastReceivedServerCount
+	if serverCount == 0 {
+		return 1 // Fallback
+	}
+	return serverCount
+}
+
+// aggregateServerCounter determines server count based on leases and/or server responses.
+type aggregateServerCounter struct {
+	leaseCounter    ServerCounter // may be nil
+	responseCounter ServerCounter
+	source          string
+}
+
+// NewAggregateServerCounter creates a new aggregateServerCounter.
+func NewAggregateServerCounter(leaseCounter, responseCounter ServerCounter, source string) ServerCounter {
+	return &aggregateServerCounter{
+		leaseCounter:    leaseCounter,
+		responseCounter: responseCounter,
+		source:          source,
+	}
+}
+
+func (asc *aggregateServerCounter) Count() int {
+	var serverCount int
+	var countSourceLabel string
+
+	countFromResponses := asc.responseCounter.Count()
+
+	// If lease counting is not enabled, we can only use responses.
+	if asc.leaseCounter == nil {
+		serverCount = countFromResponses
+		countSourceLabel = fromResponses
+		if serverCount == 0 {
+			countSourceLabel = fromFallback
+		}
+		klog.V(4).InfoS("Determined server count", "serverCount", serverCount, "source", countSourceLabel)
+		return serverCount
+	}
+
+	countFromLeases := asc.leaseCounter.Count()
+
+	switch asc.source {
+	case "max":
+		serverCount = countFromLeases
+		countSourceLabel = fromLeases
+		if countFromResponses > serverCount {
+			serverCount = countFromResponses
+			countSourceLabel = fromResponses
+		}
+	default: // "default" or empty string
+		serverCount = countFromLeases
+		countSourceLabel = fromLeases
+	}
+
+	if serverCount == 0 {
+		serverCount = 1
+		countSourceLabel = fromFallback
+	}
+
+	klog.V(4).InfoS("Determined server count", "serverCount", serverCount, "source", countSourceLabel)
+	return serverCount
+}