Skip to content

Commit e688a06

Browse files
authored
Merge pull request kubernetes#94235 from kevindelgado/draft/connection-refused-backoff
Add exponential backoff for connection refused errors
2 parents b2cba08 + 1ff789f commit e688a06

File tree

2 files changed

+88
-7
lines changed

2 files changed

+88
-7
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type Reflector struct {
6969

7070
// backoffManager manages backoff of ListWatch
7171
backoffManager wait.BackoffManager
72+
// initConnBackoffManager manages backoff of the initial connection with the Watch call of ListAndWatch.
73+
initConnBackoffManager wait.BackoffManager
7274

7375
resyncPeriod time.Duration
7476
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
@@ -166,10 +168,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
166168
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
167169
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
168170
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
169-
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
170-
resyncPeriod: resyncPeriod,
171-
clock: realClock,
172-
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
171+
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
172+
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
173+
resyncPeriod: resyncPeriod,
174+
clock: realClock,
175+
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
173176
}
174177
r.setExpectedType(expectedType)
175178
return r
@@ -404,9 +407,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
404407
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
405408
// It doesn't make sense to re-list all objects because most likely we will be able to restart
406409
// watch where we ended.
407-
// If that's the case wait and resend watch request.
410+
// If that's the case begin exponentially backing off and resend watch request.
408411
if utilnet.IsConnectionRefused(err) {
409-
time.Sleep(time.Second)
412+
<-r.initConnBackoffManager.Backoff().C()
410413
continue
411414
}
412415
return err

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@ import (
2222
"math/rand"
2323
"reflect"
2424
"strconv"
25+
"syscall"
2526
"testing"
2627
"time"
2728

28-
"k8s.io/api/core/v1"
29+
v1 "k8s.io/api/core/v1"
2930
apierrors "k8s.io/apimachinery/pkg/api/errors"
3031
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3233
"k8s.io/apimachinery/pkg/runtime"
3334
"k8s.io/apimachinery/pkg/runtime/schema"
35+
"k8s.io/apimachinery/pkg/util/clock"
3436
"k8s.io/apimachinery/pkg/util/wait"
3537
"k8s.io/apimachinery/pkg/watch"
3638
)
@@ -358,6 +360,82 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
358360
}
359361
}
360362

363+
func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
364+
maxBackoff := 50 * time.Millisecond
365+
table := []struct {
366+
numConnFails int
367+
expLowerBound time.Duration
368+
expUpperBound time.Duration
369+
}{
370+
{5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially
371+
{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff}, // case where maxBackoff is hit, backoff time should flatten
372+
373+
}
374+
for _, test := range table {
375+
t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
376+
func(t *testing.T) {
377+
stopCh := make(chan struct{})
378+
connFails := test.numConnFails
379+
fakeClock := clock.NewFakeClock(time.Unix(0, 0))
380+
bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
381+
done := make(chan struct{})
382+
defer close(done)
383+
go func() {
384+
i := 0
385+
for {
386+
select {
387+
case <-done:
388+
return
389+
default:
390+
}
391+
if fakeClock.HasWaiters() {
392+
step := (1 << (i + 1)) * time.Millisecond
393+
if step > maxBackoff*2 {
394+
step = maxBackoff * 2
395+
}
396+
fakeClock.Step(step)
397+
i++
398+
}
399+
time.Sleep(100 * time.Microsecond)
400+
}
401+
}()
402+
lw := &testLW{
403+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
404+
if connFails > 0 {
405+
connFails--
406+
return nil, syscall.ECONNREFUSED
407+
}
408+
close(stopCh)
409+
return watch.NewFake(), nil
410+
},
411+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
412+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
413+
},
414+
}
415+
r := &Reflector{
416+
name: "test-reflector",
417+
listerWatcher: lw,
418+
store: NewFIFO(MetaNamespaceKeyFunc),
419+
initConnBackoffManager: bm,
420+
clock: fakeClock,
421+
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
422+
}
423+
start := fakeClock.Now()
424+
err := r.ListAndWatch(stopCh)
425+
elapsed := fakeClock.Since(start)
426+
if err != nil {
427+
t.Errorf("unexpected error %v", err)
428+
}
429+
if elapsed < (test.expLowerBound) {
430+
t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
431+
}
432+
if elapsed > (test.expUpperBound) {
433+
t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
434+
}
435+
})
436+
}
437+
}
438+
361439
func TestReflectorResync(t *testing.T) {
362440
iteration := 0
363441
stopCh := make(chan struct{})

0 commit comments

Comments
 (0)