Skip to content

Commit e25e539

Browse files
Merge pull request #4893 from justinsb/fix_resource_watch_terminate_on_delete
fix: resourcewatcher should terminate on delete
2 parents b4a7d67 + 27be245 commit e25e539

File tree

9 files changed

+95
-35
lines changed

9 files changed

+95
-35
lines changed

pkg/controller/dcl/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ func (r *Reconciler) handleUnresolvableDeps(ctx context.Context, resource *k8s.R
411411
ctx, cancel := context.WithTimeout(ctx, timeoutPeriod)
412412
defer cancel()
413413
logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
414-
if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
414+
if err := watcher.WaitForResourceToBeReadyOrDeleted(ctx, refNN, refGVK); err != nil {
415415
logger.Error(err, "error while waiting for resource's reference to be ready")
416416
return
417417
}

pkg/controller/direct/directbase/directbase_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ func (r *reconcileContext) handleUnresolvableDeps(ctx context.Context, policy *u
550550
ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
551551
defer cancel()
552552
logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
553-
if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
553+
if err := watcher.WaitForResourceToBeReadyOrDeleted(ctx, refNN, refGVK); err != nil {
554554
logger.Error(err, "error while waiting for resource's reference to be ready")
555555
return
556556
}

pkg/controller/iam/auditconfig/iamauditconfig_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ func (r *reconcileContext) handleUnresolvableDeps(auditConfig *iamv1beta1.IAMAud
372372
ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
373373
defer cancel()
374374
logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
375-
if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
375+
if err := watcher.WaitForResourceToBeReadyOrDeleted(ctx, refNN, refGVK); err != nil {
376376
logger.Error(err, "error while waiting for resource's reference to be ready")
377377
return
378378
}

pkg/controller/iam/partialpolicy/iampartialpolicy_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func (r *reconcileContext) handleUnresolvableDeps(policy *iamv1beta1.IAMPartialP
438438
ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
439439
defer cancel()
440440
logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
441-
if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
441+
if err := watcher.WaitForResourceToBeReadyOrDeleted(ctx, refNN, refGVK); err != nil {
442442
logger.Error(err, "error while waiting for resource's reference to be ready")
443443
return
444444
}

pkg/controller/iam/policy/iampolicy_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ func (r *reconcileContext) handleUnresolvableDeps(policy *iamv1beta1.IAMPolicy,
371371
ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
372372
defer cancel()
373373
logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
374-
if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
374+
if err := watcher.WaitForResourceToBeReadyOrDeleted(ctx, refNN, refGVK); err != nil {
375375
logger.Error(err, "error while waiting for resource's reference to be ready")
376376
return
377377
}

pkg/controller/iam/policymember/iampolicymember_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func (r *reconcileContext) handleUnresolvableDeps(policyMember *iamv1beta1.IAMPo
383383
ctx, cancel := context.WithTimeout(context.TODO(), timeoutPeriod)
384384
defer cancel()
385385
logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
386-
if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
386+
if err := watcher.WaitForResourceToBeReadyOrDeleted(ctx, refNN, refGVK); err != nil {
387387
logger.Error(err, "error while waiting for resource's reference to be ready")
388388
return
389389
}

pkg/controller/resourcewatcher/resourcewatcher.go

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,19 @@ func NewWithClient(dynamicClient dynamic.Interface, logger logr.Logger) *Resourc
5353
}
5454
}
5555

56-
// WaitForResourceToBeReady waits for the resource identified by the given GVK
57-
// and NamespacedName. It blocks until the resource is ready, an error occurs, or a context
56+
// WaitForResourceToBeReadyOrDeleted waits for the resource identified by the given GVK
57+
// and NamespacedName. It blocks until the resource is ready or deleted, an error occurs, or a context
5858
// cancellation occurs. Note that a nil return value signifies that the resource is ready and
5959
// no errors have occurred.
60-
func (r *ResourceWatcher) WaitForResourceToBeReady(ctx context.Context, nn types.NamespacedName, gvk schema.GroupVersionKind) error {
60+
func (r *ResourceWatcher) WaitForResourceToBeReadyOrDeleted(ctx context.Context, nn types.NamespacedName, gvk schema.GroupVersionKind) error {
6161
logger := r.logger.WithValues("resource", nn, "resourceGVK", gvk)
6262
watch, err := r.WatchResource(ctx, nn, gvk)
6363
if err != nil {
6464
return err
6565
}
6666
defer watch.Stop()
6767
logger.Info("successfully created watch on resource")
68-
return WaitForResourceToBeReadyViaWatch(ctx, watch, logger)
68+
return WaitForResourceToBeReadyOrDeletedViaWatch(ctx, watch, logger)
6969
}
7070

7171
// WatchResource creates a watch on a resource identified by the given GVK and NamespacedName.
@@ -79,25 +79,41 @@ func (r *ResourceWatcher) WatchResource(ctx context.Context, nn types.Namespaced
7979
return watch, nil
8080
}
8181

82-
// WaitForResourceToBeReadyViaWatch monitors a given 'Watch' for any
82+
// WaitForResourceToBeReadyOrDeletedViaWatch monitors a given 'Watch' for any
8383
// updates to the resource that the given 'Watch' is targeting. Note that
8484
// an error is returned to signify a failure during the 'Watch' process,
85-
// while nil is returned to signify the watched resource is ready.
86-
func WaitForResourceToBeReadyViaWatch(ctx context.Context, watch watch.Interface, logger logr.Logger) error {
85+
// while nil is returned to signify the watched resource is ready or deleted.
86+
func WaitForResourceToBeReadyOrDeletedViaWatch(ctx context.Context, w watch.Interface, logger logr.Logger) error {
8787
for {
8888
select {
8989
case <-ctx.Done():
9090
return fmt.Errorf("context was cancelled: %w", ctx.Err())
91-
case event, ok := <-watch.ResultChan():
91+
case event, ok := <-w.ResultChan():
9292
if !ok {
9393
return fmt.Errorf("watch channel was closed")
9494
}
95-
ok, reason, err := isResourceReady(event)
95+
if event.Type == watch.Bookmark {
96+
continue // ignore
97+
}
98+
if event.Type == watch.Deleted {
99+
logger.Info("resource has been deleted; triggering watch completion")
100+
return nil
101+
}
102+
if event.Type != watch.Modified && event.Type != watch.Added {
103+
return fmt.Errorf("unexpected watch event type %v", event.Type)
104+
}
105+
106+
u, ok := event.Object.(*unstructured.Unstructured)
107+
if !ok {
108+
return fmt.Errorf("error casting event object '%v' of kind '%v' to unstructured", event.Object, event.Object.GetObjectKind())
109+
}
110+
111+
isReady, err := isResourceReady(u)
96112
if err != nil {
97113
return fmt.Errorf("error checking if resource is ready: %w", err)
98114
}
99-
if !ok {
100-
logger.Info("resource not ready", "reason", reason)
115+
if !isReady {
116+
logger.Info("resource not ready")
101117
continue
102118
}
103119
logger.Info("resource is ready")
@@ -107,27 +123,19 @@ func WaitForResourceToBeReadyViaWatch(ctx context.Context, watch watch.Interface
107123
}
108124

109125
// isResourceReady returns whether a resource identified by the given GVK
110-
// and NamespacedName is ready. Note that a 'reason' for failure is returned only
111-
// when the resource is not ready and no fatal error has occurred.
112-
func isResourceReady(event watch.Event) (ok bool, reason string, err error) {
113-
if event.Type != watch.Modified && event.Type != watch.Added {
114-
return false, fmt.Sprintf("got watch event of type '%v', want event type '%v' or '%v'", event.Type, watch.Modified, watch.Added), nil
115-
}
116-
u, ok := event.Object.(*unstructured.Unstructured)
117-
if !ok {
118-
return false, "", fmt.Errorf("error casting event object '%v' of kind '%v' to unstructured", event.Object, event.Object.GetObjectKind())
119-
}
126+
// and NamespacedName is ready.
127+
func isResourceReady(u *unstructured.Unstructured) (isReady bool, err error) {
120128
resource, err := k8s.NewResource(u)
121129
if err != nil {
122-
return false, "", fmt.Errorf("error converting unstructured to resource: %w", err)
130+
return false, fmt.Errorf("error converting unstructured to resource: %w", err)
123131
}
124132
// Secrets don't have a 'ready' condition. As long as they can be
125133
// found on the API server, we consider them ready as resources.
126134
if resource.Kind == "Secret" {
127-
return true, "", nil
135+
return true, nil
128136
}
129137
if !k8s.IsResourceReady(resource) {
130-
return false, "resource not ready", nil
138+
return false, nil
131139
}
132-
return true, "", nil
140+
return true, nil
133141
}

pkg/controller/resourcewatcher/resourcewatcher_test.go

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestWatchResourceTimeout(t *testing.T) {
4848
}
4949
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*10)
5050
defer cancel()
51-
if err := resourcewatcher.WaitForResourceToBeReadyViaWatch(ctx, watch, logger); !errors.Is(err, context.DeadlineExceeded) {
51+
if err := resourcewatcher.WaitForResourceToBeReadyOrDeletedViaWatch(ctx, watch, logger); !errors.Is(err, context.DeadlineExceeded) {
5252
t.Fatalf("got error '%v', expected '%v'", err, context.DeadlineExceeded)
5353
}
5454
}
@@ -68,15 +68,67 @@ func TestWatchResourceSuccess(t *testing.T) {
6868
if _, err := fake.Resource(k8s.ToGVR(gvk)).
6969
Namespace(nn.Namespace).
7070
Create(context.TODO(), readyResourceUnstructured, metav1.CreateOptions{}); err != nil {
71-
t.Fatal(err)
71+
t.Fatalf("creating resource: %v", err)
7272
}
7373
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
7474
defer cancel()
75-
if err := resourcewatcher.WaitForResourceToBeReadyViaWatch(ctx, watch, logger); err != nil {
75+
if err := resourcewatcher.WaitForResourceToBeReadyOrDeletedViaWatch(ctx, watch, logger); err != nil {
7676
t.Fatalf("got unexpected error: %v", err)
7777
}
7878
}
7979

80+
func TestWatchResourceDeleted(t *testing.T) {
81+
ctx := context.TODO()
82+
83+
notReadyResourceUnstructured := newResourceUnstructured(newStatus(corev1.ConditionFalse))
84+
gvk, nn, err := getResourceInformation(notReadyResourceUnstructured)
85+
if err != nil {
86+
t.Fatalf("got unexpected error from getResourceInformation: %v", err)
87+
}
88+
fake := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())
89+
logger := log.Log.WithName("resourcewatcher-test-success")
90+
watch, err := resourcewatcher.NewWithClient(fake, logger).WatchResource(ctx, nn, gvk)
91+
if err != nil {
92+
t.Fatalf("got unexpected error from WatchResource: %v", err)
93+
}
94+
if _, err := fake.Resource(k8s.ToGVR(gvk)).
95+
Namespace(nn.Namespace).
96+
Create(ctx, notReadyResourceUnstructured, metav1.CreateOptions{}); err != nil {
97+
t.Fatalf("creating resource: %v", err)
98+
}
99+
100+
// We want to ensure that the watch waits for the resource to be deleted.
101+
deleted := false
102+
103+
errChan := make(chan error, 1)
104+
go func() {
105+
ctx, cancel := context.WithTimeout(ctx, time.Minute)
106+
defer cancel()
107+
if err := resourcewatcher.WaitForResourceToBeReadyOrDeletedViaWatch(ctx, watch, logger); err != nil {
108+
errChan <- fmt.Errorf("got unexpected error: %w", err)
109+
return
110+
}
111+
112+
if !deleted {
113+
errChan <- errors.New("wait finished before resource was deleted")
114+
return
115+
}
116+
117+
errChan <- nil
118+
}()
119+
120+
if err := fake.Resource(k8s.ToGVR(gvk)).
121+
Namespace(nn.Namespace).
122+
Delete(ctx, nn.Name, metav1.DeleteOptions{}); err != nil {
123+
t.Fatalf("deleting resource: %v", err)
124+
}
125+
deleted = true
126+
127+
if err := <-errChan; err != nil {
128+
t.Fatalf("error from watch: %v", err)
129+
}
130+
}
131+
80132
func newResourceUnstructured(status map[string]interface{}) *unstructured.Unstructured {
81133
u := &unstructured.Unstructured{
82134
Object: map[string]interface{}{

pkg/controller/tf/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func (r *Reconciler) handleUnresolvableDeps(ctx context.Context, resource *k8s.R
460460
ctx, cancel := context.WithTimeout(ctx, timeoutPeriod)
461461
defer cancel()
462462
logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
463-
if err := watcher.WaitForResourceToBeReady(ctx, refNN, refGVK); err != nil {
463+
if err := watcher.WaitForResourceToBeReadyOrDeleted(ctx, refNN, refGVK); err != nil {
464464
logger.Error(err, "error while waiting for resource's reference to be ready")
465465
return
466466
}

0 commit comments

Comments
 (0)