Skip to content

Commit d6d6aad

Browse files
authored
Merge pull request kubernetes#131266 from karlkfi/karl-reflector-refactor
fix: Watcher deadlock from Stop not being called
2 parents d895ad2 + 3e609ec commit d6d6aad

File tree

2 files changed

+271
-109
lines changed

2 files changed

+271
-109
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,12 @@ func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
406406
useWatchList := ptr.Deref(r.UseWatchList, false)
407407
fallbackToList := !useWatchList
408408

409+
defer func() {
410+
if w != nil {
411+
w.Stop()
412+
}
413+
}()
414+
409415
if useWatchList {
410416
w, err = r.watchList(ctx)
411417
if w == nil && err == nil {
@@ -476,22 +482,28 @@ func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) erro
476482
return r.watch(ctx, w, resyncerrc)
477483
}
478484

479-
// watch simply starts a watch request with the server.
485+
// watch starts a watch request with the server, consumes watch events, and
486+
// restarts the watch until an exit scenario is reached.
487+
//
488+
// If a watch is provided, it will be used, otherwise another will be started.
489+
// If the watcher has started, it will always be stopped before returning.
480490
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
481491
stopCh := ctx.Done()
482492
logger := klog.FromContext(ctx)
483493
var err error
484494
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
495+
defer func() {
496+
if w != nil {
497+
w.Stop()
498+
}
499+
}()
485500

486501
for {
487502
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
488503
select {
489504
case <-stopCh:
490505
// we can only end up here when the stopCh
491506
// was closed after a successful watchlist or list request
492-
if w != nil {
493-
w.Stop()
494-
}
495507
return nil
496508
default:
497509
}
@@ -529,8 +541,8 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
529541

530542
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
531543
r.clock, resyncerrc)
532-
// Ensure that watch will not be reused across iterations.
533-
w.Stop()
544+
// handleWatch always stops the watcher. So we don't need to here.
545+
// Just set it to nil to trigger a retry on the next loop.
534546
w = nil
535547
retry.After(err)
536548
if err != nil {
@@ -863,6 +875,12 @@ func handleAnyWatch(
863875
logger := klog.FromContext(ctx)
864876
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived)
865877
defer initialEventsEndBookmarkWarningTicker.Stop()
878+
stopWatcher := true
879+
defer func() {
880+
if stopWatcher {
881+
w.Stop()
882+
}
883+
}()
866884

867885
loop:
868886
for {
@@ -929,6 +947,7 @@ loop:
929947
}
930948
eventCount++
931949
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
950+
stopWatcher = false
932951
watchDuration := clock.Since(start)
933952
klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration)
934953
return watchListBookmarkReceived, nil

0 commit comments

Comments
 (0)