Skip to content

Commit 67a4d20

Browse files
authored
Merge pull request kubernetes#128505 from Jefftree/fix-cle-lock-acquisition
Fix CLE leader lock acquisition
2 parents 98b4ee6 + 1ede4d8 commit 67a4d20

File tree

3 files changed

+244
-47
lines changed

3 files changed

+244
-47
lines changed

pkg/controlplane/controller/leaderelection/run_with_leaderelection.go

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,20 @@ import (
2222
"time"
2323

2424
"k8s.io/apimachinery/pkg/util/uuid"
25+
"k8s.io/apimachinery/pkg/util/wait"
2526
"k8s.io/client-go/rest"
2627
"k8s.io/client-go/tools/leaderelection"
2728
"k8s.io/client-go/tools/leaderelection/resourcelock"
2829
"k8s.io/klog/v2"
2930
)
3031

32+
var (
33+
// TODO: Eventually these should be configurable
34+
LeaseDuration = 15 * time.Second
35+
RenewDeadline = 10 * time.Second
36+
RetryPeriod = 2 * time.Second
37+
)
38+
3139
type NewRunner func() (func(ctx context.Context, workers int), error)
3240

3341
// RunWithLeaderElection runs the provided runner function with leader election.
@@ -36,58 +44,56 @@ type NewRunner func() (func(ctx context.Context, workers int), error)
3644
// RunWithLeaderElection only returns when the context is done, or initial
3745
// leader election fails.
3846
func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner) {
39-
var cancel context.CancelFunc
40-
41-
callbacks := leaderelection.LeaderCallbacks{
42-
OnStartedLeading: func(ctx context.Context) {
43-
ctx, cancel = context.WithCancel(ctx)
44-
var err error
45-
run, err := newRunnerFn()
46-
if err != nil {
47-
klog.Infof("Error creating runner: %v", err)
48-
return
49-
}
50-
run(ctx, 1)
51-
},
52-
OnStoppedLeading: func() {
53-
if cancel != nil {
54-
cancel()
55-
}
56-
},
57-
}
58-
5947
hostname, err := os.Hostname()
6048
if err != nil {
6149
klog.Infof("Error parsing hostname: %v", err)
6250
return
6351
}
52+
identity := hostname + "_" + string(uuid.NewUUID())
6453

65-
rl, err := resourcelock.NewFromKubeconfig(
66-
"leases",
67-
"kube-system",
68-
controllerName,
69-
resourcelock.ResourceLockConfig{
70-
Identity: hostname + "_" + string(uuid.NewUUID()),
71-
},
72-
config,
73-
10,
74-
)
75-
if err != nil {
76-
klog.Infof("Error creating resourcelock: %v", err)
77-
return
78-
}
54+
wait.Until(func() {
55+
callbacks := leaderelection.LeaderCallbacks{
56+
OnStartedLeading: func(ctx context.Context) {
57+
var err error
58+
run, err := newRunnerFn()
59+
if err != nil {
60+
klog.Infof("Error creating runner: %v", err)
61+
return
62+
}
63+
run(ctx, 1)
64+
},
65+
OnStoppedLeading: func() {
66+
},
67+
}
7968

80-
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
81-
Lock: rl,
82-
LeaseDuration: 15 * time.Second,
83-
RenewDeadline: 10 * time.Second,
84-
RetryPeriod: 2 * time.Second,
85-
Callbacks: callbacks,
86-
Name: controllerName,
87-
})
88-
if err != nil {
89-
klog.Infof("Error creating leader elector: %v", err)
90-
return
91-
}
92-
le.Run(ctx)
69+
rl, err := resourcelock.NewFromKubeconfig(
70+
"leases",
71+
"kube-system",
72+
controllerName,
73+
resourcelock.ResourceLockConfig{
74+
Identity: identity,
75+
},
76+
config,
77+
10,
78+
)
79+
if err != nil {
80+
klog.Infof("Error creating resourcelock: %v", err)
81+
return
82+
}
83+
84+
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
85+
Lock: rl,
86+
LeaseDuration: LeaseDuration,
87+
RenewDeadline: RenewDeadline,
88+
RetryPeriod: RetryPeriod,
89+
Callbacks: callbacks,
90+
Name: controllerName,
91+
ReleaseOnCancel: true,
92+
})
93+
if err != nil {
94+
klog.Infof("Error creating leader elector: %v", err)
95+
return
96+
}
97+
le.Run(ctx)
98+
}, RetryPeriod, ctx.Done())
9399
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
"time"
24+
25+
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/util/wait"
28+
genericfeatures "k8s.io/apiserver/pkg/features"
29+
utilfeature "k8s.io/apiserver/pkg/util/feature"
30+
kubernetes "k8s.io/client-go/kubernetes"
31+
featuregatetesting "k8s.io/component-base/featuregate/testing"
32+
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
33+
"k8s.io/kubernetes/pkg/controlplane/controller/leaderelection"
34+
"k8s.io/kubernetes/test/integration/framework"
35+
"k8s.io/utils/ptr"
36+
)
37+
38+
func TestCoordinatedLeaderElectionLeaseTransfer(t *testing.T) {
39+
// Reset the coordinated leader election variables after the test
40+
defaultLeaseDuration := leaderelection.LeaseDuration
41+
defaultRenewDeadline := leaderelection.RenewDeadline
42+
defaultRetryPeriod := leaderelection.RetryPeriod
43+
defer func() {
44+
leaderelection.LeaseDuration = defaultLeaseDuration
45+
leaderelection.RenewDeadline = defaultRenewDeadline
46+
leaderelection.RetryPeriod = defaultRetryPeriod
47+
}()
48+
49+
// Use shorter interval for lease duration in integration test
50+
leaderelection.LeaseDuration = 5 * time.Second
51+
leaderelection.RenewDeadline = 3 * time.Second
52+
leaderelection.RetryPeriod = 2 * time.Second
53+
54+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
55+
etcd := framework.SharedEtcd()
56+
57+
server := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), nil, etcd)
58+
defer server.TearDownFn()
59+
60+
config := server.ClientConfig
61+
clientset := kubernetes.NewForConfigOrDie(config)
62+
63+
ctx, cancel := context.WithCancel(context.Background())
64+
defer cancel()
65+
66+
err := wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
67+
lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
68+
if err != nil {
69+
fmt.Println(err)
70+
return false, nil
71+
}
72+
return lease.Spec.HolderIdentity != nil, nil
73+
})
74+
75+
if err != nil {
76+
t.Fatalf("timeout waiting for Lease %s %s err: %v", "leader-election-controller", "kube-system", err)
77+
}
78+
79+
lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
80+
if err != nil {
81+
t.Fatal(err)
82+
}
83+
84+
leaseName := *lease.Spec.HolderIdentity
85+
86+
server2 := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), nil, etcd)
87+
vap := &admissionregistrationv1.ValidatingAdmissionPolicy{
88+
ObjectMeta: metav1.ObjectMeta{Name: "cle-block-renewal"},
89+
Spec: admissionregistrationv1.ValidatingAdmissionPolicySpec{
90+
FailurePolicy: ptr.To(admissionregistrationv1.Fail),
91+
MatchConstraints: &admissionregistrationv1.MatchResources{
92+
ResourceRules: []admissionregistrationv1.NamedRuleWithOperations{
93+
{
94+
RuleWithOperations: admissionregistrationv1.RuleWithOperations{
95+
Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.OperationAll},
96+
Rule: admissionregistrationv1.Rule{APIGroups: []string{"coordination.k8s.io"}, APIVersions: []string{"v1"}, Resources: []string{"leases"}},
97+
},
98+
},
99+
},
100+
},
101+
Validations: []admissionregistrationv1.Validation{{
102+
Expression: "object.spec.holderIdentity != '" + leaseName + "'",
103+
}},
104+
},
105+
}
106+
_, err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicies().Create(ctx, vap, metav1.CreateOptions{})
107+
if err != nil {
108+
t.Fatal(err)
109+
}
110+
111+
vapBinding := &admissionregistrationv1.ValidatingAdmissionPolicyBinding{
112+
ObjectMeta: metav1.ObjectMeta{Name: "cle-block-renewal"},
113+
Spec: admissionregistrationv1.ValidatingAdmissionPolicyBindingSpec{
114+
PolicyName: "cle-block-renewal",
115+
ValidationActions: []admissionregistrationv1.ValidationAction{
116+
admissionregistrationv1.Deny,
117+
},
118+
},
119+
}
120+
121+
_, err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicyBindings().Create(ctx, vapBinding, metav1.CreateOptions{})
122+
if err != nil {
123+
t.Fatal(err)
124+
}
125+
126+
// Wait until the first apiserver releases the lease and second apiserver takes over the lock
127+
err = wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
128+
lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
129+
if err != nil {
130+
fmt.Println(err)
131+
return false, nil
132+
}
133+
return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != leaseName, nil
134+
})
135+
if err != nil {
136+
t.Error("Expected the cle lease lock to transition to the second apiserver")
137+
}
138+
139+
// Shutdown the second apiserver
140+
server2.TearDownFn()
141+
142+
// Allow writes again from the first apiserver
143+
err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicies().Delete(ctx, vap.Name, metav1.DeleteOptions{})
144+
if err != nil {
145+
t.Fatal(err)
146+
}
147+
err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicyBindings().Delete(ctx, vapBinding.Name, metav1.DeleteOptions{})
148+
if err != nil {
149+
t.Fatal(err)
150+
}
151+
152+
// Ensure that the first apiserver is able to reacquire the CLE leader lease
153+
err = wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
154+
lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
155+
if err != nil {
156+
fmt.Println(err)
157+
return false, nil
158+
}
159+
return *lease.Spec.HolderIdentity == leaseName, nil
160+
})
161+
if err != nil {
162+
t.Error("Expected the cle lease lock to transition to the first apiserver")
163+
}
164+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/kubernetes/test/integration/framework"
23+
)
24+
25+
func TestMain(m *testing.M) {
26+
framework.EtcdMain(m.Run)
27+
}

0 commit comments

Comments
 (0)