Skip to content

Commit 4cf010a

Browse files
add mutex locks when multiple receivers setting callback functions (open-telemetry#41062)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description - In the OpenTelemetry Collector, extensions are always started before receivers. This can lead to a scenario where the `k8sleaderelector` extension has already started and become the leader, but the receiver has not finished setting up its callback functions. As a result, the desired callback functions are not executed when the instance is the leader. - Fix this by executing late-registered callbacks immediately if we are already the leader. - Add mutex locks so that the list of callbacks is updated atomically. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#40346 <!--Describe what testing was performed and which tests were added.--> #### Testing - Added an additional unit test to cover this scenario. <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 2543b55 commit 4cf010a

File tree

3 files changed

+120
-9
lines changed

3 files changed

+120
-9
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type:
5+
bug_fix
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component:
8+
k8sleaderelector
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: handle late registration of receivers to k8sleaderelector
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40346]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

extension/k8sleaderelector/extension.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,27 @@ type LeaderElection interface {
2525
SetCallBackFuncs(StartCallback, StopCallback)
2626
}
2727

28+
type callBackFuncs struct {
29+
onStartLeading StartCallback
30+
onStopLeading StopCallback
31+
}
32+
2833
// SetCallBackFuncs sets the functions that are invoked when this instance wins or loses the election
2934
func (lee *leaderElectionExtension) SetCallBackFuncs(onStartLeading StartCallback, onStopLeading StopCallback) {
30-
lee.onStartedLeading = append(lee.onStartedLeading, onStartLeading)
31-
lee.onStoppedLeading = append(lee.onStoppedLeading, onStopLeading)
35+
// Have a write lock while setting the callbacks.
36+
lee.mu.Lock()
37+
defer lee.mu.Unlock()
38+
callBack := callBackFuncs{
39+
onStartLeading: onStartLeading,
40+
onStopLeading: onStopLeading,
41+
}
42+
43+
lee.callBackFuncs = append(lee.callBackFuncs, callBack)
44+
45+
if lee.isLeader {
46+
// Immediately invoke the callback since we are already leader
47+
onStartLeading(context.Background())
48+
}
3249
}
3350

3451
// leaderElectionExtension is the main struct implementing the extension's behavior.
@@ -40,21 +57,34 @@ type leaderElectionExtension struct {
4057
cancel context.CancelFunc
4158
waitGroup sync.WaitGroup
4259

43-
onStartedLeading []StartCallback
44-
onStoppedLeading []StopCallback
60+
callBackFuncs []callBackFuncs
61+
62+
isLeader bool
63+
64+
mu sync.Mutex
4565
}
4666

4767
// If the receiver sets a callback function then it would be invoked when the leader wins the election
4868
func (lee *leaderElectionExtension) startedLeading(ctx context.Context) {
49-
for _, callback := range lee.onStartedLeading {
50-
callback(ctx)
69+
// Hold the lock so that no new callbacks can be added while we are invoking the callbacks.
70+
lee.mu.Lock()
71+
defer lee.mu.Unlock()
72+
lee.isLeader = true
73+
for _, callback := range lee.callBackFuncs {
74+
callback.onStartLeading(ctx)
5175
}
5276
}
5377

5478
// If the receiver sets a callback function then it would be invoked when the leader loses the election
5579
func (lee *leaderElectionExtension) stoppedLeading() {
56-
for _, callback := range lee.onStoppedLeading {
57-
callback()
80+
// Hold the lock while stopping the receivers. This makes sure that if we have executed any onStartLeading callbacks
81+
// after becoming leader, we would execute the onStopLeading callbacks for them as well.
82+
lee.mu.Lock()
83+
defer lee.mu.Unlock()
84+
85+
lee.isLeader = false
86+
for _, callback := range lee.callBackFuncs {
87+
callback.onStopLeading()
5888
}
5989
}
6090

@@ -64,6 +94,7 @@ func (lee *leaderElectionExtension) Start(_ context.Context, _ component.Host) e
6494

6595
ctx := context.Background()
6696
ctx, lee.cancel = context.WithCancel(ctx)
97+
6798
// Create the K8s leader elector
6899
leaderElector, err := newK8sLeaderElector(lee.config, lee.client, lee.startedLeading, lee.stoppedLeading, lee.leaseHolderID)
69100
if err != nil {
@@ -77,7 +108,6 @@ func (lee *leaderElectionExtension) Start(_ context.Context, _ component.Host) e
77108
defer lee.waitGroup.Done()
78109
for {
79110
leaderElector.Run(ctx)
80-
81111
if ctx.Err() != nil {
82112
break
83113
}

extension/k8sleaderelector/extension_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,57 @@ func TestExtension(t *testing.T) {
7272
require.True(t, onStartLeadingInvoked.Load())
7373
require.NoError(t, leaderElection.Shutdown(ctx))
7474
}
75+
76+
func TestExtension_WithDelay(t *testing.T) {
77+
config := &Config{
78+
LeaseName: "foo",
79+
LeaseNamespace: "default",
80+
LeaseDuration: 15 * time.Second,
81+
RenewDuration: 10 * time.Second,
82+
RetryPeriod: 2 * time.Second,
83+
}
84+
85+
ctx := context.TODO()
86+
fakeClient := fake.NewClientset()
87+
config.makeClient = func(_ k8sconfig.APIConfig) (kubernetes.Interface, error) {
88+
return fakeClient, nil
89+
}
90+
91+
observedZapCore, _ := observer.New(zap.WarnLevel)
92+
93+
leaderElection := leaderElectionExtension{
94+
config: config,
95+
client: fakeClient,
96+
logger: zap.New(observedZapCore),
97+
leaseHolderID: "foo",
98+
}
99+
100+
var onStartLeadingInvoked atomic.Bool
101+
102+
require.NoError(t, leaderElection.Start(ctx, componenttest.NewNopHost()))
103+
104+
// Simulate a delay of setting up callbacks after the leader has been elected.
105+
expectedLeaseDurationSeconds := ptr.To(int32(15))
106+
require.Eventually(t, func() bool {
107+
lease, err := fakeClient.CoordinationV1().Leases("default").Get(ctx, "foo", metav1.GetOptions{})
108+
require.NoError(t, err)
109+
require.NotNil(t, lease)
110+
require.NotNil(t, lease.Spec.AcquireTime)
111+
require.NotNil(t, lease.Spec.HolderIdentity)
112+
require.Equal(t, expectedLeaseDurationSeconds, lease.Spec.LeaseDurationSeconds)
113+
return true
114+
}, 10*time.Second, 100*time.Millisecond)
115+
116+
leaderElection.SetCallBackFuncs(
117+
func(_ context.Context) {
118+
onStartLeadingInvoked.Store(true)
119+
fmt.Printf("%v: LeaderElection started leading\n", time.Now().String())
120+
},
121+
func() {
122+
fmt.Printf("%v: LeaderElection stopped leading\n", time.Now().String())
123+
},
124+
)
125+
126+
require.True(t, onStartLeadingInvoked.Load())
127+
require.NoError(t, leaderElection.Shutdown(ctx))
128+
}

0 commit comments

Comments
 (0)