Skip to content

Commit 7c6473b

Browse files
authored
Merge pull request kubernetes#87329 from windmilleng/nicks/informer-error-handling
cache: add error handling to informers
2 parents 7233908 + 435b40a commit 7c6473b

File tree

5 files changed

+114
-23
lines changed

5 files changed

+114
-23
lines changed

staging/src/k8s.io/client-go/tools/cache/controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ type Config struct {
6969
// question to this interface as a parameter. This is probably moot
7070
// now that this functionality appears at a higher level.
7171
RetryOnError bool
72+
73+
// Called whenever the ListAndWatch drops the connection with an error.
74+
WatchErrorHandler WatchErrorHandler
7275
}
7376

7477
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
@@ -132,6 +135,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
132135
)
133136
r.ShouldResync = c.config.ShouldResync
134137
r.clock = c.clock
138+
if c.config.WatchErrorHandler != nil {
139+
r.watchErrorHandler = c.config.WatchErrorHandler
140+
}
135141

136142
c.reflectorMutex.Lock()
137143
c.reflector = r

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,37 @@ type Reflector struct {
9595
// etcd, which is significantly less efficient and may lead to serious performance and
9696
// scalability problems.
9797
WatchListPageSize int64
98+
// Called whenever the ListAndWatch drops the connection with an error.
99+
watchErrorHandler WatchErrorHandler
100+
}
101+
102+
// The WatchErrorHandler is called whenever ListAndWatch drops the
103+
// connection with an error. After calling this handler, the informer
104+
// will backoff and retry.
105+
//
106+
// The default implementation looks at the error type and tries to log
107+
// the error message at an appropriate level.
108+
//
109+
// Implementations of this handler may display the error message in other
110+
// ways. Implementations should return quickly - any expensive processing
111+
// should be offloaded.
112+
type WatchErrorHandler func(r *Reflector, err error)
113+
114+
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
115+
func DefaultWatchErrorHandler(r *Reflector, err error) {
116+
switch {
117+
case isExpiredError(err):
118+
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
119+
// has a semantic that it returns data at least as fresh as provided RV.
120+
// So first try to LIST with setting RV to resource version of last observed object.
121+
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
122+
case err == io.EOF:
123+
// watch closed normally
124+
case err == io.ErrUnexpectedEOF:
125+
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
126+
default:
127+
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
128+
}
98129
}
99130

100131
var (
@@ -135,9 +166,10 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
135166
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
136167
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
137168
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
138-
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
139-
resyncPeriod: resyncPeriod,
140-
clock: realClock,
169+
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
170+
resyncPeriod: resyncPeriod,
171+
clock: realClock,
172+
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
141173
}
142174
r.setExpectedType(expectedType)
143175
return r
@@ -175,7 +207,7 @@ func (r *Reflector) Run(stopCh <-chan struct{}) {
175207
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
176208
wait.BackoffUntil(func() {
177209
if err := r.ListAndWatch(stopCh); err != nil {
178-
utilruntime.HandleError(err)
210+
r.watchErrorHandler(r, err)
179211
}
180212
}, r.backoffManager, true, stopCh)
181213
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
@@ -275,7 +307,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
275307
case <-listCh:
276308
}
277309
if err != nil {
278-
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
310+
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
279311
}
280312

281313
// We check if the list was paginated and if so set the paginatedResult based on that.
@@ -296,17 +328,17 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
296328
initTrace.Step("Objects listed")
297329
listMetaInterface, err := meta.ListAccessor(list)
298330
if err != nil {
299-
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
331+
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
300332
}
301333
resourceVersion = listMetaInterface.GetResourceVersion()
302334
initTrace.Step("Resource version extracted")
303335
items, err := meta.ExtractList(list)
304336
if err != nil {
305-
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
337+
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
306338
}
307339
initTrace.Step("Objects extracted")
308340
if err := r.syncWith(items, resourceVersion); err != nil {
309-
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
341+
return fmt.Errorf("unable to sync list result: %v", err)
310342
}
311343
initTrace.Step("SyncWith done")
312344
r.setLastSyncResourceVersion(resourceVersion)
@@ -366,19 +398,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
366398

367399
w, err := r.listerWatcher.Watch(options)
368400
if err != nil {
369-
switch {
370-
case isExpiredError(err):
371-
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
372-
// has a semantic that it returns data at least as fresh as provided RV.
373-
// So first try to LIST with setting RV to resource version of last observed object.
374-
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
375-
case err == io.EOF:
376-
// watch closed normally
377-
case err == io.ErrUnexpectedEOF:
378-
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
379-
default:
380-
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
381-
}
382401
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
383402
// It doesn't make sense to re-list all objects because most likely we will be able to restart
384403
// watch where we ended.
@@ -387,7 +406,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
387406
time.Sleep(time.Second)
388407
continue
389408
}
390-
return nil
409+
return err
391410
}
392411

393412
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {

staging/src/k8s.io/client-go/tools/cache/shared_informer.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,21 @@ type SharedInformer interface {
165165
// store. The value returned is not synchronized with access to the underlying store and is not
166166
// thread-safe.
167167
LastSyncResourceVersion() string
168+
169+
// The WatchErrorHandler is called whenever ListAndWatch drops the
170+
// connection with an error. After calling this handler, the informer
171+
// will backoff and retry.
172+
//
173+
// The default implementation looks at the error type and tries to log
174+
// the error message at an appropriate level.
175+
//
176+
// There's only one handler, so if you call this multiple times, last one
177+
// wins; calling after the informer has been started returns an error.
178+
//
179+
// The handler is intended for visibility, not to e.g. pause the consumers.
180+
// The handler should return quickly - any expensive processing should be
181+
// offloaded.
182+
SetWatchErrorHandler(handler WatchErrorHandler) error
168183
}
169184

170185
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
@@ -300,6 +315,9 @@ type sharedIndexInformer struct {
300315
// blockDeltas gives a way to stop all event distribution so that a late event handler
301316
// can safely join the shared informer.
302317
blockDeltas sync.Mutex
318+
319+
// Called whenever the ListAndWatch drops the connection with an error.
320+
watchErrorHandler WatchErrorHandler
303321
}
304322

305323
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -335,6 +353,18 @@ type deleteNotification struct {
335353
oldObj interface{}
336354
}
337355

356+
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
357+
s.startedLock.Lock()
358+
defer s.startedLock.Unlock()
359+
360+
if s.started {
361+
return fmt.Errorf("informer has already started")
362+
}
363+
364+
s.watchErrorHandler = handler
365+
return nil
366+
}
367+
338368
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
339369
defer utilruntime.HandleCrash()
340370

@@ -351,7 +381,8 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
351381
RetryOnError: false,
352382
ShouldResync: s.processor.shouldResync,
353383

354-
Process: s.HandleDeltas,
384+
Process: s.HandleDeltas,
385+
WatchErrorHandler: s.watchErrorHandler,
355386
}
356387

357388
func() {

staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cache
1818

1919
import (
2020
"fmt"
21+
"strings"
2122
"sync"
2223
"testing"
2324
"time"
@@ -330,3 +331,29 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
330331
}
331332
}
332333
}
334+
335+
func TestSharedInformerErrorHandling(t *testing.T) {
336+
source := fcache.NewFakeControllerSource()
337+
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
338+
source.ListError = fmt.Errorf("Access Denied")
339+
340+
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
341+
342+
errCh := make(chan error)
343+
_ = informer.SetWatchErrorHandler(func(_ *Reflector, err error) {
344+
errCh <- err
345+
})
346+
347+
stop := make(chan struct{})
348+
go informer.Run(stop)
349+
350+
select {
351+
case err := <-errCh:
352+
if !strings.Contains(err.Error(), "Access Denied") {
353+
t.Errorf("Expected 'Access Denied' error. Actual: %v", err)
354+
}
355+
case <-time.After(time.Second):
356+
t.Errorf("Timeout waiting for error handler call")
357+
}
358+
close(stop)
359+
}

staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type FakeControllerSource struct {
6262
changes []watch.Event // one change per resourceVersion
6363
Broadcaster *watch.Broadcaster
6464
lastRV int
65+
66+
// Set this to simulate an error on List()
67+
ListError error
6568
}
6669

6770
type FakePVControllerSource struct {
@@ -174,6 +177,11 @@ func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) {
174177
func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, error) {
175178
f.lock.RLock()
176179
defer f.lock.RUnlock()
180+
181+
if f.ListError != nil {
182+
return nil, f.ListError
183+
}
184+
177185
list, err := f.getListItemsLocked()
178186
if err != nil {
179187
return nil, err

0 commit comments

Comments
 (0)