Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions cmd/otel-allocator/internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package collector

import (
"context"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -27,6 +28,8 @@ type Watcher struct {
log logr.Logger
k8sClient kubernetes.Interface
close chan struct{}
eventHandlerReg cache.ResourceEventHandlerRegistration
mutex sync.Mutex
minUpdateInterval time.Duration
collectorNotReadyGracePeriod time.Duration
collectorsDiscovered metric.Int64Gauge
Expand Down Expand Up @@ -63,7 +66,7 @@ func (k *Watcher) Watch(
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(
k.k8sClient,
2*k.minUpdateInterval,
30*time.Second,
informers.WithNamespace(collectorNamespace),
informers.WithTweakListOptions(listOptionsFunc))
informer := informerFactory.Core().V1().Pods().Informer()
Expand All @@ -77,13 +80,15 @@ func (k *Watcher) Watch(
default:
}
}
_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
k.mutex.Lock()
k.eventHandlerReg, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: notifyFunc,
UpdateFunc: func(oldObj, newObj interface{}) {
notifyFunc(newObj)
},
DeleteFunc: notifyFunc,
})
k.mutex.Unlock()
if err != nil {
return err
}
Expand All @@ -92,6 +97,15 @@ func (k *Watcher) Watch(
return nil
}

// isSynced reports whether the pod informer's event handler has finished
// its initial sync. It returns false when the handler has not yet been
// registered (Watch has not run far enough to populate eventHandlerReg).
func (k *Watcher) isSynced() bool {
	k.mutex.Lock()
	defer k.mutex.Unlock()

	reg := k.eventHandlerReg
	return reg != nil && reg.HasSynced()
}

// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel,
// but not more frequently than once per k.eventPeriod.
func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) {
Expand Down
27 changes: 14 additions & 13 deletions cmd/otel-allocator/internal/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *reportingGauge) Record(_ context.Context, value int64, _ ...metric.Reco
r.value.Store(value)
}

func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) Watcher {
func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) *Watcher {
podWatcher := Watcher{
k8sClient: fake.NewClientset(),
close: make(chan struct{}),
Expand All @@ -49,7 +49,7 @@ func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) Watcher {
collectorNotReadyGracePeriod: collectorNotReadyGracePeriod,
collectorsDiscovered: &reportingGauge{},
}
return podWatcher
return &podWatcher
}

func pod(name string) *v1.Pod {
Expand Down Expand Up @@ -105,7 +105,7 @@ func Test_runWatch(t *testing.T) {
namespace := "test-ns"
type args struct {
collectorNotReadyGracePeriod time.Duration
kubeFn func(t *testing.T, podWatcher Watcher)
kubeFn func(t *testing.T, podWatcher *Watcher)
collectorMap map[string]*allocation.Collector
}
tests := []struct {
Expand All @@ -117,7 +117,7 @@ func Test_runWatch(t *testing.T) {
name: "pod add",
args: args{
collectorNotReadyGracePeriod: 0 * time.Second,
kubeFn: func(t *testing.T, podWatcher Watcher) {
kubeFn: func(t *testing.T, podWatcher *Watcher) {
for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} {
p := pod(k)
_, err := podWatcher.k8sClient.CoreV1().Pods(namespace).Create(context.Background(), p, metav1.CreateOptions{})
Expand Down Expand Up @@ -145,7 +145,7 @@ func Test_runWatch(t *testing.T) {
name: "pod delete",
args: args{
collectorNotReadyGracePeriod: 0 * time.Second,
kubeFn: func(t *testing.T, podWatcher Watcher) {
kubeFn: func(t *testing.T, podWatcher *Watcher) {
for _, k := range []string{"test-pod2", "test-pod3"} {
err := podWatcher.k8sClient.CoreV1().Pods(namespace).Delete(context.Background(), k, metav1.DeleteOptions{})
assert.NoError(t, err)
Expand Down Expand Up @@ -187,14 +187,15 @@ func Test_runWatch(t *testing.T) {
_, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{})
assert.NoError(t, err)
}
go func(podWatcher Watcher) {
go func() {
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
mapMutex.Lock()
defer mapMutex.Unlock()
actual = colMap
})
require.NoError(t, err)
}(podWatcher)
}()
assert.Eventually(t, podWatcher.isSynced, time.Second*30, time.Millisecond*100)

tt.args.kubeFn(t, podWatcher)

Expand Down Expand Up @@ -311,14 +312,14 @@ func Test_gracePeriodWithNonRunningPodPhase(t *testing.T) {
assert.NoError(t, err)
}

go func(podWatcher Watcher) {
go func() {
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
mapMutex.Lock()
defer mapMutex.Unlock()
actual = colMap
})
require.NoError(t, err)
}(podWatcher)
}()

assert.EventuallyWithT(t, func(collect *assert.CollectT) {
mapMutex.Lock()
Expand Down Expand Up @@ -434,14 +435,14 @@ func Test_gracePeriodWithNonReadyPodCondition(t *testing.T) {
assert.NoError(t, err)
}

go func(podWatcher Watcher) {
go func() {
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
mapMutex.Lock()
defer mapMutex.Unlock()
actual = colMap
})
require.NoError(t, err)
}(podWatcher)
}()

assert.EventuallyWithT(t, func(collect *assert.CollectT) {
mapMutex.Lock()
Expand All @@ -461,11 +462,11 @@ func Test_closeChannel(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

go func(podWatcher Watcher) {
go func() {
defer wg.Done()
err := podWatcher.Watch("default", &labelSelector, func(colMap map[string]*allocation.Collector) {})
require.NoError(t, err)
}(podWatcher)
}()

podWatcher.Close()
wg.Wait()
Expand Down
Loading