Skip to content

Commit 45b31f8

Browse files
Address flaking tests (#21)
1 parent 77fa06b commit 45b31f8

File tree

3 files changed

+47
-26
lines changed

3 files changed

+47
-26
lines changed

app/app_dependencies.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ func (appServices *ApplicationServices) WithKubeClient(ctx context.Context, kube
5555
func (appServices *ApplicationServices) WithSupervisor(ctx context.Context, resourceNamespace string) *ApplicationServices {
5656
if appServices.supervisor == nil {
5757
logger := klog.FromContext(ctx)
58-
appServices.supervisor = services.NewSupervisor(appServices.kubeClient, resourceNamespace, appServices.cqlStore, logger, nil)
58+
appServices.supervisor = services.NewSupervisor(appServices.kubeClient, resourceNamespace, appServices.cqlStore, logger, nil, nil)
5959
}
6060

6161
return appServices

services/supervisor.go

Lines changed: 39 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -21,12 +21,17 @@ import (
2121
)
2222

2323
type Supervisor struct {
24-
logger klog.Logger
25-
factory kubeinformers.SharedInformerFactory
26-
eventInformer cache.SharedIndexInformer
27-
podInformer cache.SharedIndexInformer
28-
jobInformer cache.SharedIndexInformer
29-
informers map[string]cache.SharedIndexInformer
24+
logger klog.Logger
25+
factory kubeinformers.SharedInformerFactory
26+
eventInformer cache.SharedIndexInformer
27+
podInformer cache.SharedIndexInformer
28+
jobInformer cache.SharedIndexInformer
29+
informers map[string]cache.SharedIndexInformer
30+
31+
eventInformerSynced cache.InformerSynced
32+
podInformerSynced cache.InformerSynced
33+
jobInformerSynced cache.InformerSynced
34+
3035
kubeClient kubernetes.Interface
3136
resourceNamespace string
3237
cqlStore *request.CqlStore
@@ -61,22 +66,37 @@ type RunStatusAnalysisResult struct {
6166
}
6267

6368
// NewSupervisor creates a new cache + resource watcher for pod and job resources
64-
func NewSupervisor(client kubernetes.Interface, resourceNamespace string, cqlStore *request.CqlStore, logger klog.Logger, resyncPeriod *time.Duration) *Supervisor {
69+
func NewSupervisor(client kubernetes.Interface, resourceNamespace string, cqlStore *request.CqlStore, logger klog.Logger, resyncPeriod *time.Duration, syncState *func() bool) *Supervisor {
6570
defaultResyncPeriod := time.Second * 30
6671
factory := kubeinformers.NewSharedInformerFactoryWithOptions(client, *util.CoalescePointer(resyncPeriod, &defaultResyncPeriod), kubeinformers.WithNamespace(resourceNamespace))
6772

6873
eventInformer := factory.Core().V1().Events().Informer()
6974
podInformer := factory.Core().V1().Pods().Informer()
7075
jobInformer := factory.Batch().V1().Jobs().Informer()
7176

77+
eventInformerSynced := eventInformer.HasSynced
78+
podInformerSynced := podInformer.HasSynced
79+
jobInformerSynced := jobInformer.HasSynced
80+
81+
if syncState != nil {
82+
eventInformerSynced = *syncState
83+
podInformerSynced = *syncState
84+
jobInformerSynced = *syncState
85+
}
86+
7287
return &Supervisor{
73-
logger: logger,
74-
factory: factory,
75-
kubeClient: client,
76-
resourceNamespace: resourceNamespace,
77-
eventInformer: eventInformer,
78-
podInformer: podInformer,
79-
jobInformer: jobInformer,
88+
logger: logger,
89+
factory: factory,
90+
kubeClient: client,
91+
resourceNamespace: resourceNamespace,
92+
eventInformer: eventInformer,
93+
podInformer: podInformer,
94+
jobInformer: jobInformer,
95+
96+
eventInformerSynced: eventInformerSynced,
97+
podInformerSynced: podInformerSynced,
98+
jobInformerSynced: jobInformerSynced,
99+
80100
cqlStore: cqlStore,
81101
elementReceiverActor: nil,
82102
}
@@ -101,8 +121,10 @@ func (c *Supervisor) Init(_ context.Context, config *ProcessingConfig) error {
101121
"Pod": c.podInformer,
102122
}
103123

104-
_, eventErr := c.eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
124+
_, eventErr := c.eventInformer.AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
105125
AddFunc: c.onEvent,
126+
}, cache.HandlerOptions{
127+
Logger: &c.logger,
106128
})
107129

108130
if eventErr != nil {
@@ -113,6 +135,7 @@ func (c *Supervisor) Init(_ context.Context, config *ProcessingConfig) error {
113135
}
114136

115137
func (c *Supervisor) onEvent(obj interface{}) {
138+
c.logger.V(4).Info("event received", "object", obj)
116139
_, err := cache.ObjectToName(obj)
117140
if err != nil { // coverage-ignore
118141
utilruntime.HandleError(err)
@@ -354,7 +377,7 @@ func (c *Supervisor) Start(ctx context.Context) {
354377
c.elementReceiverActor.Start(ctx, pipeline.NewActorPostStart(func(ctx context.Context) error {
355378
c.factory.Start(ctx.Done())
356379

357-
if ok := cache.WaitForCacheSync(ctx.Done(), c.eventInformer.HasSynced, c.podInformer.HasSynced, c.jobInformer.HasSynced); !ok { // coverage-ignore
380+
if ok := cache.WaitForCacheSync(ctx.Done(), c.podInformerSynced, c.jobInformerSynced, c.eventInformerSynced); !ok { // coverage-ignore
358381
return fmt.Errorf("failed to wait for pod informer caches to sync")
359382
}
360383

services/supervisor_test.go

Lines changed: 7 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ import (
1818
)
1919

2020
var noResyncPeriod = time.Second * 0
21+
var alwaysReady = func() bool { return true }
2122

2223
type fixture struct {
2324
supervisor *Supervisor
@@ -37,10 +38,7 @@ func newFixture(t *testing.T, k8sObjects []runtime.Object) *fixture {
3738
Hosts: []string{"127.0.0.1"},
3839
})
3940
f.kubeClient = fake.NewClientset(k8sObjects...)
40-
41-
time.Sleep(time.Second)
42-
43-
f.supervisor = NewSupervisor(f.kubeClient, "nexus", f.cqlStore, klog.FromContext(f.ctx), &noResyncPeriod)
41+
f.supervisor = NewSupervisor(f.kubeClient, "nexus", f.cqlStore, klog.FromContext(f.ctx), &noResyncPeriod, &alwaysReady)
4442

4543
return f
4644
}
@@ -553,11 +551,11 @@ func TestSupervisor(t *testing.T) {
553551

554552
f := newFixture(t, k8sObjects)
555553
err := f.supervisor.Init(f.ctx, &ProcessingConfig{
556-
FailureRateBaseDelay: time.Second,
557-
FailureRateMaxDelay: time.Second * 2,
558-
RateLimitElementsPerSecond: 1000,
559-
RateLimitElementsBurst: 1000,
560-
Workers: 1,
554+
FailureRateBaseDelay: time.Millisecond * 100,
555+
FailureRateMaxDelay: time.Second,
556+
RateLimitElementsPerSecond: 10,
557+
RateLimitElementsBurst: 10,
558+
Workers: 4,
561559
})
562560

563561
time.Sleep(time.Second)

0 commit comments

Comments
 (0)