Skip to content

Commit 1ff789f

Browse files
committed
Add exp backoff for connection refused errors
Currently when ListAndWatch() receives a connection refused error, it is assumed to be due to the apiserver being transiently unresponsive. In situations where a controller is running outside the k8s cluster it's controlling, it is more common for the controller to lose connection permanently to the apiserver and needs to exponentially backoff its retry rather than continously spamming logs with Watch attempts that will never succeed.
1 parent ab3ed8c commit 1ff789f

File tree

2 files changed

+88
-7
lines changed

2 files changed

+88
-7
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type Reflector struct {
6969

7070
// backoff manages backoff of ListWatch
7171
backoffManager wait.BackoffManager
72+
// initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch.
73+
initConnBackoffManager wait.BackoffManager
7274

7375
resyncPeriod time.Duration
7476
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
@@ -166,10 +168,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
166168
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
167169
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
168170
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
169-
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
170-
resyncPeriod: resyncPeriod,
171-
clock: realClock,
172-
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
171+
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
172+
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
173+
resyncPeriod: resyncPeriod,
174+
clock: realClock,
175+
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
173176
}
174177
r.setExpectedType(expectedType)
175178
return r
@@ -404,9 +407,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
404407
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
405408
// It doesn't make sense to re-list all objects because most likely we will be able to restart
406409
// watch where we ended.
407-
// If that's the case wait and resend watch request.
410+
// If that's the case begin exponentially backing off and resend watch request.
408411
if utilnet.IsConnectionRefused(err) {
409-
time.Sleep(time.Second)
412+
<-r.initConnBackoffManager.Backoff().C()
410413
continue
411414
}
412415
return err

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@ import (
2222
"math/rand"
2323
"reflect"
2424
"strconv"
25+
"syscall"
2526
"testing"
2627
"time"
2728

28-
"k8s.io/api/core/v1"
29+
v1 "k8s.io/api/core/v1"
2930
apierrors "k8s.io/apimachinery/pkg/api/errors"
3031
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3233
"k8s.io/apimachinery/pkg/runtime"
3334
"k8s.io/apimachinery/pkg/runtime/schema"
35+
"k8s.io/apimachinery/pkg/util/clock"
3436
"k8s.io/apimachinery/pkg/util/wait"
3537
"k8s.io/apimachinery/pkg/watch"
3638
)
@@ -358,6 +360,82 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
358360
}
359361
}
360362

363+
func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
364+
maxBackoff := 50 * time.Millisecond
365+
table := []struct {
366+
numConnFails int
367+
expLowerBound time.Duration
368+
expUpperBound time.Duration
369+
}{
370+
{5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially
371+
{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff}, // case where maxBoff is hit, backoff time should flatten
372+
373+
}
374+
for _, test := range table {
375+
t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
376+
func(t *testing.T) {
377+
stopCh := make(chan struct{})
378+
connFails := test.numConnFails
379+
fakeClock := clock.NewFakeClock(time.Unix(0, 0))
380+
bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
381+
done := make(chan struct{})
382+
defer close(done)
383+
go func() {
384+
i := 0
385+
for {
386+
select {
387+
case <-done:
388+
return
389+
default:
390+
}
391+
if fakeClock.HasWaiters() {
392+
step := (1 << (i + 1)) * time.Millisecond
393+
if step > maxBackoff*2 {
394+
step = maxBackoff * 2
395+
}
396+
fakeClock.Step(step)
397+
i++
398+
}
399+
time.Sleep(100 * time.Microsecond)
400+
}
401+
}()
402+
lw := &testLW{
403+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
404+
if connFails > 0 {
405+
connFails--
406+
return nil, syscall.ECONNREFUSED
407+
}
408+
close(stopCh)
409+
return watch.NewFake(), nil
410+
},
411+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
412+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
413+
},
414+
}
415+
r := &Reflector{
416+
name: "test-reflector",
417+
listerWatcher: lw,
418+
store: NewFIFO(MetaNamespaceKeyFunc),
419+
initConnBackoffManager: bm,
420+
clock: fakeClock,
421+
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
422+
}
423+
start := fakeClock.Now()
424+
err := r.ListAndWatch(stopCh)
425+
elapsed := fakeClock.Since(start)
426+
if err != nil {
427+
t.Errorf("unexpected error %v", err)
428+
}
429+
if elapsed < (test.expLowerBound) {
430+
t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
431+
}
432+
if elapsed > (test.expUpperBound) {
433+
t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
434+
}
435+
})
436+
}
437+
}
438+
361439
func TestReflectorResync(t *testing.T) {
362440
iteration := 0
363441
stopCh := make(chan struct{})

0 commit comments

Comments
 (0)