Skip to content

Commit 04a4841

Browse files
authored
Merge pull request #12536 from sbueringer/pr-fix-ms-issue
🐛 Fix condition handling during controller start
2 parents 66c75eb + 3f1d55c commit 04a4841

File tree

12 files changed

+540
-106
lines changed

12 files changed

+540
-106
lines changed

controllers/clustercache/cluster_accessor.go

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type clusterAccessor struct {
5151
lockedStateLock sync.RWMutex
5252

5353
// lockedState is the state of the clusterAccessor. This includes the connection (e.g. client, cache)
54-
// and health checking information (e.g. lastProbeSuccessTimestamp, consecutiveFailures).
54+
// and health checking information (e.g. lastProbeSuccessTime, consecutiveFailures).
5555
// lockedStateLock must be *always* held (via lock or rLock) before accessing this field.
5656
lockedState clusterAccessorLockedState
5757

@@ -151,11 +151,11 @@ type clusterAccessorHealthProbeConfig struct {
151151
}
152152

153153
// clusterAccessorLockedState is the state of the clusterAccessor. This includes the connection (e.g. client, cache)
154-
// and health checking information (e.g. lastProbeSuccessTimestamp, consecutiveFailures).
154+
// and health checking information (e.g. lastProbeSuccessTime, consecutiveFailures).
155155
// lockedStateLock must be *always* held (via lock or rLock) before accessing this field.
156156
type clusterAccessorLockedState struct {
157-
// lastConnectionCreationErrorTimestamp is the timestamp when connection creation failed the last time.
158-
lastConnectionCreationErrorTimestamp time.Time
157+
// lastConnectionCreationErrorTime is the time when connection creation failed the last time.
158+
lastConnectionCreationErrorTime time.Time
159159

160160
// connection holds the connection state (e.g. client, cache) of the clusterAccessor.
161161
connection *clusterAccessorLockedConnectionState
@@ -167,7 +167,7 @@ type clusterAccessorLockedState struct {
167167
// private key in every single Reconcile.
168168
clientCertificatePrivateKey *rsa.PrivateKey
169169

170-
// healthChecking holds the health checking state (e.g. lastProbeSuccessTimestamp, consecutiveFailures)
170+
// healthChecking holds the health checking state (e.g. lastProbeSuccessTime, consecutiveFailures)
171171
// of the clusterAccessor.
172172
healthChecking clusterAccessorLockedHealthCheckingState
173173
}
@@ -201,14 +201,14 @@ type clusterAccessorLockedConnectionState struct {
201201
watches sets.Set[string]
202202
}
203203

204-
// clusterAccessorLockedHealthCheckingState holds the health checking state (e.g. lastProbeSuccessTimestamp,
204+
// clusterAccessorLockedHealthCheckingState holds the health checking state (e.g. lastProbeSuccessTime,
205205
// consecutiveFailures) of the clusterAccessor.
206206
type clusterAccessorLockedHealthCheckingState struct {
207-
// lastProbeTimestamp is the time when the health probe was executed last.
208-
lastProbeTimestamp time.Time
207+
// lastProbeTime is the time when the health probe was executed last.
208+
lastProbeTime time.Time
209209

210-
// lastProbeSuccessTimestamp is the time when the health probe was successfully executed last.
211-
lastProbeSuccessTimestamp time.Time
210+
// lastProbeSuccessTime is the time when the health probe was successfully executed last.
211+
lastProbeSuccessTime time.Time
212212

213213
// consecutiveFailures is the number of consecutive health probe failures.
214214
consecutiveFailures int
@@ -261,7 +261,11 @@ func (ca *clusterAccessor) Connect(ctx context.Context) (retErr error) {
261261
if retErr != nil {
262262
log.Error(retErr, "Connect failed")
263263
connectionUp.WithLabelValues(ca.cluster.Name, ca.cluster.Namespace).Set(0)
264-
ca.lockedState.lastConnectionCreationErrorTimestamp = time.Now()
264+
ca.lockedState.lastConnectionCreationErrorTime = time.Now()
265+
// A client creation just failed, so let's count this as a failed probe.
266+
ca.lockedState.healthChecking.lastProbeTime = time.Now()
267+
// Note: Intentionally not modifying lastProbeSuccessTime.
268+
ca.lockedState.healthChecking.consecutiveFailures++
265269
} else {
266270
connectionUp.WithLabelValues(ca.cluster.Name, ca.cluster.Namespace).Set(1)
267271
}
@@ -288,9 +292,9 @@ func (ca *clusterAccessor) Connect(ctx context.Context) (retErr error) {
288292
now := time.Now()
289293
ca.lockedState.healthChecking = clusterAccessorLockedHealthCheckingState{
290294
// A client was just created successfully, so let's set the last probe times.
291-
lastProbeTimestamp: now,
292-
lastProbeSuccessTimestamp: now,
293-
consecutiveFailures: 0,
295+
lastProbeTime: now,
296+
lastProbeSuccessTime: now,
297+
consecutiveFailures: 0,
294298
}
295299
ca.lockedState.connection = &clusterAccessorLockedConnectionState{
296300
restConfig: connection.RESTConfig,
@@ -350,7 +354,7 @@ func (ca *clusterAccessor) HealthCheck(ctx context.Context) (bool, bool) {
350354
ca.lock(ctx)
351355
defer ca.unlock(ctx)
352356

353-
ca.lockedState.healthChecking.lastProbeTimestamp = time.Now()
357+
ca.lockedState.healthChecking.lastProbeTime = time.Now()
354358

355359
unauthorizedErrorOccurred := false
356360
switch {
@@ -371,7 +375,7 @@ func (ca *clusterAccessor) HealthCheck(ctx context.Context) (bool, bool) {
371375
healthChecksTotal.WithLabelValues(ca.cluster.Name, ca.cluster.Namespace, "error").Inc()
372376
default:
373377
ca.lockedState.healthChecking.consecutiveFailures = 0
374-
ca.lockedState.healthChecking.lastProbeSuccessTimestamp = ca.lockedState.healthChecking.lastProbeTimestamp
378+
ca.lockedState.healthChecking.lastProbeSuccessTime = ca.lockedState.healthChecking.lastProbeTime
375379
log.V(6).Info("Health probe succeeded")
376380
healthCheck.WithLabelValues(ca.cluster.Name, ca.cluster.Namespace).Set(1)
377381
healthChecksTotal.WithLabelValues(ca.cluster.Name, ca.cluster.Namespace, "success").Inc()
@@ -462,25 +466,22 @@ func (ca *clusterAccessor) Watch(ctx context.Context, watcher Watcher) error {
462466
return nil
463467
}
464468

465-
func (ca *clusterAccessor) GetLastProbeSuccessTimestamp(ctx context.Context) time.Time {
469+
func (ca *clusterAccessor) GetHealthCheckingState(ctx context.Context) HealthCheckingState {
466470
ca.rLock(ctx)
467471
defer ca.rUnlock(ctx)
468472

469-
return ca.lockedState.healthChecking.lastProbeSuccessTimestamp
470-
}
471-
472-
func (ca *clusterAccessor) GetLastProbeTimestamp(ctx context.Context) time.Time {
473-
ca.rLock(ctx)
474-
defer ca.rUnlock(ctx)
475-
476-
return ca.lockedState.healthChecking.lastProbeTimestamp
473+
return HealthCheckingState{
474+
LastProbeTime: ca.lockedState.healthChecking.lastProbeTime,
475+
LastProbeSuccessTime: ca.lockedState.healthChecking.lastProbeSuccessTime,
476+
ConsecutiveFailures: ca.lockedState.healthChecking.consecutiveFailures,
477+
}
477478
}
478479

479-
func (ca *clusterAccessor) GetLastConnectionCreationErrorTimestamp(ctx context.Context) time.Time {
480+
func (ca *clusterAccessor) GetLastConnectionCreationErrorTime(ctx context.Context) time.Time {
480481
ca.rLock(ctx)
481482
defer ca.rUnlock(ctx)
482483

483-
return ca.lockedState.lastConnectionCreationErrorTimestamp
484+
return ca.lockedState.lastConnectionCreationErrorTime
484485
}
485486

486487
func (ca *clusterAccessor) rLock(ctx context.Context) {

controllers/clustercache/cluster_accessor_test.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,14 @@ func TestConnect(t *testing.T) {
8181
g.Expect(err).To(HaveOccurred())
8282
g.Expect(err.Error()).To(Equal("error creating REST config: error getting kubeconfig secret: Secret \"test-cluster-kubeconfig\" not found"))
8383
g.Expect(accessor.Connected(ctx)).To(BeFalse())
84-
g.Expect(accessor.lockedState.lastConnectionCreationErrorTimestamp.IsZero()).To(BeFalse())
85-
accessor.lockedState.lastConnectionCreationErrorTimestamp = time.Time{} // so we can compare in the next line
86-
g.Expect(accessor.lockedState).To(Equal(clusterAccessorLockedState{}))
84+
g.Expect(accessor.lockedState.lastConnectionCreationErrorTime.IsZero()).To(BeFalse())
85+
g.Expect(accessor.lockedState).To(Equal(clusterAccessorLockedState{
86+
lastConnectionCreationErrorTime: accessor.lockedState.lastConnectionCreationErrorTime,
87+
healthChecking: clusterAccessorLockedHealthCheckingState{
88+
lastProbeTime: accessor.lockedState.healthChecking.lastProbeTime,
89+
consecutiveFailures: 1,
90+
},
91+
}))
8792

8893
// Create invalid kubeconfig Secret
8994
kubeconfigBytes := kubeconfig.FromEnvTestConfig(env.Config, testCluster)
@@ -100,7 +105,7 @@ func TestConnect(t *testing.T) {
100105
g.Expect(err).To(HaveOccurred())
101106
g.Expect(err.Error()).To(Equal("error creating HTTP client and mapper: cluster is not reachable: the server could not find the requested resource"))
102107
g.Expect(accessor.Connected(ctx)).To(BeFalse())
103-
g.Expect(accessor.lockedState.lastConnectionCreationErrorTimestamp.IsZero()).To(BeFalse())
108+
g.Expect(accessor.lockedState.lastConnectionCreationErrorTime.IsZero()).To(BeFalse())
104109

105110
// Cleanup invalid kubeconfig Secret
106111
g.Expect(env.CleanupAndWait(ctx, kubeconfigSecret)).To(Succeed())
@@ -127,8 +132,8 @@ func TestConnect(t *testing.T) {
127132

128133
g.Expect(accessor.lockedState.clientCertificatePrivateKey).ToNot(BeNil())
129134

130-
g.Expect(accessor.lockedState.healthChecking.lastProbeTimestamp.IsZero()).To(BeFalse())
131-
g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTimestamp.IsZero()).To(BeFalse())
135+
g.Expect(accessor.lockedState.healthChecking.lastProbeTime.IsZero()).To(BeFalse())
136+
g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTime.IsZero()).To(BeFalse())
132137
g.Expect(accessor.lockedState.healthChecking.consecutiveFailures).To(Equal(0))
133138

134139
// Get client and test Get & List
@@ -201,8 +206,8 @@ func TestDisconnect(t *testing.T) {
201206
// Verify health checking state was preserved
202207
g.Expect(accessor.lockedState.clientCertificatePrivateKey).ToNot(BeNil())
203208

204-
g.Expect(accessor.lockedState.healthChecking.lastProbeTimestamp.IsZero()).To(BeFalse())
205-
g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTimestamp.IsZero()).To(BeFalse())
209+
g.Expect(accessor.lockedState.healthChecking.lastProbeTime.IsZero()).To(BeFalse())
210+
g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTime.IsZero()).To(BeFalse())
206211
}
207212

208213
func TestHealthCheck(t *testing.T) {

controllers/clustercache/cluster_cache.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ type ClusterCache interface {
152152
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
153153
Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error
154154

155-
// GetLastProbeSuccessTimestamp returns the time when the health probe was successfully executed last.
156-
GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time
155+
// GetHealthCheckingState returns the health checking state of a Cluster.
156+
GetHealthCheckingState(ctx context.Context, cluster client.ObjectKey) HealthCheckingState
157157

158158
// GetClusterSource returns a Source of Cluster events.
159159
// The mapFunc will be used to map from Cluster to reconcile.Request.
@@ -166,6 +166,21 @@ type ClusterCache interface {
166166
GetClusterSource(controllerName string, mapFunc func(ctx context.Context, cluster client.Object) []ctrl.Request, opts ...GetClusterSourceOption) source.Source
167167
}
168168

169+
// HealthCheckingState holds the health checking state for a Cluster.
170+
type HealthCheckingState struct {
171+
// LastProbeTime is the time when a health probe was executed last.
172+
// Note: client creations are also counted as probes.
173+
LastProbeTime time.Time
174+
175+
// LastProbeSuccessTime is the time when a health probe was successfully executed last.
176+
// Note: client creations are also counted as probes.
177+
LastProbeSuccessTime time.Time
178+
179+
// ConsecutiveFailures is the number of consecutive health probe failures.
180+
// Note: client creations are also counted as probes.
181+
ConsecutiveFailures int
182+
}
183+
169184
// ErrClusterNotConnected is returned by the ClusterCache when e.g. a Client cannot be returned
170185
// because there is no connection to the workload cluster.
171186
var ErrClusterNotConnected = errors.New("connection to the workload cluster is down")
@@ -262,7 +277,7 @@ func (o *GetClusterSourceOptions) ApplyOptions(opts []GetClusterSourceOption) *G
262277

263278
// WatchForProbeFailure will configure the Cluster source to enqueue reconcile.Requests if the health probe
264279
// didn't succeed for the configured duration.
265-
// For example if WatchForProbeFailure is set to 5m, an event will be sent if LastProbeSuccessTimestamp
280+
// For example if WatchForProbeFailure is set to 5m, an event will be sent if LastProbeSuccessTime
266281
// is 5m in the past (i.e. health probes didn't succeed in the last 5m).
267282
type WatchForProbeFailure time.Duration
268283

@@ -353,11 +368,11 @@ type clusterSource struct {
353368
// ch is the channel on which to send events.
354369
ch chan event.GenericEvent
355370

356-
// sendEventAfterProbeFailureDurations are the durations after LastProbeSuccessTimestamp
371+
// sendEventAfterProbeFailureDurations are the durations after LastProbeSuccessTime
357372
// after which we have to send events.
358373
sendEventAfterProbeFailureDurations []time.Duration
359374

360-
// lastEventSentTimeByCluster are the timestamps when we last sent an event for a cluster.
375+
// lastEventSentTimeByCluster are the times when we last sent an event for a cluster.
361376
lastEventSentTimeByCluster map[client.ObjectKey]time.Time
362377
}
363378

@@ -401,12 +416,12 @@ func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, wat
401416
return accessor.Watch(ctx, watcher)
402417
}
403418

404-
func (cc *clusterCache) GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time {
419+
func (cc *clusterCache) GetHealthCheckingState(ctx context.Context, cluster client.ObjectKey) HealthCheckingState {
405420
accessor := cc.getClusterAccessor(cluster)
406421
if accessor == nil {
407-
return time.Time{}
422+
return HealthCheckingState{}
408423
}
409-
return accessor.GetLastProbeSuccessTimestamp(ctx)
424+
return accessor.GetHealthCheckingState(ctx)
410425
}
411426

412427
const (
@@ -452,10 +467,10 @@ func (cc *clusterCache) Reconcile(ctx context.Context, req reconcile.Request) (r
452467
// Try to connect, if not connected.
453468
connected := accessor.Connected(ctx)
454469
if !connected {
455-
lastConnectionCreationErrorTimestamp := accessor.GetLastConnectionCreationErrorTimestamp(ctx)
470+
lastConnectionCreationErrorTime := accessor.GetLastConnectionCreationErrorTime(ctx)
456471

457472
// Requeue, if connection creation failed within the ConnectionCreationRetryInterval.
458-
if requeueAfter, requeue := shouldRequeue(time.Now(), lastConnectionCreationErrorTimestamp, accessor.config.ConnectionCreationRetryInterval); requeue {
473+
if requeueAfter, requeue := shouldRequeue(time.Now(), lastConnectionCreationErrorTime, accessor.config.ConnectionCreationRetryInterval); requeue {
459474
log.V(6).Info(fmt.Sprintf("Requeuing after %s as connection creation already failed within the last %s",
460475
requeueAfter.Truncate(time.Second/10), accessor.config.ConnectionCreationRetryInterval))
461476
requeueAfterDurations = append(requeueAfterDurations, requeueAfter)
@@ -475,10 +490,10 @@ func (cc *clusterCache) Reconcile(ctx context.Context, req reconcile.Request) (r
475490

476491
// Run the health probe, if connected.
477492
if connected {
478-
lastProbeTimestamp := accessor.GetLastProbeTimestamp(ctx)
493+
healthCheckingState := accessor.GetHealthCheckingState(ctx)
479494

480495
// Requeue, if health probe was already run within the HealthProbe.Interval.
481-
if requeueAfter, requeue := shouldRequeue(time.Now(), lastProbeTimestamp, accessor.config.HealthProbe.Interval); requeue {
496+
if requeueAfter, requeue := shouldRequeue(time.Now(), healthCheckingState.LastProbeTime, accessor.config.HealthProbe.Interval); requeue {
482497
log.V(6).Info(fmt.Sprintf("Requeuing after %s as health probe was already run within the last %s",
483498
requeueAfter.Truncate(time.Second/10), accessor.config.HealthProbe.Interval))
484499
requeueAfterDurations = append(requeueAfterDurations, requeueAfter)
@@ -515,8 +530,7 @@ func (cc *clusterCache) Reconcile(ctx context.Context, req reconcile.Request) (r
515530
}
516531

517532
// Send events to cluster sources.
518-
lastProbeSuccessTime := accessor.GetLastProbeSuccessTimestamp(ctx)
519-
cc.sendEventsToClusterSources(ctx, cluster, time.Now(), lastProbeSuccessTime, didConnect, didDisconnect)
533+
cc.sendEventsToClusterSources(ctx, cluster, time.Now(), accessor.GetHealthCheckingState(ctx).LastProbeSuccessTime, didConnect, didDisconnect)
520534

521535
// Requeue based on requeueAfterDurations (fallback to defaultRequeueAfter).
522536
return reconcile.Result{RequeueAfter: minDurationOrDefault(requeueAfterDurations, defaultRequeueAfter)}, nil

controllers/clustercache/cluster_cache_fake.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ func NewFakeClusterCache(workloadClient client.Client, clusterKey client.ObjectK
3636
watches: sets.Set[string]{}.Insert(watchObjects...),
3737
},
3838
healthChecking: clusterAccessorLockedHealthCheckingState{
39-
lastProbeTimestamp: time.Now(),
40-
lastProbeSuccessTimestamp: time.Now(),
41-
consecutiveFailures: 0,
39+
lastProbeTime: time.Now(),
40+
lastProbeSuccessTime: time.Now(),
41+
consecutiveFailures: 0,
4242
},
4343
},
4444
}

0 commit comments

Comments
 (0)