Skip to content

Commit 7a6b3f9

Browse files
committed
Refactor to use k8s client directly instead of an informer
1 parent 00ced05 commit 7a6b3f9

File tree

4 files changed

+58
-49
lines changed

4 files changed

+58
-49
lines changed

cmd/agent/app/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
153153
return nil, fmt.Errorf("invalid server count lease selector: %w", err)
154154
}
155155
serverLeaseCounter := agent.NewServerLeaseCounter(
156-
sharedInformerFactory.Coordination().V1().Leases().Lister(),
156+
sharedInformerFactory,
157157
serverLeaseSelector,
158158
)
159159
if err != nil {

pkg/agent/clientset.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,15 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff {
184184
}
185185

186186
// sync makes sure that #clients >= #proxy servers
187-
func (cs *ClientSet) sync() {
187+
func (cs *ClientSet) sync(ctx context.Context) {
188188
defer cs.shutdown()
189189
backoff := cs.resetBackoff()
190190
var duration time.Duration
191191
for {
192-
if err := cs.connectOnce(); err != nil {
192+
if err := cs.connectOnce(ctx); err != nil {
193193
if dse, ok := err.(*DuplicateServerError); ok {
194194
clientsCount := cs.ClientsCount()
195-
serverCount := cs.ServerCount()
195+
serverCount := cs.ServerCount(ctx)
196196
klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount)
197197
if serverCount != 0 && clientsCount >= serverCount {
198198
duration = backoff.Step()
@@ -217,10 +217,10 @@ func (cs *ClientSet) sync() {
217217
}
218218
}
219219

220-
func (cs *ClientSet) ServerCount() int {
220+
func (cs *ClientSet) ServerCount(ctx context.Context) int {
221221
var serverCount int
222222
if cs.serverCounter != nil {
223-
serverCount = cs.serverCounter.Count()
223+
serverCount = cs.serverCounter.Count(ctx)
224224
if serverCount == 0 {
225225
klog.Warningf("server lease counter could not find any leases")
226226
}
@@ -232,8 +232,8 @@ func (cs *ClientSet) ServerCount() int {
232232
return serverCount
233233
}
234234

235-
func (cs *ClientSet) connectOnce() error {
236-
serverCount := cs.ServerCount()
235+
func (cs *ClientSet) connectOnce(ctx context.Context) error {
236+
serverCount := cs.ServerCount(ctx)
237237

238238
if !cs.syncForever && serverCount != 0 && cs.ClientsCount() >= serverCount {
239239
return nil
@@ -263,7 +263,7 @@ func (cs *ClientSet) Serve() {
263263
"agentIdentifiers", cs.agentIdentifiers,
264264
"serverAddress", cs.address,
265265
)
266-
go runpprof.Do(context.Background(), labels, func(context.Context) { cs.sync() })
266+
go runpprof.Do(context.Background(), labels, func(ctx context.Context) { cs.sync(ctx) })
267267
}
268268

269269
func (cs *ClientSet) shutdown() {

pkg/agent/lease_counter.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,33 @@
11
package agent
22

33
import (
4+
"context"
45
"time"
56

67
"k8s.io/apimachinery/pkg/labels"
8+
"k8s.io/client-go/kubernetes"
79
"k8s.io/klog/v2"
810

911
coordinationv1api "k8s.io/api/coordination/v1"
10-
coordinationv1listers "k8s.io/client-go/listers/coordination/v1"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
1114
)
1215

1316
var timeNow = time.Now
1417

1518
// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
1619
// current proxy server count.
1720
type ServerLeaseCounter struct {
18-
lister coordinationv1listers.LeaseLister
21+
leaseClient coordinationv1.LeaseInterface
1922
selector labels.Selector
2023
fallbackCount int
2124
}
2225

2326
// NewServerLeaseCounter creates a server counter that counts valid leases that match the label
2427
// selector and provides the fallback count (initially 0) if this fails.
25-
func NewServerLeaseCounter(lister coordinationv1listers.LeaseLister, labelSelector labels.Selector) *ServerLeaseCounter {
28+
func NewServerLeaseCounter(k8sClient kubernetes.Interface, labelSelector labels.Selector) *ServerLeaseCounter {
2629
return &ServerLeaseCounter{
27-
lister: lister,
30+
leaseClient: k8sClient.CoordinationV1().Leases(""),
2831
selector: labelSelector,
2932
fallbackCount: 0,
3033
}
@@ -36,20 +39,17 @@ func NewServerLeaseCounter(lister coordinationv1listers.LeaseLister, labelSelect
3639
// In the event that no valid leases are found or lease listing fails, the
3740
// fallback count is returned. This fallback count is updated upon successful
3841
// discovery of valid leases.
39-
func (lc *ServerLeaseCounter) Count() int {
42+
func (lc *ServerLeaseCounter) Count(ctx context.Context) int {
4043
// Since the number of proxy servers is generally small (1-10), we opted against
41-
// using a LIST and WATCH pattern and instead list all leases in the informer.
42-
// The informer still uses LIST and WATCH under the hood, so this doesn't result
43-
// in additional calls in the apiserver, and checking whether a lease is valid
44-
// is cheap.
45-
leases, err := lc.lister.List(lc.selector)
44+
// using a LIST and WATCH pattern and instead list all leases on each call.
45+
leases, err := lc.leaseClient.List(ctx, metav1.ListOptions{LabelSelector: lc.selector.String()})
4646
if err != nil {
4747
klog.Errorf("could not list leases to update server count, using fallback count (%v): %v", lc.fallbackCount, err)
4848
return lc.fallbackCount
4949
}
5050

5151
count := 0
52-
for _, lease := range leases {
52+
for _, lease := range leases.Items {
5353
if isLeaseValid(lease) {
5454
count++
5555
} else {
@@ -67,7 +67,7 @@ func (lc *ServerLeaseCounter) Count() int {
6767
return count
6868
}
6969

70-
func isLeaseValid(lease *coordinationv1api.Lease) bool {
70+
func isLeaseValid(lease coordinationv1api.Lease) bool {
7171
var lastRenewTime time.Time
7272
if lease.Spec.RenewTime != nil {
7373
lastRenewTime = lease.Spec.RenewTime.Time

pkg/agent/lease_counter_test.go

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
package agent
22

33
import (
4+
"context"
45
"fmt"
56
"testing"
67
"time"
78

9+
"github.com/google/uuid"
10+
"k8s.io/apimachinery/pkg/labels"
11+
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/client-go/kubernetes/fake"
13+
k8stesting "k8s.io/client-go/testing"
14+
815
coordinationv1 "k8s.io/api/coordination/v1"
916
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10-
"k8s.io/apimachinery/pkg/labels"
1117
coordinationv1listers "k8s.io/client-go/listers/coordination/v1"
1218
)
1319

@@ -34,6 +40,7 @@ func newLeaseFromTemplate(template leaseTemplate) *coordinationv1.Lease {
3440
lease := &coordinationv1.Lease{
3541
TypeMeta: metav1.TypeMeta{},
3642
ObjectMeta: metav1.ObjectMeta{
43+
Name: uuid.New().String(),
3744
Labels: template.labels,
3845
},
3946
Spec: coordinationv1.LeaseSpec{},
@@ -103,7 +110,7 @@ func TestIsLeaseValid(t *testing.T) {
103110
t.Run(tc.name, func(t *testing.T) {
104111
lease := newLeaseFromTemplate(tc.template)
105112

106-
got := isLeaseValid(lease)
113+
got := isLeaseValid(*lease)
107114
if got != tc.want {
108115
t.Errorf("incorrect lease validity (got: %v, want: %v)", got, tc.want)
109116
}
@@ -234,20 +241,18 @@ func TestServerLeaseCounter(t *testing.T) {
234241
t.Run(tc.name, func(t *testing.T) {
235242
ct := &controlledTime{t: time.Unix(10000000, 0)}
236243
timeNow = ct.Now
237-
leases := make([]*coordinationv1.Lease, len(tc.templates))
244+
leases := make([]runtime.Object, len(tc.templates))
238245
for i, template := range tc.templates {
239246
leases[i] = newLeaseFromTemplate(template)
240247
}
241-
lister := &fakeLeaseLister{
242-
leases: leases,
243-
err: tc.leaseListerError,
244-
}
245248
ct.Advance(time.Millisecond)
246249

250+
k8sClient := fake.NewSimpleClientset(leases...)
247251
selector, _ := labels.Parse(tc.labelSelector)
248-
counter := NewServerLeaseCounter(lister, selector)
249252

250-
got := counter.Count()
253+
counter := NewServerLeaseCounter(k8sClient, selector)
254+
255+
got := counter.Count(context.Background())
251256
if tc.want != got {
252257
t.Errorf("incorrect server count (got: %v, want: %v)", got, tc.want)
253258
}
@@ -269,37 +274,41 @@ func TestServerLeaseCounter_FallbackCount(t *testing.T) {
269274

270275
ct := &controlledTime{t: time.Unix(1000, 0)}
271276
timeNow = ct.Now
272-
leases := []*coordinationv1.Lease{}
273-
leases = append(leases, newLeaseFromTemplate(validLease), newLeaseFromTemplate(validLease), newLeaseFromTemplate(validLease), newLeaseFromTemplate(invalidLease))
277+
leases := []runtime.Object{newLeaseFromTemplate(validLease), newLeaseFromTemplate(validLease), newLeaseFromTemplate(validLease), newLeaseFromTemplate(invalidLease)}
274278
ct.Advance(time.Millisecond)
275279

276-
lister := &fakeLeaseLister{
277-
leases: leases,
278-
err: fmt.Errorf("dummy lister error"),
279-
}
280+
k8sClient := fake.NewSimpleClientset(leases...)
281+
callShouldFail := true
280282

281283
selector, _ := labels.Parse("label=value")
282-
counter := NewServerLeaseCounter(lister, selector)
283284

284-
// First call should return fallback count of 0 because of lister error.
285-
got := counter.Count()
285+
counter := NewServerLeaseCounter(k8sClient, selector)
286+
287+
// First call should return fallback count of 0 because of leaseClient error.
288+
k8sClient.PrependReactor("*", "*", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
289+
if callShouldFail {
290+
return true, nil, fmt.Errorf("dummy lease client error")
291+
}
292+
return false, nil, nil
293+
})
294+
ctx := context.Background()
295+
got := counter.Count(ctx)
286296
if got != 0 {
287-
t.Errorf("lease counter did not return fallback count on lister error (got: %v, want: 0", got)
297+
t.Errorf("lease counter did not return fallback count on leaseClient error (got: %v, want: 0)", got)
288298
}
289299

290-
// Second call should return the actual count (3) upon lister success.
300+
// Second call should return the actual count (3) upon leaseClient success.
301+
callShouldFail = false
291302
actualCount := 3
292-
lister.err = nil
293-
got = counter.Count()
303+
got = counter.Count(ctx)
294304
if got != actualCount {
295-
t.Errorf("lease counter did not return actual count on lister success (got: %v, want: %v)", got, actualCount)
305+
t.Errorf("lease counter did not return actual count on leaseClient success (got: %v, want: %v)", got, actualCount)
296306
}
297307

298-
// Third call should return updated fallback count (3) upon lister failure.
299-
lister.err = fmt.Errorf("dummy lister error")
300-
lister.leases = append(lister.leases, newLeaseFromTemplate(validLease)) // Change actual count just in case.
301-
got = counter.Count()
308+
// Third call should return updated fallback count (3) upon leaseClient failure.
309+
callShouldFail = true
310+
got = counter.Count(ctx)
302311
if got != actualCount {
303-
t.Errorf("lease counter did not update fallback count after lister success, returned incorrect count on subsequent lister error (got: %v, want: %v)", got, actualCount)
312+
t.Errorf("lease counter did not update fallback count after leaseClient success, returned incorrect count on subsequent leaseClient error (got: %v, want: %v)", got, actualCount)
304313
}
305314
}

0 commit comments

Comments
 (0)