Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/agent/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
}
cc := o.ClientSetConfig(dialOptions...)

var leaseCounter agent.ServerCounter
if o.CountServerLeases {
var config *rest.Config
if o.KubeconfigPath != "" {
Expand All @@ -167,15 +168,20 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
serverLeaseSelector, _ := labels.Parse(o.LeaseLabel)
serverLeaseCounter := agent.NewServerLeaseCounter(
leaseCounter = agent.NewServerLeaseCounter(
clock.RealClock{},
leaseLister,
serverLeaseSelector,
)
cc.ServerLeaseCounter = serverLeaseCounter
}

cs := cc.NewAgentClientSet(drainCh, stopCh)
// Always create the response-based counter.
responseCounter := agent.NewResponseBasedCounter(cs)
// Create the aggregate counter which acts as the master strategy.
aggregateCounter := agent.NewAggregateServerCounter(leaseCounter, responseCounter, o.ServerCountSource)
cs.SetServerCounter(aggregateCounter)

cs.Serve()

return cs, nil
Expand Down
114 changes: 41 additions & 73 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,12 @@ type ClientSet struct {
// Address is the proxy server address. Assuming HA proxy server
address string

// leaseCounter counts number of proxy server leases
leaseCounter ServerCounter
// serverCounter counts number of proxy server leases
serverCounter ServerCounter

// lastReceivedServerCount is the last serverCount value received when connecting to a proxy server
lastReceivedServerCount int

// lastServerCount is the most-recently observed serverCount value from either lease system or proxy server,
// former takes priority unless it is an HA server.
// Initialized when the ClientSet creates the first client.
// When syncForever is set, it will be the most recently seen.
lastServerCount int

// syncInterval is the interval at which the agent periodically checks
// that it has connections to all instances of the proxy server.
syncInterval time.Duration
Expand Down Expand Up @@ -108,6 +102,11 @@ func (cs *ClientSet) ClientsCount() int {
return len(cs.clients)
}

// SetServerCounter sets the strategy for determining the server count.
func (cs *ClientSet) SetServerCounter(counter ServerCounter) {
cs.serverCounter = counter
}

func (cs *ClientSet) HealthyClientsCount() int {
cs.mu.Lock()
defer cs.mu.Unlock()
Expand Down Expand Up @@ -175,7 +174,6 @@ type ClientSetConfig struct {
WarnOnChannelLimit bool
SyncForever bool
XfrChannelSize int
ServerLeaseCounter ServerCounter
ServerCountSource string
}

Expand All @@ -195,7 +193,6 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
drainCh: drainCh,
xfrChannelSize: cc.XfrChannelSize,
stopCh: stopCh,
leaseCounter: cc.ServerLeaseCounter,
serverCountSource: cc.ServerCountSource,
}
}
Expand All @@ -214,30 +211,40 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff {
}
}

// sync makes sure that #clients >= #proxy servers
// determineServerCount determines the number of proxy servers by delegating to its configured counter strategy.
func (cs *ClientSet) determineServerCount() int {
serverCount := cs.serverCounter.Count()
metrics.Metrics.SetServerCount(serverCount)
return serverCount
}

// sync manages the backoff and the connection attempts to the proxy server.
// sync runs until stopCh is closed
func (cs *ClientSet) sync() {
defer cs.shutdown()
backoff := cs.resetBackoff()
var duration time.Duration
for {
if serverCount, err := cs.connectOnce(); err != nil {
if err := cs.connectOnce(); err != nil {
if dse, ok := err.(*DuplicateServerError); ok {
clientsCount := cs.ClientsCount()
klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount)
if serverCount != 0 && clientsCount >= serverCount {
duration = backoff.Step()
} else {
backoff = cs.resetBackoff()
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
}
klog.V(4).InfoS("duplicate server connection attempt", "serverID", dse.ServerID)
// We connected to a server we already have a connection to.
// This is expected in syncForever mode. We just wait for the
// next sync period to try again. No need for backoff.
backoff = cs.resetBackoff()
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
} else {
// A 'real' error, so we backoff.
klog.ErrorS(err, "cannot connect once")
duration = backoff.Step()
}
} else {
// A successful connection was made, or no new connection was needed.
// Reset the backoff and wait for the next sync period.
backoff = cs.resetBackoff()
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
}

time.Sleep(duration)
select {
case <-cs.stopCh:
Expand All @@ -247,76 +254,37 @@ func (cs *ClientSet) sync() {
}
}

func (cs *ClientSet) ServerCount() int {

var serverCount int
var countSourceLabel string
func (cs *ClientSet) connectOnce() error {
serverCount := cs.determineServerCount()

switch cs.serverCountSource {
case "", "default":
if cs.leaseCounter != nil {
serverCount = cs.leaseCounter.Count()
countSourceLabel = fromLeases
} else {
serverCount = cs.lastReceivedServerCount
countSourceLabel = fromResponses
}
case "max":
countFromLeases := 0
if cs.leaseCounter != nil {
countFromLeases = cs.leaseCounter.Count()
}
countFromResponses := cs.lastReceivedServerCount

serverCount = countFromLeases
countSourceLabel = fromLeases
if countFromResponses > serverCount {
serverCount = countFromResponses
countSourceLabel = fromResponses
}
if serverCount == 0 {
serverCount = 1
countSourceLabel = fromFallback
}

}

if serverCount != cs.lastServerCount {
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
cs.lastServerCount = serverCount
// If not in syncForever mode, we only connect if we have fewer connections than the server count.
if !cs.syncForever && cs.ClientsCount() >= serverCount && serverCount > 0 {
return nil // Nothing to do.
}

metrics.Metrics.SetServerCount(serverCount)
return serverCount
}

func (cs *ClientSet) connectOnce() (int, error) {
serverCount := cs.ServerCount()

if !cs.syncForever && serverCount != 0 && cs.ClientsCount() >= serverCount {
return serverCount, nil
}
// In syncForever mode, we always try to connect, to discover new servers.
c, receivedServerCount, err := cs.newAgentClient()
if err != nil {
return serverCount, err
return err
}

if err := cs.AddClient(c.serverID, c); err != nil {
c.Close()
return serverCount, err
return err // likely *DuplicateServerError
}
// By moving the update to here, we only accept the server count from a server
// that we have successfully added to our active client set, implicitly ignoring
// stale data from duplicate connection attempts.
// SUCCESS: We connected to a new, unique server.
// Only now do we update our view of the server count.
cs.lastReceivedServerCount = receivedServerCount
klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
klog.V(2).InfoS("successfully connected to new proxy server", "serverID", c.serverID, "newServerCount", receivedServerCount)

labels := runpprof.Labels(
"agentIdentifiers", cs.agentIdentifiers,
"serverAddress", cs.address,
"serverID", c.serverID,
)
go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() })
return serverCount, nil

return nil
}

func (cs *ClientSet) Serve() {
Expand Down
110 changes: 69 additions & 41 deletions pkg/agent/clientset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,65 +28,93 @@ func (f *FakeServerCounter) Count() int {
return f.count
}

func TestServerCount(t *testing.T) {
func TestAggregateServerCounter(t *testing.T) {
testCases := []struct {
name string
serverCountSource string
leaseCounter ServerCounter
responseCount int
want int
name string
source string
leaseCounter ServerCounter
responseCounter ServerCounter
want int
}{
{
name: "higher from response",
serverCountSource: "max",
responseCount: 42,
leaseCounter: &FakeServerCounter{24},
want: 42,
name: "max: higher from response",
source: "max",
leaseCounter: &FakeServerCounter{count: 24},
responseCounter: &FakeServerCounter{count: 42},
want: 42,
},
{
name: "higher from leases",
serverCountSource: "max",
responseCount: 3,
leaseCounter: &FakeServerCounter{6},
want: 6,
name: "max: higher from leases",
source: "max",
leaseCounter: &FakeServerCounter{count: 6},
responseCounter: &FakeServerCounter{count: 3},
want: 6,
},
{
name: "both zero",
serverCountSource: "max",
responseCount: 0,
leaseCounter: &FakeServerCounter{0},
want: 1,
name: "max: both zero",
source: "max",
leaseCounter: &FakeServerCounter{count: 0},
responseCounter: &FakeServerCounter{count: 0},
want: 1, // fallback
},
{
name: "default: lease counter is nil",
source: "default",
leaseCounter: nil,
responseCounter: &FakeServerCounter{count: 3},
want: 3,
},

{
name: "response picked by default when no lease counter",
serverCountSource: "default",
responseCount: 3,
leaseCounter: nil,
want: 3,
name: "default: lease counter is present",
source: "default",
leaseCounter: &FakeServerCounter{count: 3},
responseCounter: &FakeServerCounter{count: 6},
want: 3, // lease count is preferred
},
{
name: "lease counter always picked when present",
serverCountSource: "default",
responseCount: 6,
leaseCounter: &FakeServerCounter{3},
want: 3,
name: "default: lease count is zero",
source: "default",
leaseCounter: &FakeServerCounter{count: 0},
responseCounter: &FakeServerCounter{count: 6},
want: 1, // fallback
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

cs := &ClientSet{
clients: make(map[string]*Client),
leaseCounter: tc.leaseCounter,
serverCountSource: tc.serverCountSource,
}
cs.lastReceivedServerCount = tc.responseCount
if got := cs.ServerCount(); got != tc.want {
t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
agg := NewAggregateServerCounter(tc.leaseCounter, tc.responseCounter, tc.source)
if got := agg.Count(); got != tc.want {
t.Errorf("agg.Count() = %v, want: %v", got, tc.want)
}
})
}
}

func TestResponseBasedCounter(t *testing.T) {
testCases := []struct {
name string
responseCount int
want int
}{
{
name: "non-zero count",
responseCount: 5,
want: 5,
},
{
name: "zero count",
responseCount: 0,
want: 1, // fallback
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cs := &ClientSet{lastReceivedServerCount: tc.responseCount}
rbc := NewResponseBasedCounter(cs)
if got := rbc.Count(); got != tc.want {
t.Errorf("rbc.Count() = %v, want: %v", got, tc.want)
}
})
}
}
4 changes: 0 additions & 4 deletions pkg/agent/lease_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ import (
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
)

type ServerCounter interface {
Count() int
}

// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
// current proxy server count.
type ServerLeaseCounter struct {
Expand Down
Loading