@@ -19,6 +19,7 @@ import (
19
19
"errors"
20
20
"sync"
21
21
22
+ "golang.org/x/exp/maps"
22
23
"k8s.io/apimachinery/pkg/api/meta"
23
24
"k8s.io/apimachinery/pkg/labels"
24
25
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -147,7 +148,7 @@ func (m *Manager) AddWatches(ctx context.Context, gvkMap map[schema.GroupVersion
147
148
defer m .mux .Unlock ()
148
149
m .watching = true
149
150
150
- klog .V (3 ).Infof ("AddWatches(%v)" , gvkMap )
151
+ klog .V (3 ).Infof ("AddWatches(%v)" , maps . Keys ( gvkMap ) )
151
152
152
153
var startedWatches uint64
153
154
@@ -165,7 +166,12 @@ func (m *Manager) AddWatches(ctx context.Context, gvkMap map[schema.GroupVersion
165
166
if _ , err := m .mapper .RESTMapping (gvk .GroupKind (), gvk .Version ); err != nil {
166
167
switch {
167
168
case meta .IsNoMatchError (err ):
168
- klog .Infof ("Remediator skipped adding watch for resource %v: %v: resource watch will be started after apply is successful" , gvk , err )
169
+ statusErr := syncerclient .ConflictWatchResourceDoesNotExist (err , gvk )
170
+ klog .Infof ("Remediator skipped starting resource watch: " +
171
+ "%v. The remediator will start the resource watch after the sync has succeeded." , statusErr )
172
+ // This is expected behavior before a sync attempt.
173
+ // It likely means a CR and its CRD are in the same ApplySet, and the CRD has not been applied yet.
174
+ // So don't record a resource conflict metric or return an error here.
169
175
default :
170
176
errs = status .Append (errs , status .APIServerErrorWrap (err ))
171
177
}
@@ -180,9 +186,9 @@ func (m *Manager) AddWatches(ctx context.Context, gvkMap map[schema.GroupVersion
180
186
}
181
187
182
188
if startedWatches > 0 {
183
- klog .Infof ("The remediator made new progress: started %d new watches" , startedWatches )
189
+ klog .Infof ("Remediator started %d new watches" , startedWatches )
184
190
} else {
185
- klog .V (4 ).Infof ("The remediator made no new progress " )
191
+ klog .V (4 ).Infof ("Remediator watches unchanged " )
186
192
}
187
193
return errs
188
194
}
@@ -200,7 +206,7 @@ func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVers
200
206
defer m .mux .Unlock ()
201
207
m .watching = true
202
208
203
- klog .V (3 ).Infof ("UpdateWatches(%v)" , gvkMap )
209
+ klog .V (3 ).Infof ("UpdateWatches(%v)" , maps . Keys ( gvkMap ) )
204
210
205
211
m .needsUpdate = false
206
212
@@ -228,8 +234,15 @@ func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVers
228
234
if _ , err := m .mapper .RESTMapping (gvk .GroupKind (), gvk .Version ); err != nil {
229
235
switch {
230
236
case meta .IsNoMatchError (err ):
237
+ statusErr := syncerclient .ConflictWatchResourceDoesNotExist (err , gvk )
238
+ klog .Warningf ("Remediator encountered a resource conflict: " +
239
+ "%v. To resolve the conflict, the remediator will enqueue a resync " +
240
+ "and restart the resource watch after the sync has succeeded." , statusErr )
241
+ // This is unexpected behavior after a successful sync.
242
+ // It likely means that some other controller deleted managed objects shortly after they were applied.
243
+ // So record a resource conflict metric and return an error.
231
244
metrics .RecordResourceConflict (ctx , commit )
232
- errs = status .Append (errs , syncerclient . ConflictWatchResourceDoesNotExist ( err , gvk ) )
245
+ errs = status .Append (errs , statusErr )
233
246
default :
234
247
errs = status .Append (errs , status .APIServerErrorWrap (err ))
235
248
}
@@ -244,9 +257,9 @@ func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVers
244
257
}
245
258
246
259
if startedWatches > 0 || stoppedWatches > 0 {
247
- klog .Infof ("The remediator made new progress: started %d new watches, and stopped %d watches" , startedWatches , stoppedWatches )
260
+ klog .Infof ("Remediator started %d new watches and stopped %d watches" , startedWatches , stoppedWatches )
248
261
} else {
249
- klog .V (4 ).Infof ("The remediator made no new progress " )
262
+ klog .V (4 ).Infof ("Remediator watches unchanged " )
250
263
}
251
264
return errs
252
265
}
@@ -284,10 +297,12 @@ func (m *Manager) startWatcher(ctx context.Context, gvk schema.GroupVersionKind,
284
297
// threadsafe.
285
298
func (m * Manager ) runWatcher (ctx context.Context , r Runnable , gvk schema.GroupVersionKind ) {
286
299
if err := r .Run (ctx ); err != nil {
287
- if errors .Is (err , context .Canceled ) {
288
- klog .Infof ("Watcher stopped for %s: %v" , gvk , status .FormatSingleLine (err ))
300
+ // TODO: Make status.Error work with errors.Is unwrapping.
301
+ // For now, check the Cause directly, to avoid logging a warning on shutdown.
302
+ if errors .Is (err .Cause (), context .Canceled ) {
303
+ klog .Infof ("Watcher stopped for %s: %v" , gvk , context .Canceled )
289
304
} else {
290
- klog .Warningf ("Error running watcher for %s: %v" , gvk , status .FormatSingleLine (err ))
305
+ klog .Warningf ("Watcher errored for %s: %v" , gvk , status .FormatSingleLine (err ))
291
306
}
292
307
m .mux .Lock ()
293
308
delete (m .watcherMap , gvk )
0 commit comments