Skip to content

Commit f4e4fa2

Browse files
committed
Fix unit tests
1 parent f803bde commit f4e4fa2

File tree

1 file changed

+21
-15
lines changed

1 file changed

+21
-15
lines changed

services/controllers/stream_class/stream_class.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,14 @@ func (s *StreamClassReconciler) moveFsm(ctx context.Context, sc *v1.StreamClass,
7171
return s.tryStopStreamController(ctx, name, nil)
7272

7373
case sc.Status.Phase == "":
74-
return s.updatePhase(ctx, sc, name, v1.PhasePending, func() {
74+
return s.updatePhase(ctx, unsetRetry(sc), name, v1.PhasePending, func() {
7575
s.eventRecorder.Event(sc,
7676
corev1.EventTypeNormal,
7777
"StreamClassCreated",
7878
"New StreamClass has been created and is pending processing")
7979
})
8080
case sc.Status.Phase == v1.PhasePending:
81-
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
81+
return s.tryStartStreamController(ctx, unsetRetry(sc), name, v1.PhaseReady, func() {
8282
s.eventRecorder.Event(sc,
8383
corev1.EventTypeNormal,
8484
"StreamClassReady",
@@ -91,23 +91,24 @@ func (s *StreamClassReconciler) moveFsm(ctx context.Context, sc *v1.StreamClass,
9191
case sc.Status.Phase == v1.PhaseFailed && sc.Status.ReconcileAfter != nil:
9292
logger.V(0).Info("Found StreamClass in Failed state, attempting to recover")
9393
t := time.Until(*sc.Status.ReconcileAfter)
94-
if t <= 0 {
95-
logger.V(0).Info("ReconcileAfter time has passed, attempting to restart stream controller")
96-
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
97-
s.eventRecorder.Event(sc,
98-
corev1.EventTypeNormal,
99-
"StreamClassRecovered",
100-
"StreamClass has been recovered from Failed state and stream controller has been restarted")
101-
})
94+
if t >= 0 {
95+
logger.V(0).Info("StreamClass is in Failed state, waiting until ReconcileAfter time to attempt recovery",
96+
"reconcileAfter",
97+
sc.Status.ReconcileAfter)
98+
return reconcile.Result{RequeueAfter: t}, nil
10299
}
103-
logger.V(0).Info("StreamClass is in Failed state, waiting until ReconcileAfter time to attempt recovery",
104-
"reconcileAfter",
105-
sc.Status.ReconcileAfter)
106-
return reconcile.Result{RequeueAfter: t}, nil
100+
101+
logger.V(0).Info("ReconcileAfter time has passed, attempting to restart stream controller")
102+
return s.tryStartStreamController(ctx, unsetRetry(sc), name, v1.PhaseReady, func() {
103+
s.eventRecorder.Event(sc,
104+
corev1.EventTypeNormal,
105+
"StreamClassRecovered",
106+
"StreamClass has been recovered from Failed state and stream controller has been restarted")
107+
})
107108

108109
case sc.Status.Phase == v1.PhaseReady:
109110
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
110-
s.eventRecorder.Event(sc,
111+
s.eventRecorder.Event(unsetRetry(sc),
111112
corev1.EventTypeNormal,
112113
"StreamClassReconciled",
113114
"StreamClass is reconciled and stream controller is running")
@@ -178,6 +179,11 @@ func (s *StreamClassReconciler) tryStartStreamController(ctx context.Context, sc
178179
return s.updatePhase(ctx, sc, name, nextPhase, eventFunc)
179180
}
180181

182+
func unsetRetry(sc *v1.StreamClass) *v1.StreamClass {
183+
sc.Status.ReconcileAfter = nil
184+
return sc
185+
}
186+
181187
func (s *StreamClassReconciler) setForRetry(ctx context.Context, sc *v1.StreamClass, name types.NamespacedName, logger klog.Logger) error {
182188
now := metav1.Now().Add(time.Second * 10) // TODO: make the retry delay configurable and implement exponential backoff
183189
sc.Status.ReconcileAfter = &now

0 commit comments

Comments
 (0)