Skip to content

Commit 8ee0104

Browse files
authored
fix(collector): recover leader election after etcd or kubernetes API issues (#3861)
1 parent c18d674 commit 8ee0104

File tree

1 file changed

+27 -13 lines changed
  • collector/benthos/services/leaderelection

1 file changed

+27 -13 lines changed

collector/benthos/services/leaderelection/service.go

Lines changed: 27 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -124,25 +124,39 @@ func (s *Service) Start(ctx context.Context) error {
124124

125125
s.resources.SetGeneric(IsLeaderKey, false)
126126

127-
lec := *s.leaderElectionConfig
128-
le, err := leaderelection.NewLeaderElector(lec)
129-
if err != nil {
130-
return fmt.Errorf("failed to create leader elector: %w", err)
131-
}
132-
133-
if lec.WatchDog != nil {
134-
lec.WatchDog.SetLeaderElection(le)
135-
}
136-
137127
ctx, cancel := context.WithCancel(ctx)
138128
s.cancel = cancel
129+
s.started = true
139130

140-
// Start leader election in a goroutine to make this non-blocking
141131
go func() {
142132
defer s.Stop(ctx)
143133

144-
s.started = true
145-
le.Run(ctx)
134+
for {
135+
lec := *s.leaderElectionConfig
136+
le, err := leaderelection.NewLeaderElector(lec)
137+
if err != nil {
138+
s.logger.Errorf("failed to create leader elector: %v", err)
139+
return
140+
}
141+
142+
if lec.WatchDog != nil {
143+
lec.WatchDog.SetLeaderElection(le)
144+
}
145+
146+
s.logger.Info("starting leader election loop")
147+
le.Run(ctx)
148+
149+
if ctx.Err() != nil {
150+
return
151+
}
152+
153+
s.logger.Info("leader election ended, retrying...")
154+
select {
155+
case <-time.After(s.config.LeaseRetryPeriod):
156+
case <-ctx.Done():
157+
return
158+
}
159+
}
146160
}()
147161

148162
return nil

0 commit comments

Comments
 (0)