From ca34c4f8da97b2dd09a6a686c33b111867266b56 Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Thu, 4 Jul 2024 22:02:47 +0700 Subject: [PATCH 1/8] fix leader election bug --- pkg/manager/internal.go | 95 +++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 51 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 2ce02b105c..eb73cc8472 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -351,6 +351,39 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) + leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: cm.resourceLock, + LeaseDuration: cm.leaseDuration, + RenewDeadline: cm.renewDeadline, + RetryPeriod: cm.retryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } + close(cm.elected) + }, + OnStoppedLeading: func() { + if cm.onStoppedLeading != nil { + cm.onStoppedLeading() + } + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. + cm.errChan <- errors.New("leader election lost") + }, + }, + ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, + Name: cm.leaderElectionID, + }) + if err != nil { + return err + } + // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request stopComplete := make(chan struct{}) defer close(stopComplete) @@ -433,19 +466,22 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { { ctx, cancel := context.WithCancel(context.Background()) cm.leaderElectionCancel = cancel - go func() { - if cm.resourceLock != nil { - if err := cm.startLeaderElection(ctx); err != nil { - cm.errChan <- err - } - } else { + if cm.resourceLock != nil { + // Start the leader elector process + go func() { + leaderElector.Run(ctx) + <-ctx.Done() + close(cm.leaderElectionStopped) + }() + } else { + go func() { // Treat not having leader election enabled the same as being elected. if err := cm.startLeaderElectionRunnables(); err != nil { cm.errChan <- err } close(cm.elected) - } - }() + }() + } } ready = true @@ -568,49 +604,6 @@ func (cm *controllerManager) startLeaderElectionRunnables() error { return cm.runnables.LeaderElection.Start(cm.internalCtx) } -func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) { - l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: cm.resourceLock, - LeaseDuration: cm.leaseDuration, - RenewDeadline: cm.renewDeadline, - RetryPeriod: cm.retryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(_ context.Context) { - if err := cm.startLeaderElectionRunnables(); err != nil { - cm.errChan <- err - return - } - close(cm.elected) - }, - OnStoppedLeading: func() { - if cm.onStoppedLeading != nil { - cm.onStoppedLeading() - } - // Make sure graceful shutdown is skipped if we lost the leader lock without - // intending to. - cm.gracefulShutdownTimeout = time.Duration(0) - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errChan <- errors.New("leader election lost") - }, - }, - ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, - Name: cm.leaderElectionID, - }) - if err != nil { - return err - } - - // Start the leader elector process - go func() { - l.Run(ctx) - <-ctx.Done() - close(cm.leaderElectionStopped) - }() - return nil -} - func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } From a5bde5d027fa6c55dc38a696a3554898bdd4315b Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Thu, 4 Jul 2024 22:44:05 +0700 Subject: [PATCH 2/8] fix tests --- pkg/manager/internal.go | 66 ++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index eb73cc8472..aef4f83cf0 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -351,39 +351,6 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) - leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: cm.resourceLock, - LeaseDuration: cm.leaseDuration, - RenewDeadline: cm.renewDeadline, - RetryPeriod: cm.retryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(_ context.Context) { - if err := cm.startLeaderElectionRunnables(); err != nil { - cm.errChan <- err - return - } - close(cm.elected) - }, - OnStoppedLeading: func() { - if cm.onStoppedLeading != nil { - cm.onStoppedLeading() - } - // Make sure graceful shutdown is skipped if we lost the leader lock without - // intending to. - cm.gracefulShutdownTimeout = time.Duration(0) - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errChan <- errors.New("leader election lost") - }, - }, - ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, - Name: cm.leaderElectionID, - }) - if err != nil { - return err - } - // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request stopComplete := make(chan struct{}) defer close(stopComplete) @@ -467,6 +434,39 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { ctx, cancel := context.WithCancel(context.Background()) cm.leaderElectionCancel = cancel if cm.resourceLock != nil { + leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: cm.resourceLock, + LeaseDuration: cm.leaseDuration, + RenewDeadline: cm.renewDeadline, + RetryPeriod: cm.retryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } + close(cm.elected) + }, + OnStoppedLeading: func() { + if cm.onStoppedLeading != nil { + cm.onStoppedLeading() + } + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. + cm.errChan <- errors.New("leader election lost") + }, + }, + ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, + Name: cm.leaderElectionID, + }) + if err != nil { + return fmt.Errorf("failed during initialization leader election process: %w", err) + } + // Start the leader elector process go func() { leaderElector.Run(ctx) From 48659d949d2badd42d8876096b0ee3d667b50bec Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Thu, 4 Jul 2024 23:30:22 +0700 Subject: [PATCH 3/8] fix --- pkg/manager/internal.go | 71 +++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index aef4f83cf0..8182969e6d 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -351,6 +351,42 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) + var leaderElector *leaderelection.LeaderElector + if cm.resourceLock != nil { + leaderElector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: cm.resourceLock, + LeaseDuration: cm.leaseDuration, + RenewDeadline: cm.renewDeadline, + RetryPeriod: cm.retryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } + close(cm.elected) + }, + OnStoppedLeading: func() { + if cm.onStoppedLeading != nil { + cm.onStoppedLeading() + } + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. + cm.errChan <- errors.New("leader election lost") + }, + }, + ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, + Name: cm.leaderElectionID, + }) + if err != nil { + return fmt.Errorf("failed during initialization leader election process: %w", err) + } + } + // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request stopComplete := make(chan struct{}) defer close(stopComplete) @@ -433,40 +469,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { { ctx, cancel := context.WithCancel(context.Background()) cm.leaderElectionCancel = cancel - if cm.resourceLock != nil { - leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: cm.resourceLock, - LeaseDuration: cm.leaseDuration, - RenewDeadline: cm.renewDeadline, - RetryPeriod: cm.retryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(_ context.Context) { - if err := cm.startLeaderElectionRunnables(); err != nil { - cm.errChan <- err - return - } - close(cm.elected) - }, - OnStoppedLeading: func() { - if cm.onStoppedLeading != nil { - cm.onStoppedLeading() - } - // Make sure graceful shutdown is skipped if we lost the leader lock without - // intending to. - cm.gracefulShutdownTimeout = time.Duration(0) - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errChan <- errors.New("leader election lost") - }, - }, - ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, - Name: cm.leaderElectionID, - }) - if err != nil { - return fmt.Errorf("failed during initialization leader election process: %w", err) - } - + if leaderElector != nil { // Start the leader elector process go func() { leaderElector.Run(ctx) From 49d60ff1b4e898fa7c99372b523c519ecadde0cb Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Sun, 7 Jul 2024 02:04:11 +0700 Subject: [PATCH 4/8] add function initLeaderElector --- pkg/manager/internal.go | 67 +++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 8182969e6d..d2f8c4bac3 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -353,35 +353,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { var leaderElector *leaderelection.LeaderElector if cm.resourceLock != nil { - leaderElector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: cm.resourceLock, - LeaseDuration: cm.leaseDuration, - RenewDeadline: cm.renewDeadline, - RetryPeriod: cm.retryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(_ context.Context) { - if err := cm.startLeaderElectionRunnables(); err != nil { - cm.errChan <- err - return - } - close(cm.elected) - }, - OnStoppedLeading: func() { - if cm.onStoppedLeading != nil { - cm.onStoppedLeading() - } - // Make sure graceful shutdown is skipped if we lost the leader lock without - // intending to. - cm.gracefulShutdownTimeout = time.Duration(0) - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errChan <- errors.New("leader election lost") - }, - }, - ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, - Name: cm.leaderElectionID, - }) + leaderElector, err = cm.initLeaderElector() if err != nil { return fmt.Errorf("failed during initialization leader election process: %w", err) } @@ -603,6 +575,43 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e return nil } +func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector, error) { + leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: cm.resourceLock, + LeaseDuration: cm.leaseDuration, + RenewDeadline: cm.renewDeadline, + RetryPeriod: cm.retryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } + close(cm.elected) + }, + OnStoppedLeading: func() { + if cm.onStoppedLeading != nil { + cm.onStoppedLeading() + } + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. + cm.errChan <- errors.New("leader election lost") + }, + }, + ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, + Name: cm.leaderElectionID, + }) + if err != nil { + return nil, err + } + + return leaderElector, nil +} + func (cm *controllerManager) startLeaderElectionRunnables() error { return cm.runnables.LeaderElection.Start(cm.internalCtx) } From 0c35ede8333150c8e08cc6e45227f646eb3a37dc Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Sun, 7 Jul 2024 17:44:26 +0700 Subject: [PATCH 5/8] add test case for Start function bug --- pkg/manager/manager_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 1683013b3f..c345308fc0 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1165,6 +1165,22 @@ var _ = Describe("manger.Manager", func() { cm.onStoppedLeading = func() {} }, ) + + It("should return an error if leader election param incorrect", func() { + renewDeadline := time.Second * 20 + m, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionID: "controller-runtime", + LeaderElectionNamespace: "default", + newResourceLock: fakeleaderelection.NewResourceLock, + RenewDeadline: &renewDeadline, + }) + Expect(err).NotTo(HaveOccurred()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + Expect(m.Start(ctx)).To(HaveOccurred()) + Expect(errors.Is(err, context.DeadlineExceeded)).NotTo(BeTrue()) + }) }) Context("should start serving metrics", func() { From fc17886d6be5e24f8c220d5b46e0a56d8cf767cd Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Sun, 7 Jul 2024 17:51:54 +0700 Subject: [PATCH 6/8] add comment for new leader elector code --- pkg/manager/internal.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index d2f8c4bac3..fd5122a9f8 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -351,6 +351,8 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) + // Leader elector must create before defer which contains engageStopProcedure function + // https://github.com/kubernetes-sigs/controller-runtime/issues/2873 var leaderElector *leaderelection.LeaderElector if cm.resourceLock != nil { leaderElector, err = cm.initLeaderElector() From 79d1b52ae38c9a60ffd59a07a58da42578bb5b49 Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Sun, 7 Jul 2024 17:54:13 +0700 Subject: [PATCH 7/8] fix comment --- pkg/manager/internal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index fd5122a9f8..cdbbcb4235 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -351,7 +351,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) - // Leader elector must create before defer which contains engageStopProcedure function + // Leader elector must be created before defer that contains engageStopProcedure function // https://github.com/kubernetes-sigs/controller-runtime/issues/2873 var leaderElector *leaderelection.LeaderElector if cm.resourceLock != nil { From 463a03bb57f497729fdaf36ebff21d319bebd0a9 Mon Sep 17 00:00:00 2001 From: Alexseij <79832610507@yandex.ru> Date: Sun, 7 Jul 2024 18:01:23 +0700 Subject: [PATCH 8/8] fix test --- pkg/manager/manager_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index c345308fc0..b3383edf88 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1178,7 +1178,8 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - Expect(m.Start(ctx)).To(HaveOccurred()) + err = m.Start(ctx) + Expect(err).To(HaveOccurred()) Expect(errors.Is(err, context.DeadlineExceeded)).NotTo(BeTrue()) }) })