Skip to content

Commit 0c774d0

Browse files
committed
Change PingTime to be persistent
1 parent a738daa commit 0c774d0

File tree

9 files changed

+94
-98
lines changed

9 files changed

+94
-98
lines changed

pkg/apis/coordination/types.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,12 @@ type LeaseCandidateSpec struct {
117117
// LeaseCandidate will respond by updating RenewTime.
118118
// +optional
119119
PingTime *metav1.MicroTime
120-
// RenewTime is the time that the LeaseCandidate was last updated.
121-
// Any time a Lease needs to do leader election, the PingTime field
122-
// is updated to signal to the LeaseCandidate that they should update
123-
// the RenewTime.
124-
// Old LeaseCandidate objects are also garbage collected if it has been hours
125-
// since the last renew. The PingTime field is updated regularly to prevent
126-
// garbage collection for still active LeaseCandidates.
120+
// RenewTime is the time that the LeaseCandidate was last updated. Any time
121+
// a Lease needs to do leader election, the PingTime field is updated to
122+
// signal to the LeaseCandidate that they should update the RenewTime. The
123+
// PingTime field is also updated regularly and LeaseCandidates must update
124+
// RenewTime to prevent garbage collection for still active LeaseCandidates.
125+
// Old LeaseCandidate objects are periodically garbage collected.
127126
// +optional
128127
RenewTime *metav1.MicroTime
129128
// BinaryVersion is the binary version. It must be in a semver format without leading `v`.

pkg/controlplane/apiserver/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
169169
)
170170
return func(ctx context.Context, workers int) {
171171
go controller.Run(ctx, workers)
172-
go gccontroller.Run(ctx.Done())
172+
go gccontroller.Run(ctx)
173173
}, err
174174
})
175175
return nil

pkg/controlplane/controller/leaderelection/leaderelection_controller.go

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan
126126
c.enqueueLease(oldObj)
127127
},
128128
})
129-
130129
if err != nil {
131130
return nil, err
132131
}
@@ -141,7 +140,6 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan
141140
c.enqueueCandidate(oldObj)
142141
},
143142
})
144-
145143
if err != nil {
146144
return nil, err
147145
}
@@ -179,7 +177,7 @@ func (c *Controller) enqueueCandidate(obj any) {
179177
return
180178
}
181179
// Ignore candidates that transitioned to Pending because reelection is already in progress
182-
if lc.Spec.PingTime != nil {
180+
if lc.Spec.PingTime != nil && lc.Spec.RenewTime.Before(lc.Spec.PingTime) {
183181
return
184182
}
185183
c.queue.Add(types.NamespacedName{Namespace: lc.Namespace, Name: lc.Spec.LeaseName})
@@ -205,6 +203,7 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease
205203
return true, nil
206204
}
207205

206+
// Every 15min, force an election so that all candidates get updated. Every 30min, we garbage collect.
208207
for _, candidate := range candidates {
209208
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(leaseCandidateValidDuration/2).Before(time.Now()) {
210209
return true, nil
@@ -241,7 +240,6 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease
241240
// PingTime + electionDuration < time.Now: Candidate has not responded within the appropriate PingTime. Continue the election.
242241
// RenewTime + 5 seconds > time.Now: All candidates acked in the last 5 seconds, continue the election.
243242
func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.NamespacedName) (requeue time.Duration, err error) {
244-
now := time.Now()
245243

246244
candidates, err := c.listAdmissableCandidates(leaseNN)
247245
if err != nil {
@@ -254,17 +252,52 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
254252
// Check if an election is really needed by looking at the current lease and candidates
255253
needElection, err := c.electionNeeded(candidates, leaseNN)
256254
if !needElection {
257-
return noRequeue, err
255+
return defaultRequeueInterval, err
258256
}
259257
if err != nil {
260258
return defaultRequeueInterval, err
261259
}
262260

261+
now := time.Now()
262+
canVoteYet := true
263+
for _, candidate := range candidates {
264+
if candidate.Spec.PingTime != nil && candidate.Spec.PingTime.Add(electionDuration).After(now) &&
265+
candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Before(candidate.Spec.PingTime) {
266+
267+
// continue waiting for the election to time out
268+
canVoteYet = false
269+
continue
270+
}
271+
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).After(now) {
272+
continue
273+
}
274+
275+
if candidate.Spec.PingTime == nil ||
276+
// If PingTime is outdated, send another ping only if the candidate already acked the previous one.
277+
(candidate.Spec.PingTime.Add(electionDuration).Before(now) && candidate.Spec.PingTime.Before(candidate.Spec.RenewTime)) {
278+
// TODO(jefftree): We should randomize the order of sending pings and do them in parallel
279+
// so that all candidates have equal opportunity to ack.
280+
clone := candidate.DeepCopy()
281+
clone.Spec.PingTime = &metav1.MicroTime{Time: now}
282+
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
283+
if err != nil {
284+
return defaultRequeueInterval, err
285+
}
286+
canVoteYet = false
287+
}
288+
}
289+
if !canVoteYet {
290+
return defaultRequeueInterval, nil
291+
}
292+
263293
// election is ongoing as long as unexpired PingTimes exist
264-
atLeastOnePingExpired := false
265294
for _, candidate := range candidates {
266295
if candidate.Spec.PingTime == nil {
267-
continue
296+
continue // never pinged: only possible when the candidate renewed within electionDuration, so the loop above skipped it
297+
}
298+
299+
if candidate.Spec.RenewTime != nil && candidate.Spec.PingTime.Before(candidate.Spec.RenewTime) {
300+
continue // this has renewed already
268301
}
269302

270303
// If a candidate has a PingTime within the election duration, they have not acked
@@ -273,39 +306,6 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
273306
// continue waiting for the election to timeout
274307
return noRequeue, nil
275308
}
276-
277-
// election timed out without ack (for one of the candidate). Clear and start election.
278-
// TODO(sttts): this seems to be wrong. One candidate might get a lot more time to vote, while others are starving because they got a late ping. We have to give all of them a chance.
279-
atLeastOnePingExpired = true
280-
clone := candidate.DeepCopy()
281-
clone.Spec.PingTime = nil
282-
if _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
283-
return noRequeue, err
284-
}
285-
break
286-
}
287-
288-
if !atLeastOnePingExpired {
289-
continueElection := true
290-
for _, candidate := range candidates {
291-
// if renewTime of a candidate is longer ago than electionDuration old, we have to ping.
292-
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).Before(now) {
293-
continueElection = false
294-
break
295-
}
296-
}
297-
if !continueElection {
298-
// Send an "are you alive" signal to all candidates
299-
for _, candidate := range candidates {
300-
clone := candidate.DeepCopy()
301-
clone.Spec.PingTime = &metav1.MicroTime{Time: time.Now()}
302-
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
303-
if err != nil {
304-
return noRequeue, err
305-
}
306-
}
307-
return defaultRequeueInterval, nil
308-
}
309309
}
310310

311311
var ackedCandidates []*v1alpha1.LeaseCandidate
@@ -398,7 +398,8 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
398398

399399
if reflect.DeepEqual(existing, orig) {
400400
klog.V(5).Infof("Lease %s already has the most optimal leader %q", leaseNN, *existing.Spec.HolderIdentity)
401-
return noRequeue, nil
401+
// We need to requeue to ensure that we are aware of an expired lease
402+
return defaultRequeueInterval, nil
402403
}
403404

404405
_, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, existing, metav1.UpdateOptions{})

pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func TestReconcileElectionStep(t *testing.T) {
160160
EmulationVersion: "1.19.0",
161161
BinaryVersion: "1.19.0",
162162
PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
163-
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
163+
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-4 * electionDuration))),
164164
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
165165
},
166166
},
@@ -173,6 +173,7 @@ func TestReconcileElectionStep(t *testing.T) {
173173
LeaseName: "component-A",
174174
EmulationVersion: "1.20.0",
175175
BinaryVersion: "1.20.0",
176+
PingTime: ptr.To(metav1.NewMicroTime(time.Now())),
176177
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
177178
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
178179
},
@@ -221,7 +222,7 @@ func TestReconcileElectionStep(t *testing.T) {
221222
expectedError: false,
222223
},
223224
{
224-
name: "candidates exist, lease exists, lease expired, 3rdparty strategy",
225+
name: "candidates exist, no acked candidates should return error",
225226
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
226227
candidates: []*v1alpha1.LeaseCandidate{
227228
{
@@ -233,33 +234,20 @@ func TestReconcileElectionStep(t *testing.T) {
233234
LeaseName: "component-A",
234235
EmulationVersion: "1.19.0",
235236
BinaryVersion: "1.19.0",
236-
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
237-
PreferredStrategies: []v1.CoordinatedLeaseStrategy{"foo.com/bar"},
237+
PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
238+
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * time.Minute))),
239+
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
238240
},
239241
},
240242
},
241-
existingLease: &v1.Lease{
242-
ObjectMeta: metav1.ObjectMeta{
243-
Namespace: "default",
244-
Name: "component-A",
245-
Annotations: map[string]string{
246-
electedByAnnotationName: controllerName,
247-
},
248-
},
249-
Spec: v1.LeaseSpec{
250-
HolderIdentity: ptr.To("component-identity-expired"),
251-
LeaseDurationSeconds: ptr.To(int32(10)),
252-
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
253-
},
254-
},
255-
expectLease: true,
256-
expectedHolderIdentity: ptr.To("component-identity-expired"),
257-
expectedStrategy: ptr.To[v1.CoordinatedLeaseStrategy]("foo.com/bar"),
258-
expectedRequeue: true,
259-
expectedError: false,
243+
existingLease: nil,
244+
expectLease: false,
245+
expectedHolderIdentity: nil,
246+
expectedRequeue: false,
247+
expectedError: true,
260248
},
261249
{
262-
name: "candidates exist, no acked candidates should return error",
250+
name: "candidates exist, should ping on election",
263251
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
264252
candidates: []*v1alpha1.LeaseCandidate{
265253
{
@@ -271,20 +259,21 @@ func TestReconcileElectionStep(t *testing.T) {
271259
LeaseName: "component-A",
272260
EmulationVersion: "1.19.0",
273261
BinaryVersion: "1.19.0",
274-
PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
275-
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
262+
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
276263
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
277264
},
278265
},
279266
},
280267
existingLease: nil,
281268
expectLease: false,
282269
expectedHolderIdentity: nil,
283-
expectedRequeue: false,
284-
expectedError: true,
270+
expectedStrategy: nil,
271+
expectedRequeue: true,
272+
expectedError: false,
273+
candidatesPinged: true,
285274
},
286275
{
287-
name: "candidates exist, should ping on election",
276+
name: "candidate exist, pinged candidate should have until electionDuration until election decision is made",
288277
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
289278
candidates: []*v1alpha1.LeaseCandidate{
290279
{
@@ -296,21 +285,20 @@ func TestReconcileElectionStep(t *testing.T) {
296285
LeaseName: "component-A",
297286
EmulationVersion: "1.19.0",
298287
BinaryVersion: "1.19.0",
299-
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
288+
PingTime: ptr.To(metav1.NewMicroTime(time.Now())),
289+
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
300290
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
301291
},
302292
},
303293
},
304294
existingLease: nil,
305295
expectLease: false,
306296
expectedHolderIdentity: nil,
307-
expectedStrategy: nil,
308297
expectedRequeue: true,
309298
expectedError: false,
310-
candidatesPinged: true,
311299
},
312300
{
313-
name: "candidates exist, ping within electionDuration should cause no state change",
301+
name: "candidates exist, lease exists, lease expired, 3rdparty strategy",
314302
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
315303
candidates: []*v1alpha1.LeaseCandidate{
316304
{
@@ -322,17 +310,26 @@ func TestReconcileElectionStep(t *testing.T) {
322310
LeaseName: "component-A",
323311
EmulationVersion: "1.19.0",
324312
BinaryVersion: "1.19.0",
325-
PingTime: ptr.To(metav1.NewMicroTime(time.Now())),
326-
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
327-
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
313+
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
314+
PreferredStrategies: []v1.CoordinatedLeaseStrategy{"foo.com/bar"},
328315
},
329316
},
330317
},
331-
existingLease: nil,
332-
expectLease: false,
333-
expectedHolderIdentity: nil,
334-
expectedStrategy: nil,
335-
expectedRequeue: false,
318+
existingLease: &v1.Lease{
319+
ObjectMeta: metav1.ObjectMeta{
320+
Namespace: "default",
321+
Name: "component-A",
322+
},
323+
Spec: v1.LeaseSpec{
324+
HolderIdentity: ptr.To("component-identity-expired"),
325+
LeaseDurationSeconds: ptr.To(int32(10)),
326+
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
327+
},
328+
},
329+
expectLease: true,
330+
expectedHolderIdentity: ptr.To("component-identity-expired"),
331+
expectedStrategy: ptr.To[v1.CoordinatedLeaseStrategy]("foo.com/bar"),
332+
expectedRequeue: true,
336333
expectedError: false,
337334
},
338335
}
@@ -683,7 +680,6 @@ func TestController(t *testing.T) {
683680
if err == nil {
684681
if lease.Spec.PingTime != nil {
685682
c := lease.DeepCopy()
686-
c.Spec.PingTime = nil
687683
c.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
688684
_, err = client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Update(ctx, c, metav1.UpdateOptions{})
689685
if err != nil {

pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (c *LeaseCandidateGCController) gc(ctx context.Context) {
8383
if !isLeaseCandidateExpired(leaseCandidate) {
8484
continue
8585
}
86-
lc, err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(leaseCandidate.Namespace).Get(context.TODO(), leaseCandidate.Name, metav1.GetOptions{})
86+
lc, err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(leaseCandidate.Namespace).Get(ctx, leaseCandidate.Name, metav1.GetOptions{})
8787
if err != nil {
8888
klog.ErrorS(err, "Error getting lc")
8989
continue

staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func NewCandidate(clientset kubernetes.Interface,
103103
h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
104104
UpdateFunc: func(oldObj, newObj interface{}) {
105105
if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok {
106-
if leasecandidate.Spec.PingTime != nil {
106+
if leasecandidate.Spec.PingTime != nil && leasecandidate.Spec.PingTime.After(leasecandidate.Spec.RenewTime.Time) {
107107
lc.enqueueLease()
108108
}
109109
}
@@ -177,7 +177,6 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
177177
klog.V(2).Infof("lease candidate exists. Renewing.")
178178
clone := lease.DeepCopy()
179179
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
180-
clone.Spec.PingTime = nil
181180
_, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{})
182181
if err != nil {
183182
return err

staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ func pollForLease(ctx context.Context, tc testcase, client *fake.Clientset, t *m
130130
if lc.Spec.BinaryVersion == tc.binaryVersion &&
131131
lc.Spec.EmulationVersion == tc.emulationVersion &&
132132
lc.Spec.LeaseName == tc.leaseName &&
133-
lc.Spec.PingTime == nil &&
134133
lc.Spec.RenewTime != nil {
135134
// Ensure that if a time is provided, the renewTime occurred after the provided time.
136135
if t != nil && t.After(lc.Spec.RenewTime.Time) {

0 commit comments

Comments
 (0)