11/*
22Copyright 2015 The Kubernetes Authors.
3-
43Licensed under the Apache License, Version 2.0 (the "License");
54you may not use this file except in compliance with the License.
65You may obtain a copy of the License at
7-
86 http://www.apache.org/licenses/LICENSE-2.0
9-
107Unless required by applicable law or agreed to in writing, software
118distributed under the License is distributed on an "AS IS" BASIS,
129WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -58,18 +55,16 @@ import (
5855 "fmt"
5956 "time"
6057
58+ log "github.com/sirupsen/logrus"
6159 "k8s.io/apimachinery/pkg/api/errors"
6260 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
6361 "k8s.io/apimachinery/pkg/util/clock"
6462 "k8s.io/apimachinery/pkg/util/runtime"
6563 "k8s.io/apimachinery/pkg/util/wait"
6664 rl "k8s.io/client-go/tools/leaderelection/resourcelock"
67-
68- "k8s.io/klog"
6965)
7066
7167const (
72- // JitterFactor is a multiplier used to add jitter to leader renewal times.
7368 JitterFactor = 1.2
7469)
7570
@@ -107,8 +102,6 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
107102 return & le , nil
108103}
109104
110- // LeaderElectionConfig contains the settings associated with a leader
111- // election process.
112105type LeaderElectionConfig struct {
113106 // Lock is the resource that will be used for locking
114107 Lock rl.Interface
@@ -183,17 +176,17 @@ type LeaderElector struct {
183176
184177 // clock is a wrapper around time to allow for less flaky testing
185178 clock clock.Clock
186-
187- // name is the name of the resource lock for debugging
188- name string
189179}
190180
191- // Run starts the leader election loop
181+ // Run starts the leader election loop. Run will not return
182+ // before the leader election loop is stopped by ctx or it has
183+ // stopped holding the leader lease.
192184func (le * LeaderElector ) Run (ctx context.Context ) {
185+ defer runtime .HandleCrash ()
193186 defer func () {
194- runtime .HandleCrash ()
195187 le .config .Callbacks .OnStoppedLeading ()
196188 }()
189+
197190 if ! le .acquire (ctx ) {
198191 return // ctx signalled done
199192 }
@@ -204,7 +197,8 @@ func (le *LeaderElector) Run(ctx context.Context) {
204197}
205198
206199// RunOrDie starts a client with the provided config or panics if the config
207- // fails to validate.
200+ // fails to validate. RunOrDie blocks until the leader election loop is
201+ // stopped by ctx or it has stopped holding the leader lease.
208202func RunOrDie (ctx context.Context , lec LeaderElectionConfig ) {
209203 le , err := NewLeaderElector (lec )
210204 if err != nil {
@@ -231,16 +225,16 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
231225 defer cancel ()
232226 succeeded := false
233227 desc := le .config .Lock .Describe ()
234- klog .Infof ("attempting to acquire leader lease %v..." , desc )
228+ log .Infof ("Attempting to acquire leader lease %v..." , desc )
235229 wait .JitterUntil (func () {
236- succeeded = le .tryAcquireOrRenew ()
230+ succeeded = le .tryAcquireOrRenew (ctx )
237231 le .maybeReportTransition ()
238232 if ! succeeded {
239- klog . V ( 4 ). Infof ("failed to acquire lease %v" , desc )
233+ log . Infof ("Failed to acquire lease %v" , desc )
240234 return
241235 }
242236 le .config .Lock .RecordEvent ("became leader" )
243- klog .Infof ("successfully acquired lease %v" , desc )
237+ log .Infof ("Successfully acquired lease %v" , desc )
244238 cancel ()
245239 }, le .config .RetryPeriod , JitterFactor , true , ctx .Done ())
246240 return succeeded
@@ -254,48 +248,40 @@ func (le *LeaderElector) renew(ctx context.Context) {
254248 timeoutCtx , timeoutCancel := context .WithTimeout (ctx , le .config .RenewDeadline )
255249 defer timeoutCancel ()
256250 err := wait .PollImmediateUntil (le .config .RetryPeriod , func () (bool , error ) {
257- done := make (chan bool , 1 )
258- go func () {
259- defer close (done )
260- done <- le .tryAcquireOrRenew ()
261- }()
262-
263- select {
264- case <- timeoutCtx .Done ():
265- return false , fmt .Errorf ("failed to tryAcquireOrRenew %s" , timeoutCtx .Err ())
266- case result := <- done :
267- return result , nil
268- }
251+ return le .tryAcquireOrRenew (timeoutCtx ), nil
269252 }, timeoutCtx .Done ())
270253
271254 le .maybeReportTransition ()
272255 desc := le .config .Lock .Describe ()
273256 if err == nil {
274- klog . V ( 5 ). Infof ( "successfully renewed lease %v" , desc )
257+ log . Debugf ( "Successfully renewed lease %v" , desc )
275258 return
276259 }
277260 le .config .Lock .RecordEvent ("stopped leading" )
278- klog .Infof ("failed to renew lease %v: %v" , desc , err )
261+ log .Infof ("Failed to renew lease %v: %v" , desc , err )
279262 cancel ()
280263 }, le .config .RetryPeriod , ctx .Done ())
281264
282265 // if we hold the lease, give it up
283266 if le .config .ReleaseOnCancel {
284- le .release ()
267+ le .release (ctx )
285268 }
286269}
287270
288271// release attempts to release the leader lease if we have acquired it.
289- func (le * LeaderElector ) release () bool {
272+ func (le * LeaderElector ) release (ctx context. Context ) bool {
290273 if ! le .IsLeader () {
291274 return true
292275 }
276+ now := metav1 .Now ()
293277 leaderElectionRecord := rl.LeaderElectionRecord {
294278 LeaderTransitions : le .observedRecord .LeaderTransitions ,
295- LeaseDurationSeconds : int (le .config .LeaseDuration .Seconds ()),
279+ LeaseDurationSeconds : 1 ,
280+ RenewTime : now ,
281+ AcquireTime : now ,
296282 }
297- if err := le .config .Lock .Update (leaderElectionRecord ); err != nil {
298- klog .Errorf ("Failed to release lock: %v" , err )
283+ if err := le .config .Lock .Update (ctx , leaderElectionRecord ); err != nil {
284+ log .Errorf ("Failed to release lock: %v" , err )
299285 return false
300286 }
301287 le .observedRecord = leaderElectionRecord
@@ -306,7 +292,7 @@ func (le *LeaderElector) release() bool {
306292// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
307293// else it tries to renew the lease if it has already been acquired. Returns true
308294// on success else returns false.
309- func (le * LeaderElector ) tryAcquireOrRenew () bool {
295+ func (le * LeaderElector ) tryAcquireOrRenew (ctx context. Context ) bool {
310296 now := metav1 .Now ()
311297 leaderElectionRecord := rl.LeaderElectionRecord {
312298 HolderIdentity : le .config .Lock .Identity (),
@@ -316,14 +302,14 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
316302 }
317303
318304 // 1. obtain or create the ElectionRecord
319- oldLeaderElectionRecord , oldLeaderElectionRawRecord , err := le .config .Lock .Get ()
305+ oldLeaderElectionRecord , oldLeaderElectionRawRecord , err := le .config .Lock .Get (ctx )
320306 if err != nil {
321307 if ! errors .IsNotFound (err ) {
322- klog .Errorf ("error retrieving resource lock %v: %v" , le .config .Lock .Describe (), err )
308+ log .Errorf ("Error retrieving resource lock %v: %v" , le .config .Lock .Describe (), err )
323309 return false
324310 }
325- if err = le .config .Lock .Create (leaderElectionRecord ); err != nil {
326- klog .Errorf ("error initially creating leader election record: %v" , err )
311+ if err = le .config .Lock .Create (ctx , leaderElectionRecord ); err != nil {
312+ log .Errorf ("Error initially creating leader election record: %v" , err )
327313 return false
328314 }
329315 le .observedRecord = leaderElectionRecord
@@ -337,10 +323,16 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
337323 le .observedRawRecord = oldLeaderElectionRawRecord
338324 le .observedTime = le .clock .Now ()
339325 }
326+
327+ // If the renew time is more than 2x the lease duration in the past, don't worry
328+ // about clock skew and just take the lock.
329+ thresholdTime := now .Time .Add (- 2 * le .config .LeaseDuration )
330+
340331 if len (oldLeaderElectionRecord .HolderIdentity ) > 0 &&
341332 le .observedTime .Add (le .config .LeaseDuration ).After (now .Time ) &&
333+ oldLeaderElectionRecord .RenewTime .Time .After (thresholdTime ) &&
342334 ! le .IsLeader () {
343- klog . V ( 4 ). Infof ("lock is held by %v and has not yet expired" , oldLeaderElectionRecord .HolderIdentity )
335+ log . Infof ("Lock is held by %v and has not yet expired" , oldLeaderElectionRecord .HolderIdentity )
344336 return false
345337 }
346338
@@ -354,8 +346,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
354346 }
355347
356348 // update the lock itself
357- if err = le .config .Lock .Update (leaderElectionRecord ); err != nil {
358- klog .Errorf ("Failed to update lock: %v" , err )
349+ if err = le .config .Lock .Update (ctx , leaderElectionRecord ); err != nil {
350+ log .Errorf ("Failed to update lock: %v" , err )
359351 return false
360352 }
361353
0 commit comments