@@ -25,10 +25,27 @@ type LeaderElection interface {
2525 SetCallBackFuncs (StartCallback , StopCallback )
2626}
2727
28+ type callBackFuncs struct {
29+ onStartLeading StartCallback
30+ onStopLeading StopCallback
31+ }
32+
2833// SetCallBackFuncs set the functions that can be invoked when the leader wins or loss the election
2934func (lee * leaderElectionExtension ) SetCallBackFuncs (onStartLeading StartCallback , onStopLeading StopCallback ) {
30- lee .onStartedLeading = append (lee .onStartedLeading , onStartLeading )
31- lee .onStoppedLeading = append (lee .onStoppedLeading , onStopLeading )
35+ // Have a write lock while setting the callbacks.
36+ lee .mu .Lock ()
37+ defer lee .mu .Unlock ()
38+ callBack := callBackFuncs {
39+ onStartLeading : onStartLeading ,
40+ onStopLeading : onStopLeading ,
41+ }
42+
43+ lee .callBackFuncs = append (lee .callBackFuncs , callBack )
44+
45+ if lee .isLeader {
46+ // Immediately invoke the callback since we are already leader
47+ onStartLeading (context .Background ())
48+ }
3249}
3350
3451// leaderElectionExtension is the main struct implementing the extension's behavior.
@@ -40,21 +57,34 @@ type leaderElectionExtension struct {
4057 cancel context.CancelFunc
4158 waitGroup sync.WaitGroup
4259
43- onStartedLeading []StartCallback
44- onStoppedLeading []StopCallback
60+ callBackFuncs []callBackFuncs
61+
62+ isLeader bool
63+
64+ mu sync.Mutex
4565}
4666
4767// If the receiver sets a callback function then it would be invoked when the leader wins the election
4868func (lee * leaderElectionExtension ) startedLeading (ctx context.Context ) {
49- for _ , callback := range lee .onStartedLeading {
50- callback (ctx )
69+ // Have read lock so that we no new callbacks can be added while we are invoking the callbacks.
70+ lee .mu .Lock ()
71+ defer lee .mu .Unlock ()
72+ lee .isLeader = true
73+ for _ , callback := range lee .callBackFuncs {
74+ callback .onStartLeading (ctx )
5175 }
5276}
5377
5478// If the receiver sets a callback function then it would be invoked when the leader loss the election
5579func (lee * leaderElectionExtension ) stoppedLeading () {
56- for _ , callback := range lee .onStoppedLeading {
57- callback ()
80+ // Have a read lock while stopping the receivers. This would make sure that if we have executed any onStartLeading callbacks
81+ // after becoming leader, we would execute the onStopLeading callbacks for them as well.
82+ lee .mu .Lock ()
83+ defer lee .mu .Unlock ()
84+
85+ lee .isLeader = false
86+ for _ , callback := range lee .callBackFuncs {
87+ callback .onStopLeading ()
5888 }
5989}
6090
@@ -64,6 +94,7 @@ func (lee *leaderElectionExtension) Start(_ context.Context, _ component.Host) e
6494
6595 ctx := context .Background ()
6696 ctx , lee .cancel = context .WithCancel (ctx )
97+
6798 // Create the K8s leader elector
6899 leaderElector , err := newK8sLeaderElector (lee .config , lee .client , lee .startedLeading , lee .stoppedLeading , lee .leaseHolderID )
69100 if err != nil {
@@ -77,7 +108,6 @@ func (lee *leaderElectionExtension) Start(_ context.Context, _ component.Host) e
77108 defer lee .waitGroup .Done ()
78109 for {
79110 leaderElector .Run (ctx )
80-
81111 if ctx .Err () != nil {
82112 break
83113 }
0 commit comments