Skip to content

Commit 4eda9c5

Browse files
committed
Fix flaky pod Watcher test in target allocator
The root cause of the flakiness was that the fake client we're using doesn't support resource versions. Informers rely on these to avoid race conditions when processing the initial object list, and that race condition is what made the test flaky. I've fixed this by having the tests check whether the informer event handlers have synced before changing objects via the client. I've also had the tests pass around a pointer to the watcher rather than a copy of the whole struct.
1 parent 15e013f commit 4eda9c5

File tree

2 files changed

+30
-15
lines changed

2 files changed

+30
-15
lines changed

cmd/otel-allocator/internal/collector/collector.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package collector
55

66
import (
77
"context"
8+
"sync"
89
"time"
910

1011
"github.com/go-logr/logr"
@@ -27,6 +28,8 @@ type Watcher struct {
2728
log logr.Logger
2829
k8sClient kubernetes.Interface
2930
close chan struct{}
31+
eventHandlerReg cache.ResourceEventHandlerRegistration
32+
mutex sync.Mutex
3033
minUpdateInterval time.Duration
3134
collectorNotReadyGracePeriod time.Duration
3235
collectorsDiscovered metric.Int64Gauge
@@ -63,7 +66,7 @@ func (k *Watcher) Watch(
6366
}
6467
informerFactory := informers.NewSharedInformerFactoryWithOptions(
6568
k.k8sClient,
66-
2*k.minUpdateInterval,
69+
30*time.Second,
6770
informers.WithNamespace(collectorNamespace),
6871
informers.WithTweakListOptions(listOptionsFunc))
6972
informer := informerFactory.Core().V1().Pods().Informer()
@@ -77,13 +80,15 @@ func (k *Watcher) Watch(
7780
default:
7881
}
7982
}
80-
_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
83+
k.mutex.Lock()
84+
k.eventHandlerReg, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
8185
AddFunc: notifyFunc,
8286
UpdateFunc: func(oldObj, newObj interface{}) {
8387
notifyFunc(newObj)
8488
},
8589
DeleteFunc: notifyFunc,
8690
})
91+
k.mutex.Unlock()
8792
if err != nil {
8893
return err
8994
}
@@ -92,6 +97,15 @@ func (k *Watcher) Watch(
9297
return nil
9398
}
9499

100+
func (k *Watcher) isSynced() bool {
101+
k.mutex.Lock()
102+
defer k.mutex.Unlock()
103+
if k.eventHandlerReg == nil {
104+
return false
105+
}
106+
return k.eventHandlerReg.HasSynced()
107+
}
108+
95109
// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel,
96110
// but not more frequently than once per k.eventPeriod.
97111
func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) {

cmd/otel-allocator/internal/collector/collector_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (r *reportingGauge) Record(_ context.Context, value int64, _ ...metric.Reco
4040
r.value.Store(value)
4141
}
4242

43-
func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) Watcher {
43+
func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) *Watcher {
4444
podWatcher := Watcher{
4545
k8sClient: fake.NewClientset(),
4646
close: make(chan struct{}),
@@ -49,7 +49,7 @@ func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) Watcher {
4949
collectorNotReadyGracePeriod: collectorNotReadyGracePeriod,
5050
collectorsDiscovered: &reportingGauge{},
5151
}
52-
return podWatcher
52+
return &podWatcher
5353
}
5454

5555
func pod(name string) *v1.Pod {
@@ -105,7 +105,7 @@ func Test_runWatch(t *testing.T) {
105105
namespace := "test-ns"
106106
type args struct {
107107
collectorNotReadyGracePeriod time.Duration
108-
kubeFn func(t *testing.T, podWatcher Watcher)
108+
kubeFn func(t *testing.T, podWatcher *Watcher)
109109
collectorMap map[string]*allocation.Collector
110110
}
111111
tests := []struct {
@@ -117,7 +117,7 @@ func Test_runWatch(t *testing.T) {
117117
name: "pod add",
118118
args: args{
119119
collectorNotReadyGracePeriod: 0 * time.Second,
120-
kubeFn: func(t *testing.T, podWatcher Watcher) {
120+
kubeFn: func(t *testing.T, podWatcher *Watcher) {
121121
for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} {
122122
p := pod(k)
123123
_, err := podWatcher.k8sClient.CoreV1().Pods(namespace).Create(context.Background(), p, metav1.CreateOptions{})
@@ -145,7 +145,7 @@ func Test_runWatch(t *testing.T) {
145145
name: "pod delete",
146146
args: args{
147147
collectorNotReadyGracePeriod: 0 * time.Second,
148-
kubeFn: func(t *testing.T, podWatcher Watcher) {
148+
kubeFn: func(t *testing.T, podWatcher *Watcher) {
149149
for _, k := range []string{"test-pod2", "test-pod3"} {
150150
err := podWatcher.k8sClient.CoreV1().Pods(namespace).Delete(context.Background(), k, metav1.DeleteOptions{})
151151
assert.NoError(t, err)
@@ -187,14 +187,15 @@ func Test_runWatch(t *testing.T) {
187187
_, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{})
188188
assert.NoError(t, err)
189189
}
190-
go func(podWatcher Watcher) {
190+
go func() {
191191
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
192192
mapMutex.Lock()
193193
defer mapMutex.Unlock()
194194
actual = colMap
195195
})
196196
require.NoError(t, err)
197-
}(podWatcher)
197+
}()
198+
assert.Eventually(t, podWatcher.isSynced, time.Second*30, time.Millisecond*100)
198199

199200
tt.args.kubeFn(t, podWatcher)
200201

@@ -311,14 +312,14 @@ func Test_gracePeriodWithNonRunningPodPhase(t *testing.T) {
311312
assert.NoError(t, err)
312313
}
313314

314-
go func(podWatcher Watcher) {
315+
go func() {
315316
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
316317
mapMutex.Lock()
317318
defer mapMutex.Unlock()
318319
actual = colMap
319320
})
320321
require.NoError(t, err)
321-
}(podWatcher)
322+
}()
322323

323324
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
324325
mapMutex.Lock()
@@ -434,14 +435,14 @@ func Test_gracePeriodWithNonReadyPodCondition(t *testing.T) {
434435
assert.NoError(t, err)
435436
}
436437

437-
go func(podWatcher Watcher) {
438+
go func() {
438439
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
439440
mapMutex.Lock()
440441
defer mapMutex.Unlock()
441442
actual = colMap
442443
})
443444
require.NoError(t, err)
444-
}(podWatcher)
445+
}()
445446

446447
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
447448
mapMutex.Lock()
@@ -461,11 +462,11 @@ func Test_closeChannel(t *testing.T) {
461462
var wg sync.WaitGroup
462463
wg.Add(1)
463464

464-
go func(podWatcher Watcher) {
465+
go func() {
465466
defer wg.Done()
466467
err := podWatcher.Watch("default", &labelSelector, func(colMap map[string]*allocation.Collector) {})
467468
require.NoError(t, err)
468-
}(podWatcher)
469+
}()
469470

470471
podWatcher.Close()
471472
wg.Wait()

0 commit comments

Comments (0)