Skip to content

Commit ac05e1a

Browse files
authored
Merge pull request kubernetes#130533 from Henrywu573/parall
Parallelize lease candidate ping
2 parents 309c4c1 + 5917343 commit ac05e1a

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

pkg/controlplane/controller/leaderelection/leaderelection_controller.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"reflect"
2323
"time"
2424

25+
"golang.org/x/sync/errgroup"
2526
v1 "k8s.io/api/coordination/v1"
2627
v1alpha2 "k8s.io/api/coordination/v1alpha2"
2728
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -264,6 +265,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
264265

265266
now := c.clock.Now()
266267
canVoteYet := true
268+
g, gCtx := errgroup.WithContext(ctx)
267269
for _, candidate := range candidates {
268270
if candidate.Spec.PingTime != nil && candidate.Spec.PingTime.Add(electionDuration).After(now) &&
269271
candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Before(candidate.Spec.PingTime) {
@@ -280,17 +282,18 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
280282
// If PingTime is outdated, send another PingTime only if it already acked the first one.
281283
// This checks for pingTime <= renewTime because equality is possible in unit tests using a fake clock.
282284
(candidate.Spec.PingTime.Add(electionDuration).Before(now) && !candidate.Spec.RenewTime.Before(candidate.Spec.PingTime)) {
283-
// TODO(jefftree): We should randomize the order of sending pings and do them in parallel
284-
// so that all candidates have equal opportunity to ack.
285285
clone := candidate.DeepCopy()
286286
clone.Spec.PingTime = &metav1.MicroTime{Time: now}
287-
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
288-
if err != nil {
289-
return defaultRequeueInterval, err
290-
}
287+
g.Go(func() error {
288+
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(gCtx, clone, metav1.UpdateOptions{})
289+
return err
290+
})
291291
canVoteYet = false
292292
}
293293
}
294+
if err := g.Wait(); err != nil {
295+
return defaultRequeueInterval, err
296+
}
294297
if !canVoteYet {
295298
return defaultRequeueInterval, nil
296299
}

0 commit comments

Comments
 (0)