@@ -14,15 +14,14 @@ See the License for the specific language governing permissions and
14
14
limitations under the License.
15
15
*/
16
16
17
- package nodelease
17
+ package lease
18
18
19
19
import (
20
20
"context"
21
21
"fmt"
22
22
"time"
23
23
24
24
coordinationv1 "k8s.io/api/coordination/v1"
25
- corev1 "k8s.io/api/core/v1"
26
25
apierrors "k8s.io/apimachinery/pkg/api/errors"
27
26
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
27
"k8s.io/apimachinery/pkg/util/clock"
@@ -35,71 +34,80 @@ import (
35
34
)
36
35
37
36
const (
38
- // renewIntervalFraction is the fraction of lease duration to renew the lease
39
- renewIntervalFraction = 0.25
40
- // maxUpdateRetries is the number of immediate, successive retries the Kubelet will attempt
37
+ // maxUpdateRetries is the number of immediate, successive retries the controller will attempt
41
38
// when renewing the lease before it waits for the renewal interval before trying again,
42
39
// similar to what we do for node status retries
43
40
maxUpdateRetries = 5
44
41
// maxBackoff is the maximum sleep time during backoff (e.g. in backoffEnsureLease)
45
42
maxBackoff = 7 * time .Second
46
43
)
47
44
48
- // Controller manages creating and renewing the lease for this Kubelet
45
+ // Controller manages creating and renewing the lease for this component (kube-apiserver, kubelet, etc.)
49
46
type Controller interface {
50
47
Run (stopCh <- chan struct {})
51
48
}
52
49
50
+ // ProcessLeaseFunc processes the given lease in-place
51
+ type ProcessLeaseFunc func (* coordinationv1.Lease ) error
52
+
53
53
type controller struct {
54
54
client clientset.Interface
55
55
leaseClient coordclientset.LeaseInterface
56
56
holderIdentity string
57
+ leaseNamespace string
57
58
leaseDurationSeconds int32
58
59
renewInterval time.Duration
59
60
clock clock.Clock
60
61
onRepeatedHeartbeatFailure func ()
61
62
62
- // latestLease is the latest node lease which Kubelet updated or created
63
+ // latestLease is the latest lease which the controller updated or created
63
64
latestLease * coordinationv1.Lease
65
+
66
+ // newLeasePostProcessFunc allows customizing a lease object (e.g. setting OwnerReference)
67
+ // before every time the lease is created/refreshed(updated). Note that an error will block
68
+ // a lease CREATE, causing the controller to retry next time, but an error won't block a
69
+ // lease UPDATE.
70
+ newLeasePostProcessFunc ProcessLeaseFunc
64
71
}
65
72
66
73
// NewController constructs and returns a controller
67
- func NewController (clock clock.Clock , client clientset.Interface , holderIdentity string , leaseDurationSeconds int32 , onRepeatedHeartbeatFailure func ()) Controller {
74
+ func NewController (clock clock.Clock , client clientset.Interface , holderIdentity string , leaseDurationSeconds int32 , onRepeatedHeartbeatFailure func (), renewInterval time. Duration , leaseNamespace string , newLeasePostProcessFunc ProcessLeaseFunc ) Controller {
68
75
var leaseClient coordclientset.LeaseInterface
69
76
if client != nil {
70
- leaseClient = client .CoordinationV1 ().Leases (corev1 . NamespaceNodeLease )
77
+ leaseClient = client .CoordinationV1 ().Leases (leaseNamespace )
71
78
}
72
- leaseDuration := time .Duration (leaseDurationSeconds ) * time .Second
73
79
return & controller {
74
80
client : client ,
75
81
leaseClient : leaseClient ,
76
82
holderIdentity : holderIdentity ,
83
+ leaseNamespace : leaseNamespace ,
77
84
leaseDurationSeconds : leaseDurationSeconds ,
78
- renewInterval : time . Duration ( float64 ( leaseDuration ) * renewIntervalFraction ) ,
85
+ renewInterval : renewInterval ,
79
86
clock : clock ,
80
87
onRepeatedHeartbeatFailure : onRepeatedHeartbeatFailure ,
88
+ newLeasePostProcessFunc : newLeasePostProcessFunc ,
81
89
}
82
90
}
83
91
84
92
// Run runs the controller
85
93
func (c * controller ) Run (stopCh <- chan struct {}) {
86
94
if c .leaseClient == nil {
87
- klog .Infof ("node lease controller has nil lease client, will not claim or renew leases" )
95
+ klog .Infof ("lease controller has nil lease client, will not claim or renew leases" )
88
96
return
89
97
}
90
98
wait .Until (c .sync , c .renewInterval , stopCh )
91
99
}
92
100
93
101
func (c * controller ) sync () {
94
102
if c .latestLease != nil {
95
- // As long as node lease is not (or very rarely) updated by any other agent than Kubelet ,
103
+ // As long as the lease is not (or very rarely) updated by any other agent than the component itself ,
96
104
// we can optimistically assume it didn't change since our last update and try updating
97
105
// based on the version from that time. Thanks to it we avoid GET call and reduce load
98
106
// on etcd and kube-apiserver.
99
107
// If at some point other agents will also be frequently updating the Lease object, this
100
108
// can result in performance degradation, because we will end up with calling additional
101
109
// GET/PUT - at this point this whole "if" should be removed.
102
- err := c .retryUpdateLease (c .newLease ( c . latestLease ) )
110
+ err := c .retryUpdateLease (c .latestLease )
103
111
if err == nil {
104
112
return
105
113
}
@@ -133,7 +141,7 @@ func (c *controller) backoffEnsureLease() (*coordinationv1.Lease, bool) {
133
141
break
134
142
}
135
143
sleep = minDuration (2 * sleep , maxBackoff )
136
- klog .Errorf ("failed to ensure node lease exists, will retry in %v, error: %v" , sleep , err )
144
+ klog .Errorf ("failed to ensure lease exists, will retry in %v, error: %v" , sleep , err )
137
145
// backoff wait
138
146
c .clock .Sleep (sleep )
139
147
}
@@ -146,11 +154,11 @@ func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
146
154
lease , err := c .leaseClient .Get (context .TODO (), c .holderIdentity , metav1.GetOptions {})
147
155
if apierrors .IsNotFound (err ) {
148
156
// lease does not exist, create it.
149
- leaseToCreate := c .newLease (nil )
150
- if len ( leaseToCreate . OwnerReferences ) == 0 {
151
- // We want to ensure that a lease will always have OwnerReferences set.
152
- // Thus, given that we weren't able to set it correctly, we simply
153
- // not create it this time - we will retry in the next iteration.
157
+ leaseToCreate , err := c .newLease (nil )
158
+ // An error occurred during allocating the new lease (likely from newLeasePostProcessFunc).
159
+ // Given that we weren't able to set the lease correctly, we simply
160
+ // not create it this time - we will retry in the next iteration.
161
+ if err != nil {
154
162
return nil , false , nil
155
163
}
156
164
lease , err := c .leaseClient .Create (context .TODO (), leaseToCreate , metav1.CreateOptions {})
@@ -170,12 +178,13 @@ func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
170
178
// call this once you're sure the lease has been created
171
179
func (c * controller ) retryUpdateLease (base * coordinationv1.Lease ) error {
172
180
for i := 0 ; i < maxUpdateRetries ; i ++ {
173
- lease , err := c .leaseClient .Update (context .TODO (), c .newLease (base ), metav1.UpdateOptions {})
181
+ leaseToUpdate , _ := c .newLease (base )
182
+ lease , err := c .leaseClient .Update (context .TODO (), leaseToUpdate , metav1.UpdateOptions {})
174
183
if err == nil {
175
184
c .latestLease = lease
176
185
return nil
177
186
}
178
- klog .Errorf ("failed to update node lease, error: %v" , err )
187
+ klog .Errorf ("failed to update lease, error: %v" , err )
179
188
// OptimisticLockError requires getting the newer version of lease to proceed.
180
189
if apierrors .IsConflict (err ) {
181
190
base , _ = c .backoffEnsureLease ()
@@ -185,20 +194,22 @@ func (c *controller) retryUpdateLease(base *coordinationv1.Lease) error {
185
194
c .onRepeatedHeartbeatFailure ()
186
195
}
187
196
}
188
- return fmt .Errorf ("failed %d attempts to update node lease" , maxUpdateRetries )
197
+ return fmt .Errorf ("failed %d attempts to update lease" , maxUpdateRetries )
189
198
}
190
199
191
200
// newLease constructs a new lease if base is nil, or returns a copy of base
192
201
// with desired state asserted on the copy.
193
- func (c * controller ) newLease (base * coordinationv1.Lease ) * coordinationv1.Lease {
202
+ // Note that an error will block lease CREATE, causing the CREATE to be retried in
203
+ // the next iteration; but the error won't block lease refresh (UPDATE).
204
+ func (c * controller ) newLease (base * coordinationv1.Lease ) (* coordinationv1.Lease , error ) {
194
205
// Use the bare minimum set of fields; other fields exist for debugging/legacy,
195
- // but we don't need to make node heartbeats more complicated by using them.
206
+ // but we don't need to make component heartbeats more complicated by using them.
196
207
var lease * coordinationv1.Lease
197
208
if base == nil {
198
209
lease = & coordinationv1.Lease {
199
210
ObjectMeta : metav1.ObjectMeta {
200
211
Name : c .holderIdentity ,
201
- Namespace : corev1 . NamespaceNodeLease ,
212
+ Namespace : c . leaseNamespace ,
202
213
},
203
214
Spec : coordinationv1.LeaseSpec {
204
215
HolderIdentity : pointer .StringPtr (c .holderIdentity ),
@@ -210,26 +221,12 @@ func (c *controller) newLease(base *coordinationv1.Lease) *coordinationv1.Lease
210
221
}
211
222
lease .Spec .RenewTime = & metav1.MicroTime {Time : c .clock .Now ()}
212
223
213
- // Setting owner reference needs node's UID. Note that it is different from
214
- // kubelet.nodeRef.UID. When lease is initially created, it is possible that
215
- // the connection between master and node is not ready yet. So try to set
216
- // owner reference every time when renewing the lease, until successful.
217
- if len (lease .OwnerReferences ) == 0 {
218
- if node , err := c .client .CoreV1 ().Nodes ().Get (context .TODO (), c .holderIdentity , metav1.GetOptions {}); err == nil {
219
- lease .OwnerReferences = []metav1.OwnerReference {
220
- {
221
- APIVersion : corev1 .SchemeGroupVersion .WithKind ("Node" ).Version ,
222
- Kind : corev1 .SchemeGroupVersion .WithKind ("Node" ).Kind ,
223
- Name : c .holderIdentity ,
224
- UID : node .UID ,
225
- },
226
- }
227
- } else {
228
- klog .Errorf ("failed to get node %q when trying to set owner ref to the node lease: %v" , c .holderIdentity , err )
229
- }
224
+ if c .newLeasePostProcessFunc != nil {
225
+ err := c .newLeasePostProcessFunc (lease )
226
+ return lease , err
230
227
}
231
228
232
- return lease
229
+ return lease , nil
233
230
}
234
231
235
232
func minDuration (a , b time.Duration ) time.Duration {
0 commit comments