Skip to content

Commit 08e1cea

Browse files
authored
Merge pull request kubernetes#123099 from MikeSpreitzer/update-sample-controller
Update the sample controller
2 parents 8d0ee91 + fdbf0bb commit 08e1cea

File tree

2 files changed

+60
-67
lines changed

2 files changed

+60
-67
lines changed

staging/src/k8s.io/sample-controller/controller.go

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ const (
6161
// MessageResourceSynced is the message used for an Event fired when a Foo
6262
// is synced successfully
6363
MessageResourceSynced = "Foo synced successfully"
64+
// FieldManager distinguishes this controller from other things writing to API objects
65+
FieldManager = controllerAgentName
6466
)
6567

6668
// Controller is the controller implementation for Foo resources
@@ -80,7 +82,7 @@ type Controller struct {
8082
// means we can ensure we only process a fixed amount of resources at a
8183
// time, and makes it easy to ensure we are never processing the same item
8284
// simultaneously in two different workers.
83-
workqueue workqueue.TypedRateLimitingInterface[string]
85+
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
8486
// recorder is an event recorder for recording Event resources to the
8587
// Kubernetes API.
8688
recorder record.EventRecorder
@@ -106,8 +108,8 @@ func NewController(
106108
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
107109
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
108110
ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
109-
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
110-
&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
111+
workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
112+
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
111113
)
112114

113115
controller := &Controller{
@@ -196,64 +198,56 @@ func (c *Controller) runWorker(ctx context.Context) {
196198
// processNextWorkItem will read a single work item off the workqueue and
197199
// attempt to process it, by calling the syncHandler.
198200
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
199-
obj, shutdown := c.workqueue.Get()
201+
objRef, shutdown := c.workqueue.Get()
200202
logger := klog.FromContext(ctx)
201203

202204
if shutdown {
203205
return false
204206
}
205207

206-
// We wrap this block in a func so we can defer c.workqueue.Done.
207-
err := func() error {
208-
// We call Done here so the workqueue knows we have finished
209-
// processing this item. We also must remember to call Forget if we
210-
// do not want this work item being re-queued. For example, we do
211-
// not call Forget if a transient error occurs, instead the item is
212-
// put back on the workqueue and attempted again after a back-off
213-
// period.
214-
defer c.workqueue.Done(obj)
215-
// Run the syncHandler, passing it the namespace/name string of the
216-
// Foo resource to be synced.
217-
if err := c.syncHandler(ctx, obj); err != nil {
218-
// Put the item back on the workqueue to handle any transient errors.
219-
c.workqueue.AddRateLimited(obj)
220-
return fmt.Errorf("error syncing '%s': %s, requeuing", obj, err.Error())
221-
}
222-
// Finally, if no error occurs we Forget this item so it does not
208+
// We call Done at the end of this func so the workqueue knows we have
209+
// finished processing this item. We also must remember to call Forget
210+
// if we do not want this work item being re-queued. For example, we do
211+
// not call Forget if a transient error occurs, instead the item is
212+
// put back on the workqueue and attempted again after a back-off
213+
// period.
214+
defer c.workqueue.Done(objRef)
215+
216+
// Run the syncHandler, passing it the structured reference to the object to be synced.
217+
err := c.syncHandler(ctx, objRef)
218+
if err == nil {
219+
// If no error occurs then we Forget this item so it does not
223220
// get queued again until another change happens.
224-
c.workqueue.Forget(obj)
225-
logger.Info("Successfully synced", "resourceName", obj)
226-
return nil
227-
}()
228-
229-
if err != nil {
230-
utilruntime.HandleError(err)
221+
c.workqueue.Forget(objRef)
222+
logger.Info("Successfully synced", "objectName", objRef)
231223
return true
232224
}
233-
225+
// there was a failure so be sure to report it. This method allows for
226+
// pluggable error handling which can be used for things like
227+
// cluster-monitoring.
228+
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
229+
// since we failed, we should requeue the item to work on later. This
230+
// method will add a backoff to avoid hotlooping on particular items
231+
// (they're probably still not going to work right away) and overall
232+
// controller protection (everything I've done is broken, this controller
233+
// needs to calm down or it can starve other useful work) cases.
234+
c.workqueue.AddRateLimited(objRef)
234235
return true
235236
}
236237

237238
// syncHandler compares the actual state with the desired, and attempts to
238239
// converge the two. It then updates the Status block of the Foo resource
239240
// with the current status of the resource.
240-
func (c *Controller) syncHandler(ctx context.Context, key string) error {
241-
// Convert the namespace/name string into a distinct namespace and name
242-
logger := klog.LoggerWithValues(klog.FromContext(ctx), "resourceName", key)
243-
244-
namespace, name, err := cache.SplitMetaNamespaceKey(key)
245-
if err != nil {
246-
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
247-
return nil
248-
}
241+
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
242+
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
249243

250244
// Get the Foo resource with this namespace/name
251-
foo, err := c.foosLister.Foos(namespace).Get(name)
245+
foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
252246
if err != nil {
253247
// The Foo resource may no longer exist, in which case we stop
254248
// processing.
255249
if errors.IsNotFound(err) {
256-
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
250+
utilruntime.HandleErrorWithContext(ctx, err, "Foo referenced by item in work queue no longer exists", "objectReference", objectRef)
257251
return nil
258252
}
259253

@@ -265,15 +259,15 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
265259
// We choose to absorb the error here as the worker would requeue the
266260
// resource otherwise. Instead, the next time the resource is updated
267261
// the resource will be queued again.
268-
utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
262+
utilruntime.HandleErrorWithContext(ctx, nil, "Deployment name missing from object reference", "objectReference", objectRef)
269263
return nil
270264
}
271265

272266
// Get the deployment with the name specified in Foo.spec
273267
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
274268
// If the resource doesn't exist, we'll create it
275269
if errors.IsNotFound(err) {
276-
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
270+
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
277271
}
278272

279273
// If an error occurs during Get/Create, we'll requeue the item so we can
@@ -296,7 +290,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
296290
// should update the Deployment resource.
297291
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
298292
logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
299-
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
293+
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
300294
}
301295

302296
// If an error occurs during Update, we'll requeue the item so we can
@@ -327,21 +321,20 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
327321
// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
328322
// UpdateStatus will not allow changes to the Spec of the resource,
329323
// which is ideal for ensuring nothing other than resource status has been updated.
330-
_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{})
324+
_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{FieldManager: FieldManager})
331325
return err
332326
}
333327

334328
// enqueueFoo takes a Foo resource and converts it into a namespace/name
335329
// string which is then put onto the work queue. This method should *not* be
336330
// passed resources of any type other than Foo.
337331
func (c *Controller) enqueueFoo(obj interface{}) {
338-
var key string
339-
var err error
340-
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
332+
if objectRef, err := cache.ObjectToName(obj); err != nil {
341333
utilruntime.HandleError(err)
342334
return
335+
} else {
336+
c.workqueue.Add(objectRef)
343337
}
344-
c.workqueue.Add(key)
345338
}
346339

347340
// handleObject will take any resource implementing metav1.Object and attempt
@@ -356,12 +349,16 @@ func (c *Controller) handleObject(obj interface{}) {
356349
if object, ok = obj.(metav1.Object); !ok {
357350
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
358351
if !ok {
359-
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
352+
// If the object value is not too big and does not contain sensitive information then
353+
// it may be useful to include it.
354+
utilruntime.HandleErrorWithContext(context.Background(), nil, "Error decoding object, invalid type", "type", fmt.Sprintf("%T", obj))
360355
return
361356
}
362357
object, ok = tombstone.Obj.(metav1.Object)
363358
if !ok {
364-
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
359+
// If the object value is not too big and does not contain sensitive information then
360+
// it may be useful to include it.
361+
utilruntime.HandleErrorWithContext(context.Background(), nil, "Error decoding object tombstone, invalid type", "type", fmt.Sprintf("%T", tombstone.Obj))
365362
return
366363
}
367364
logger.V(4).Info("Recovered deleted object", "resourceName", object.GetName())

staging/src/k8s.io/sample-controller/controller_test.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -108,22 +108,22 @@ func (f *fixture) newController(ctx context.Context) (*Controller, informers.Sha
108108
return c, i, k8sI
109109
}
110110

111-
func (f *fixture) run(ctx context.Context, fooName string) {
112-
f.runController(ctx, fooName, true, false)
111+
func (f *fixture) run(ctx context.Context, fooRef cache.ObjectName) {
112+
f.runController(ctx, fooRef, true, false)
113113
}
114114

115-
func (f *fixture) runExpectError(ctx context.Context, fooName string) {
116-
f.runController(ctx, fooName, true, true)
115+
func (f *fixture) runExpectError(ctx context.Context, fooRef cache.ObjectName) {
116+
f.runController(ctx, fooRef, true, true)
117117
}
118118

119-
func (f *fixture) runController(ctx context.Context, fooName string, startInformers bool, expectError bool) {
119+
func (f *fixture) runController(ctx context.Context, fooRef cache.ObjectName, startInformers bool, expectError bool) {
120120
c, i, k8sI := f.newController(ctx)
121121
if startInformers {
122122
i.Start(ctx.Done())
123123
k8sI.Start(ctx.Done())
124124
}
125125

126-
err := c.syncHandler(ctx, fooName)
126+
err := c.syncHandler(ctx, fooRef)
127127
if !expectError && err != nil {
128128
f.t.Errorf("error syncing foo: %v", err)
129129
} else if expectError && err == nil {
@@ -240,13 +240,9 @@ func (f *fixture) expectUpdateFooStatusAction(foo *samplecontroller.Foo) {
240240
f.actions = append(f.actions, action)
241241
}
242242

243-
func getKey(foo *samplecontroller.Foo, t *testing.T) string {
244-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(foo)
245-
if err != nil {
246-
t.Errorf("Unexpected error getting key for foo %v: %v", foo.Name, err)
247-
return ""
248-
}
249-
return key
243+
func getRef(foo *samplecontroller.Foo, t *testing.T) cache.ObjectName {
244+
ref := cache.MetaObjectToName(foo)
245+
return ref
250246
}
251247

252248
func TestCreatesDeployment(t *testing.T) {
@@ -261,7 +257,7 @@ func TestCreatesDeployment(t *testing.T) {
261257
f.expectCreateDeploymentAction(expDeployment)
262258
f.expectUpdateFooStatusAction(foo)
263259

264-
f.run(ctx, getKey(foo, t))
260+
f.run(ctx, getRef(foo, t))
265261
}
266262

267263
func TestDoNothing(t *testing.T) {
@@ -277,7 +273,7 @@ func TestDoNothing(t *testing.T) {
277273
f.kubeobjects = append(f.kubeobjects, d)
278274

279275
f.expectUpdateFooStatusAction(foo)
280-
f.run(ctx, getKey(foo, t))
276+
f.run(ctx, getRef(foo, t))
281277
}
282278

283279
func TestUpdateDeployment(t *testing.T) {
@@ -298,7 +294,7 @@ func TestUpdateDeployment(t *testing.T) {
298294

299295
f.expectUpdateFooStatusAction(foo)
300296
f.expectUpdateDeploymentAction(expDeployment)
301-
f.run(ctx, getKey(foo, t))
297+
f.run(ctx, getRef(foo, t))
302298
}
303299

304300
func TestNotControlledByUs(t *testing.T) {
@@ -315,7 +311,7 @@ func TestNotControlledByUs(t *testing.T) {
315311
f.deploymentLister = append(f.deploymentLister, d)
316312
f.kubeobjects = append(f.kubeobjects, d)
317313

318-
f.runExpectError(ctx, getKey(foo, t))
314+
f.runExpectError(ctx, getRef(foo, t))
319315
}
320316

321317
func int32Ptr(i int32) *int32 { return &i }

0 commit comments

Comments (0)