@@ -74,8 +74,6 @@ type GarbageCollector struct {
 	kubeClient       clientset.Interface
 	eventBroadcaster record.EventBroadcaster
-
-	workerLock sync.RWMutex
 }
 
 var _ controller.Interface = (*GarbageCollector)(nil)
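For context, the `workerLock` removed above implemented a pause gate: each worker held the read lock while processing an item (see the hunks in `processAttemptToDeleteWorker` and `processAttemptToOrphanWorker` below), and `Sync` took the write lock to stop all workers during a resync. A minimal sketch of that pattern, with hypothetical names (not the upstream code):

package main

import "sync"

// pauseGate sketches the RWMutex pattern this PR removes.
type pauseGate struct {
	workerLock sync.RWMutex
}

// processItem is what each worker did per queue item: many readers may
// hold the lock at once, so workers do not block each other.
func (g *pauseGate) processItem(work func()) {
	g.workerLock.RLock()
	defer g.workerLock.RUnlock()
	work()
}

// resync is what Sync did: taking the write lock waits for all in-flight
// items and keeps new ones from starting until the resync finishes.
func (g *pauseGate) resync(f func() error) error {
	g.workerLock.Lock()
	defer g.workerLock.Unlock()
	return f()
}

func main() {
	g := &pauseGate{}
	g.processItem(func() {})
	_ = g.resync(func() error { return nil })
}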
@@ -131,7 +129,7 @@ func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResource
 }
 
 // Run starts garbage collector workers.
-func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
+func (gc *GarbageCollector) Run(ctx context.Context, workers int, initialSyncTimeout time.Duration) {
 	defer utilruntime.HandleCrash()
 	defer gc.attemptToDelete.ShutDown()
 	defer gc.attemptToOrphan.ShutDown()
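Since `Run` gains a parameter, callers now choose how long startup may block on the initial cache sync. A hypothetical call site under the new signature (the worker count and the 30s timeout are illustrative, not values from this PR):

package main

import (
	"context"
	"time"

	"k8s.io/kubernetes/pkg/controller/garbagecollector"
)

// startGC is a hypothetical wiring sketch; constructing a real
// *GarbageCollector needs clients and informers omitted here.
func startGC(ctx context.Context, gc *garbagecollector.GarbageCollector, workers int) {
	// The third argument is new: how long Run may block waiting for the
	// initial cache sync before proceeding anyway (30s is illustrative).
	go gc.Run(ctx, workers, 30*time.Second)
}

func main() {}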
@@ -148,13 +146,15 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 
 	go gc.dependencyGraphBuilder.Run(ctx)
 
-	if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
+	if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), initialSyncTimeout), func() bool {
 		return gc.dependencyGraphBuilder.IsSynced(logger)
 	}) {
-		return
+		logger.Info("Garbage collector: not all resource monitors could be synced, proceeding anyways")
+	} else {
+		logger.Info("Garbage collector: all resource monitors have synced")
 	}
 
-	logger.Info("All resource monitors have synced. Proceeding to collect garbage")
+	logger.Info("Proceeding to collect garbage")
 
 	// gc workers
 	for i := 0; i < workers; i++ {
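`waitForStopOrTimeout` is a package-local helper that is not part of this diff; it bounds the wait by returning a channel that closes when either the stop channel closes or the timeout fires. A sketch of those semantics (see the garbagecollector package for the real helper):

package main

import "time"

// waitForStopOrTimeout: the returned channel closes when stopCh closes
// or after timeout, whichever comes first.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return stopChWithTimeout
}

func main() {
	stop := make(chan struct{})
	<-waitForStopOrTimeout(stop, 10*time.Millisecond) // returns after ~10ms
}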
@@ -166,8 +166,8 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
 }
 
 // Sync periodically resyncs the garbage collector when new resources are
-// observed from discovery. When new resources are detected, Sync will stop all
-// GC workers, reset gc.restMapper, and resync the monitors.
+// observed from discovery. When new resources are detected, it will reset
+// gc.restMapper, and resync the monitors.
 //
 // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
 // the mapper's underlying discovery client will be unnecessarily reset during
@@ -200,85 +200,48 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
 			return
 		}
 
-		// Ensure workers are paused to avoid processing events before informers
-		// have resynced.
-		gc.workerLock.Lock()
-		defer gc.workerLock.Unlock()
-
-		// Once we get here, we should not unpause workers until we've successfully synced
-		attempt := 0
-		wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
-			attempt++
-
-			// On a reattempt, check if available resources have changed
-			if attempt > 1 {
-				newResources, err = GetDeletableResources(logger, discoveryClient)
-
-				if len(newResources) == 0 {
-					logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
-					metrics.GarbageCollectorResourcesSyncError.Inc()
-					return false, nil
-				}
-				if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
-					// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
-					for k, v := range oldResources {
-						if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
-							newResources[k] = v
-						}
-					}
-				}
-			}
-
-			logger.V(2).Info(
-				"syncing garbage collector with updated resources from discovery",
-				"attempt", attempt,
-				"diff", printDiff(oldResources, newResources),
-			)
+		logger.V(2).Info(
+			"syncing garbage collector with updated resources from discovery",
+			"diff", printDiff(oldResources, newResources),
+		)
 
-			// Resetting the REST mapper will also invalidate the underlying discovery
-			// client. This is a leaky abstraction and assumes behavior about the REST
-			// mapper, but we'll deal with it for now.
-			gc.restMapper.Reset()
-			logger.V(4).Info("reset restmapper")
-
-			// Perform the monitor resync and wait for controllers to report cache sync.
-			//
-			// NOTE: It's possible that newResources will diverge from the resources
-			// discovered by restMapper during the call to Reset, since they are
-			// distinct discovery clients invalidated at different times. For example,
-			// newResources may contain resources not returned in the restMapper's
-			// discovery call if the resources appeared in-between the calls. In that
-			// case, the restMapper will fail to map some of newResources until the next
-			// attempt.
-			if err := gc.resyncMonitors(logger, newResources); err != nil {
-				utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
-				metrics.GarbageCollectorResourcesSyncError.Inc()
-				return false, nil
-			}
-			logger.V(4).Info("resynced monitors")
-
-			// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
-			// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
-			// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
-			// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
-			// note that workers stay paused until we successfully resync.
-			if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
-				return gc.dependencyGraphBuilder.IsSynced(logger)
-			}) {
-				utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
-				metrics.GarbageCollectorResourcesSyncError.Inc()
-				return false, nil
-			}
+		// Resetting the REST mapper will also invalidate the underlying discovery
+		// client. This is a leaky abstraction and assumes behavior about the REST
+		// mapper, but we'll deal with it for now.
+		gc.restMapper.Reset()
+		logger.V(4).Info("reset restmapper")
+
+		// Perform the monitor resync and wait for controllers to report cache sync.
+		//
+		// NOTE: It's possible that newResources will diverge from the resources
+		// discovered by restMapper during the call to Reset, since they are
+		// distinct discovery clients invalidated at different times. For example,
+		// newResources may contain resources not returned in the restMapper's
+		// discovery call if the resources appeared in-between the calls. In that
+		// case, the restMapper will fail to map some of newResources until the next
+		// attempt.
+		if err := gc.resyncMonitors(logger, newResources); err != nil {
+			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
+			metrics.GarbageCollectorResourcesSyncError.Inc()
+			return
+		}
+		logger.V(4).Info("resynced monitors")
 
-			// success, break out of the loop
-			return true, nil
+		// gc worker no longer waits for cache to be synced, but we will keep the periodical check to provide logs & metrics
+		cacheSynced := cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
+			return gc.dependencyGraphBuilder.IsSynced(logger)
 		})
+		if cacheSynced {
+			logger.V(2).Info("synced garbage collector")
+		} else {
+			utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
+			metrics.GarbageCollectorResourcesSyncError.Inc()
+		}
 
-		// Finally, keep track of our new state. Do this after all preceding steps
-		// have succeeded to ensure we'll retry on subsequent syncs if an error
-		// occurred.
+		// Finally, keep track of our new resource monitor state.
+		// Monitors where the cache sync times out are still tracked here as
+		// subsequent runs should stop them if their resources were removed.
 		oldResources = newResources
-		logger.V(2).Info("synced garbage collector")
 	}, period)
 }
 
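The periodic check kept above works because `cache.WaitForNamedCacheSync` reports sync status as a bool rather than aborting the caller. Simplified semantics of the client-go helper (a sketch, not the actual implementation):

package main

import "time"

// waitForCacheSyncSketch polls the predicates until all pass or stop
// closes; client-go's WaitForNamedCacheSync also logs under the given name.
func waitForCacheSyncSketch(stop <-chan struct{}, synced ...func() bool) bool {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		all := true
		for _, s := range synced {
			if !s() {
				all = false
				break
			}
		}
		if all {
			return true
		}
		select {
		case <-stop:
			return false // stopped (or timed out) before syncing
		case <-ticker.C:
		}
	}
}

func main() {
	stop := make(chan struct{})
	close(stop)
	_ = waitForCacheSyncSketch(stop, func() bool { return false }) // false: stopped first
}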
@@ -328,8 +291,6 @@ var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objec
 
 func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
 	item, quit := gc.attemptToDelete.Get()
-	gc.workerLock.RLock()
-	defer gc.workerLock.RUnlock()
 	if quit {
 		return false
 	}
@@ -754,8 +715,6 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) {
 // these steps fail.
 func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool {
 	item, quit := gc.attemptToOrphan.Get()
-	gc.workerLock.RLock()
-	defer gc.workerLock.RUnlock()
 	if quit {
 		return false
 	}