diff --git a/cmd/otel-allocator/internal/collector/collector.go b/cmd/otel-allocator/internal/collector/collector.go
index 79453477d0..4fa3f97c01 100644
--- a/cmd/otel-allocator/internal/collector/collector.go
+++ b/cmd/otel-allocator/internal/collector/collector.go
@@ -5,6 +5,7 @@ package collector
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -27,6 +28,8 @@ type Watcher struct {
 	log                          logr.Logger
 	k8sClient                    kubernetes.Interface
 	close                        chan struct{}
+	eventHandlerReg              cache.ResourceEventHandlerRegistration // set by Watch; nil until the handler is registered
+	mutex                        sync.Mutex                             // guards eventHandlerReg
 	minUpdateInterval            time.Duration
 	collectorNotReadyGracePeriod time.Duration
 	collectorsDiscovered         metric.Int64Gauge
@@ -63,7 +66,7 @@ func (k *Watcher) Watch(
 	}
 	informerFactory := informers.NewSharedInformerFactoryWithOptions(
 		k.k8sClient,
-		2*k.minUpdateInterval,
+		30*time.Second, // fixed value; no longer derived from minUpdateInterval
 		informers.WithNamespace(collectorNamespace),
 		informers.WithTweakListOptions(listOptionsFunc))
 	informer := informerFactory.Core().V1().Pods().Informer()
@@ -77,13 +80,15 @@ func (k *Watcher) Watch(
 		default:
 		}
 	}
-	_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	k.mutex.Lock() // isSynced may read eventHandlerReg from another goroutine
+	k.eventHandlerReg, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: notifyFunc,
 		UpdateFunc: func(oldObj, newObj interface{}) {
 			notifyFunc(newObj)
 		},
 		DeleteFunc: notifyFunc,
 	})
+	k.mutex.Unlock()
 	if err != nil {
 		return err
 	}
@@ -92,6 +97,17 @@ func (k *Watcher) Watch(
 	return nil
 }
 
+// isSynced reports whether the event handler registered by Watch has synced.
+// It returns false if Watch has not registered the handler yet.
+func (k *Watcher) isSynced() bool {
+	k.mutex.Lock()
+	defer k.mutex.Unlock()
+	if k.eventHandlerReg == nil {
+		return false
+	}
+	return k.eventHandlerReg.HasSynced()
+}
+
 // rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel,
 // but not more frequently than once per k.eventPeriod.
 func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) {
diff --git a/cmd/otel-allocator/internal/collector/collector_test.go b/cmd/otel-allocator/internal/collector/collector_test.go
index df4e8cdb9b..27dd68b357 100644
--- a/cmd/otel-allocator/internal/collector/collector_test.go
+++ b/cmd/otel-allocator/internal/collector/collector_test.go
@@ -40,7 +40,7 @@ func (r *reportingGauge) Record(_ context.Context, value int64, _ ...metric.Reco
 	r.value.Store(value)
 }
 
-func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) Watcher {
+func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) *Watcher {
 	podWatcher := Watcher{
 		k8sClient:                    fake.NewClientset(),
 		close:                        make(chan struct{}),
@@ -49,7 +49,7 @@ func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) Watcher {
 		collectorNotReadyGracePeriod: collectorNotReadyGracePeriod,
 		collectorsDiscovered:         &reportingGauge{},
 	}
-	return podWatcher
+	return &podWatcher
 }
 
 func pod(name string) *v1.Pod {
@@ -105,7 +105,7 @@ func Test_runWatch(t *testing.T) {
 	namespace := "test-ns"
 	type args struct {
 		collectorNotReadyGracePeriod time.Duration
-		kubeFn                       func(t *testing.T, podWatcher Watcher)
+		kubeFn                       func(t *testing.T, podWatcher *Watcher)
 		collectorMap                 map[string]*allocation.Collector
 	}
 	tests := []struct {
@@ -117,7 +117,7 @@ func Test_runWatch(t *testing.T) {
 			name: "pod add",
 			args: args{
 				collectorNotReadyGracePeriod: 0 * time.Second,
-				kubeFn: func(t *testing.T, podWatcher Watcher) {
+				kubeFn: func(t *testing.T, podWatcher *Watcher) {
 					for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} {
 						p := pod(k)
 						_, err := podWatcher.k8sClient.CoreV1().Pods(namespace).Create(context.Background(), p, metav1.CreateOptions{})
@@ -145,7 +145,7 @@ func Test_runWatch(t *testing.T) {
 			name: "pod delete",
 			args: args{
 				collectorNotReadyGracePeriod: 0 * time.Second,
-				kubeFn: func(t *testing.T, podWatcher Watcher) {
+				kubeFn: func(t *testing.T, podWatcher *Watcher) {
 					for _, k := range []string{"test-pod2", "test-pod3"} {
 						err := podWatcher.k8sClient.CoreV1().Pods(namespace).Delete(context.Background(), k, metav1.DeleteOptions{})
 						assert.NoError(t, err)
@@ -187,14 +187,15 @@ func Test_runWatch(t *testing.T) {
 				_, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{})
 				assert.NoError(t, err)
 			}
-			go func(podWatcher Watcher) {
+			go func() {
 				err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
 					mapMutex.Lock()
 					defer mapMutex.Unlock()
 					actual = colMap
 				})
-				require.NoError(t, err)
-			}(podWatcher)
+				assert.NoError(t, err) // not require: FailNow must run on the test goroutine
+			}()
+			require.Eventually(t, podWatcher.isSynced, time.Second*30, time.Millisecond*100) // stop here if the handler never syncs
 
 			tt.args.kubeFn(t, podWatcher)
 
@@ -311,14 +312,14 @@ func Test_gracePeriodWithNonRunningPodPhase(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	go func(podWatcher Watcher) {
+	go func() {
 		err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
 			mapMutex.Lock()
 			defer mapMutex.Unlock()
 			actual = colMap
 		})
-		require.NoError(t, err)
-	}(podWatcher)
+		assert.NoError(t, err) // not require: FailNow must run on the test goroutine
+	}()
 
 	assert.EventuallyWithT(t, func(collect *assert.CollectT) {
 		mapMutex.Lock()
@@ -434,14 +435,14 @@ func Test_gracePeriodWithNonReadyPodCondition(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	go func(podWatcher Watcher) {
+	go func() {
 		err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
 			mapMutex.Lock()
 			defer mapMutex.Unlock()
 			actual = colMap
 		})
-		require.NoError(t, err)
-	}(podWatcher)
+		assert.NoError(t, err) // not require: FailNow must run on the test goroutine
+	}()
 
 	assert.EventuallyWithT(t, func(collect *assert.CollectT) {
 		mapMutex.Lock()
@@ -461,11 +462,11 @@ func Test_closeChannel(t *testing.T) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 
-	go func(podWatcher Watcher) {
+	go func() {
 		defer wg.Done()
 		err := podWatcher.Watch("default", &labelSelector, func(colMap map[string]*allocation.Collector) {})
-		require.NoError(t, err)
-	}(podWatcher)
+		assert.NoError(t, err) // not require: FailNow must run on the test goroutine
+	}()
 
 	podWatcher.Close()
 	wg.Wait()