Skip to content

Commit 84c137f

Browse files
committed
Server lease counter refactor
1 parent 20e5bf9 commit 84c137f

File tree

6 files changed

+100
-105
lines changed

6 files changed

+100
-105
lines changed

cmd/agent/app/server.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,11 @@ import (
3838
"google.golang.org/grpc"
3939
"google.golang.org/grpc/credentials"
4040
"google.golang.org/grpc/keepalive"
41+
"k8s.io/apimachinery/pkg/labels"
4142
"k8s.io/client-go/informers"
4243
"k8s.io/client-go/kubernetes"
4344
"k8s.io/client-go/tools/clientcmd"
4445
"k8s.io/klog/v2"
45-
"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"
46-
4746
"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
4847
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
4948
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
@@ -138,7 +137,6 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
138137
}
139138
cc := o.ClientSetConfig(dialOptions...)
140139

141-
var serverCounter servercounter.ServerCounter
142140
if o.ServerCountLeaseSelector != "" {
143141
var k8sClient *kubernetes.Clientset
144142
config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
@@ -150,21 +148,22 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
150148
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
151149
}
152150
sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, o.InformerResync)
153-
serverLeaseCounter, err := servercounter.NewServerLeaseCounter(
151+
serverLeaseSelector, err := labels.Parse(o.ServerCountLeaseSelector)
152+
if err != nil {
153+
return nil, fmt.Errorf("invalid server count lease selector: %w", err)
154+
}
155+
serverLeaseCounter := agent.NewServerLeaseCounter(
154156
sharedInformerFactory.Coordination().V1().Leases().Lister(),
155-
o.ServerCountLeaseSelector,
156-
int(o.ServerCount),
157+
serverLeaseSelector,
157158
)
158159
if err != nil {
159160
return nil, fmt.Errorf("failed to create server lease counter: %w", err)
160161
}
161-
serverCounter = serverLeaseCounter
162+
cc.ServerLeaseCounter = serverLeaseCounter
162163
sharedInformerFactory.Start(context.Background().Done())
163-
} else {
164-
serverCounter = servercounter.StaticServerCounter(o.ServerCount)
165164
}
166165

167-
cs := cc.NewAgentClientSet(drainCh, stopCh, serverCounter)
166+
cs := cc.NewAgentClientSet(drainCh, stopCh)
168167
cs.Serve()
169168

170169
return cs, nil

pkg/agent/clientset.go

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"k8s.io/apimachinery/pkg/util/wait"
2929
"k8s.io/klog/v2"
3030
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
31-
"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"
3231
)
3332

3433
// ClientSet consists of clients connected to each instance of an HA proxy server.
@@ -37,9 +36,12 @@ type ClientSet struct {
3736
clients map[string]*Client // map between serverID and the client
3837
// connects to this server.
3938

40-
agentID string // ID of this agent
41-
address string // proxy server address. Assuming HA proxy server
42-
serverCounter servercounter.ServerCounter // counts number of proxy servers
39+
agentID string // ID of this agent
40+
address string // proxy server address. Assuming HA proxy server
41+
42+
serverCounter *ServerLeaseCounter // counts number of proxy server leases
43+
lastReceivedServerCount int // last server count received from a proxy server
44+
4345
// unless it is an HA server. Initialized when the ClientSet creates
4446
// the first client. When syncForever is set, it will be the most recently seen.
4547
syncInterval time.Duration // The interval by which the agent
@@ -146,26 +148,29 @@ type ClientSetConfig struct {
146148
WarnOnChannelLimit bool
147149
SyncForever bool
148150
XfrChannelSize int
151+
ServerLeaseCounter *ServerLeaseCounter
149152
}
150153

151-
func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}, serverCounter servercounter.ServerCounter) *ClientSet {
154+
func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
155+
if cc.SyncForever && cc.ServerLeaseCounter != nil {
156+
klog.Warning("sync forever was enabled even though a server lease counter was provided; this may result in unnecessary connection requests")
157+
}
152158
return &ClientSet{
153-
clients: make(map[string]*Client),
154-
agentID: cc.AgentID,
155-
agentIdentifiers: cc.AgentIdentifiers,
156-
address: cc.Address,
157-
syncInterval: cc.SyncInterval,
158-
probeInterval: cc.ProbeInterval,
159-
syncIntervalCap: cc.SyncIntervalCap,
160-
dialOptions: cc.DialOptions,
161-
serviceAccountTokenPath: cc.ServiceAccountTokenPath,
162-
warnOnChannelLimit: cc.WarnOnChannelLimit,
163-
syncForever: cc.SyncForever,
164-
drainCh: drainCh,
165-
xfrChannelSize: cc.XfrChannelSize,
166-
stopCh: stopCh,
167-
respectReceivedServerCount: false,
168-
serverCounter: serverCounter,
159+
clients: make(map[string]*Client),
160+
agentID: cc.AgentID,
161+
agentIdentifiers: cc.AgentIdentifiers,
162+
address: cc.Address,
163+
syncInterval: cc.SyncInterval,
164+
probeInterval: cc.ProbeInterval,
165+
syncIntervalCap: cc.SyncIntervalCap,
166+
dialOptions: cc.DialOptions,
167+
serviceAccountTokenPath: cc.ServiceAccountTokenPath,
168+
warnOnChannelLimit: cc.WarnOnChannelLimit,
169+
syncForever: cc.SyncForever,
170+
drainCh: drainCh,
171+
xfrChannelSize: cc.XfrChannelSize,
172+
stopCh: stopCh,
173+
serverCounter: cc.ServerLeaseCounter,
169174
}
170175
}
171176

@@ -192,7 +197,7 @@ func (cs *ClientSet) sync() {
192197
if err := cs.connectOnce(); err != nil {
193198
if dse, ok := err.(*DuplicateServerError); ok {
194199
clientsCount := cs.ClientsCount()
195-
serverCount := cs.serverCounter.CountServers()
200+
serverCount := cs.ServerCount()
196201
klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount)
197202
if serverCount != 0 && clientsCount >= serverCount {
198203
duration = backoff.Step()
@@ -217,25 +222,32 @@ func (cs *ClientSet) sync() {
217222
}
218223
}
219224

225+
func (cs *ClientSet) ServerCount() int {
226+
var serverCount int
227+
if cs.serverCounter != nil {
228+
serverCount = cs.serverCounter.Count()
229+
if serverCount == 0 {
230+
klog.Warningf("server lease counter could not find any leases")
231+
}
232+
} else {
233+
serverCount = cs.lastReceivedServerCount
234+
}
235+
236+
metrics.Metrics.SetServerCount(serverCount)
237+
return serverCount
238+
}
239+
220240
func (cs *ClientSet) connectOnce() error {
221-
agentServerCount := cs.serverCounter.CountServers()
222-
if !cs.syncForever && agentServerCount != 0 && cs.ClientsCount() >= agentServerCount {
241+
serverCount := cs.ServerCount()
242+
243+
if !cs.syncForever && serverCount != 0 && cs.ClientsCount() >= serverCount {
223244
return nil
224245
}
225-
c, newServerCount, err := cs.newAgentClient()
246+
c, receivedServerCount, err := cs.newAgentClient()
226247
if err != nil {
227248
return err
228249
}
229-
if agentServerCount != 0 && agentServerCount != newServerCount {
230-
klog.V(2).InfoS("Server count change suggestion by server",
231-
"current", agentServerCount, "serverID", c.serverID, "actual", newServerCount)
232-
if cs.respectReceivedServerCount {
233-
cs.serverCounter = servercounter.StaticServerCounter(newServerCount)
234-
klog.V(2).Infof("respecting server count change suggestion, new count: %v", newServerCount)
235-
} else {
236-
klog.V(2).Infof("ignoring server count change suggestion of %v", newServerCount)
237-
}
238-
}
250+
cs.lastReceivedServerCount = receivedServerCount
239251
if err := cs.AddClient(c.serverID, c); err != nil {
240252
c.Close()
241253
return err

pkg/servercounter/lease_counter.go renamed to pkg/agent/lease_counter.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
package servercounter
1+
package agent
22

33
import (
4-
"fmt"
54
"time"
65

76
"k8s.io/apimachinery/pkg/labels"
@@ -22,26 +21,22 @@ type ServerLeaseCounter struct {
2221
}
2322

2423
// NewServerLeaseCounter creates a server counter that counts valid leases that match the label
25-
// selector and provides the fallback count if this fails.
26-
func NewServerLeaseCounter(lister coordinationv1listers.LeaseLister, labelSelector string, fallbackCount int) (*ServerLeaseCounter, error) {
27-
selector, err := labels.Parse(labelSelector)
28-
if err != nil {
29-
return nil, fmt.Errorf("could not parse label selector %v: %w", labelSelector, err)
30-
}
24+
// selector and provides the fallback count (initially 0) if this fails.
25+
func NewServerLeaseCounter(lister coordinationv1listers.LeaseLister, labelSelector labels.Selector) *ServerLeaseCounter {
3126
return &ServerLeaseCounter{
3227
lister: lister,
33-
selector: selector,
34-
fallbackCount: fallbackCount,
35-
}, nil
28+
selector: labelSelector,
29+
fallbackCount: 0,
30+
}
3631
}
3732

38-
// CountServers counts the number of leases in the apiserver matching the provided
33+
// Count counts the number of leases in the apiserver matching the provided
3934
// label selector.
4035
//
4136
// In the event that no valid leases are found or lease listing fails, the
4237
// fallback count is returned. This fallback count is updated upon successful
4338
// discovery of valid leases.
44-
func (lc *ServerLeaseCounter) CountServers() int {
39+
func (lc *ServerLeaseCounter) Count() int {
4540
// Since the number of proxy servers is generally small (1-10), we opted against
4641
// using a LIST and WATCH pattern and instead list all leases in the informer.
4742
// The informer still uses LIST and WATCH under the hood, so this doesn't result

pkg/servercounter/lease_counter_test.go renamed to pkg/agent/lease_counter_test.go

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package servercounter
1+
package agent
22

33
import (
44
"fmt"
@@ -160,18 +160,16 @@ func TestServerLeaseCounter(t *testing.T) {
160160
leaseListerError error
161161

162162
labelSelector string
163-
fallbackCount int
164163

165164
want int
166165
}{
167166
{
168-
name: "returns fallback count when no leases exist",
167+
name: "returns fallback count (0) when no leases exist",
169168
templates: []leaseTemplate{},
170169
labelSelector: "label=value",
171-
fallbackCount: 999,
172-
want: 999,
170+
want: 0,
173171
}, {
174-
name: "returns fallback count when no leases matching selector exist",
172+
name: "returns fallback count (0) when no leases matching selector exist",
175173
templates: []leaseTemplate{
176174
{
177175
durationSecs: 1000,
@@ -185,10 +183,9 @@ func TestServerLeaseCounter(t *testing.T) {
185183
},
186184
},
187185
labelSelector: "label=value",
188-
fallbackCount: 999,
189-
want: 999,
186+
want: 0,
190187
}, {
191-
name: "returns fallback count when no leases matching selector are still valid",
188+
name: "returns fallback count (0) when no leases matching selector are still valid",
192189
templates: []leaseTemplate{
193190
{
194191
durationSecs: 1000,
@@ -202,15 +199,13 @@ func TestServerLeaseCounter(t *testing.T) {
202199
},
203200
},
204201
labelSelector: "label=value",
205-
fallbackCount: 999,
206-
want: 999,
202+
want: 0,
207203
}, {
208-
name: "returns fallbackCount when LeaseLister returns an error",
204+
name: "returns fallback count (0) when LeaseLister returns an error",
209205
templates: []leaseTemplate{},
210206
labelSelector: "label=value",
211-
fallbackCount: 999,
212207
leaseListerError: fmt.Errorf("test error"),
213-
want: 999,
208+
want: 0,
214209
}, {
215210
name: "counts only valid leases matching label selector",
216211
templates: []leaseTemplate{
@@ -231,7 +226,6 @@ func TestServerLeaseCounter(t *testing.T) {
231226
},
232227
},
233228
labelSelector: "label=value",
234-
fallbackCount: 999,
235229
want: 2,
236230
},
237231
}
@@ -250,12 +244,10 @@ func TestServerLeaseCounter(t *testing.T) {
250244
}
251245
ct.Advance(time.Millisecond)
252246

253-
counter, err := NewServerLeaseCounter(lister, tc.labelSelector, tc.fallbackCount)
254-
if err != nil {
255-
t.Fatalf("server counter creation failed: %v", err)
256-
}
247+
selector, _ := labels.Parse(tc.labelSelector)
248+
counter := NewServerLeaseCounter(lister, selector)
257249

258-
got := counter.CountServers()
250+
got := counter.Count()
259251
if tc.want != got {
260252
t.Errorf("incorrect server count (got: %v, want: %v)", got, tc.want)
261253
}
@@ -286,30 +278,27 @@ func TestServerLeaseCounter_FallbackCount(t *testing.T) {
286278
err: fmt.Errorf("dummy lister error"),
287279
}
288280

289-
initialFallback := 999
290-
counter, err := NewServerLeaseCounter(lister, "label=value", initialFallback)
291-
if err != nil {
292-
t.Fatalf("server counter creation failed: %v", err)
293-
}
281+
selector, _ := labels.Parse("label=value")
282+
counter := NewServerLeaseCounter(lister, selector)
294283

295-
// First call should return fallback count of 999 because of lister error.
296-
got := counter.CountServers()
297-
if got != initialFallback {
298-
t.Errorf("lease counter did not return fallback count on lister error (got: %v, want: %v)", got, initialFallback)
284+
// First call should return fallback count of 0 because of lister error.
285+
got := counter.Count()
286+
if got != 0 {
287+
t.Errorf("lease counter did not return fallback count on lister error (got: %v, want: 0", got)
299288
}
300289

301290
// Second call should return the actual count (3) upon lister success.
302291
actualCount := 3
303292
lister.err = nil
304-
got = counter.CountServers()
293+
got = counter.Count()
305294
if got != actualCount {
306295
t.Errorf("lease counter did not return actual count on lister success (got: %v, want: %v)", got, actualCount)
307296
}
308297

309298
// Third call should return updated fallback count (3) upon lister failure.
310299
lister.err = fmt.Errorf("dummy lister error")
311300
lister.leases = append(lister.leases, newLeaseFromTemplate(validLease)) // Change actual count just in case.
312-
got = counter.CountServers()
301+
got = counter.Count()
313302
if got != actualCount {
314303
t.Errorf("lease counter did not update fallback count after lister success, returned incorrect count on subsequent lister error (got: %v, want: %v)", got, actualCount)
315304
}

pkg/agent/metrics/metrics.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type AgentMetrics struct {
5353
serverFailures *prometheus.CounterVec
5454
dialFailures *prometheus.CounterVec
5555
serverConnections *prometheus.GaugeVec
56+
serverCount *prometheus.GaugeVec
5657
endpointConnections *prometheus.GaugeVec
5758
streamPackets *prometheus.CounterVec
5859
streamErrors *prometheus.CounterVec
@@ -97,6 +98,15 @@ func newAgentMetrics() *AgentMetrics {
9798
},
9899
[]string{},
99100
)
101+
serverCount := prometheus.NewGaugeVec(
102+
prometheus.GaugeOpts{
103+
Namespace: Namespace,
104+
Subsystem: Subsystem,
105+
Name: "known_server_count",
106+
Help: "Current number of servers agent is trying to connect to.",
107+
},
108+
[]string{},
109+
)
100110
endpointConnections := prometheus.NewGaugeVec(
101111
prometheus.GaugeOpts{
102112
Namespace: Namespace,
@@ -115,6 +125,7 @@ func newAgentMetrics() *AgentMetrics {
115125
prometheus.MustRegister(endpointConnections)
116126
prometheus.MustRegister(streamPackets)
117127
prometheus.MustRegister(streamErrors)
128+
prometheus.MustRegister(serverCount)
118129
return &AgentMetrics{
119130
dialLatencies: dialLatencies,
120131
serverFailures: serverFailures,
@@ -123,6 +134,7 @@ func newAgentMetrics() *AgentMetrics {
123134
endpointConnections: endpointConnections,
124135
streamPackets: streamPackets,
125136
streamErrors: streamErrors,
137+
serverCount: serverCount,
126138
}
127139

128140
}
@@ -165,6 +177,10 @@ func (a *AgentMetrics) SetServerConnectionsCount(count int) {
165177
a.serverConnections.WithLabelValues().Set(float64(count))
166178
}
167179

180+
func (a *AgentMetrics) SetServerCount(count int) {
181+
a.serverCount.WithLabelValues().Set(float64(count))
182+
}
183+
168184
// EndpointConnectionInc increments a new endpoint connection.
169185
func (a *AgentMetrics) EndpointConnectionInc() {
170186
a.endpointConnections.WithLabelValues().Inc()

0 commit comments

Comments
 (0)