Skip to content

Commit e4580e8

Browse files
authored
Add stream class events (#211)
* Add stream class events * Fix unit tests * Fix main.go * More fixes * Add more unit tests
1 parent 5505f78 commit e4580e8

File tree

6 files changed

+134
-37
lines changed

6 files changed

+134
-37
lines changed

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func main() {
6868
probesService := health.NewProbesService(appConfig.ProbesConfiguration)
6969
go func() {
7070
err := probesService.ListenAndServe(ctx)
71-
if err != nil {
71+
if err != nil && !errors.Is(err, context.Canceled) {
7272
setupLog.V(0).Error(err, "unable to start health probes server")
7373
panic(err)
7474
}
@@ -103,7 +103,7 @@ func main() {
103103
mgr,
104104
eventRecorder,
105105
)
106-
err = stream_class.NewStreamClassReconciler(mgr.GetClient(), controllerFactory, reporter).SetupWithManager(mgr)
106+
err = stream_class.NewStreamClassReconciler(mgr.GetClient(), controllerFactory, reporter, eventRecorder).SetupWithManager(mgr)
107107

108108
if err != nil {
109109
setupLog.V(0).Error(err, "unable to create controller", "controller", "StreamClass")

services/controllers/event_func.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package controllers
2+
3+
// EventFunc is a callback that takes no arguments and returns nothing; reconcilers invoke it to emit an event.
4+
// Phase-transition helpers accept it as a single parameter, which keeps their signatures short and lets the call site decide what to record.
5+
type EventFunc func()

services/controllers/stream/stream_reconciler.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func (s *streamReconciler) moveFsm(ctx context.Context, definition Definition, j
297297
)
298298
}
299299

300-
func (s *streamReconciler) stopStream(ctx context.Context, definition Definition, nextPhase Phase, eventFunc func()) (reconcile.Result, error) {
300+
func (s *streamReconciler) stopStream(ctx context.Context, definition Definition, nextPhase Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
301301
j := &batchv1.Job{}
302302
j.SetName(definition.NamespacedName().Name)
303303
j.SetNamespace(definition.NamespacedName().Namespace)
@@ -309,7 +309,7 @@ func (s *streamReconciler) stopStream(ctx context.Context, definition Definition
309309
return s.updateStreamPhase(ctx, definition, nil, nextPhase, eventFunc)
310310
}
311311

312-
func (s *streamReconciler) startBackfill(ctx context.Context, definition Definition, nextPhase Phase, eventFunc func()) (reconcile.Result, error) {
312+
func (s *streamReconciler) startBackfill(ctx context.Context, definition Definition, nextPhase Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
313313

314314
logger := s.getLogger(ctx, definition.NamespacedName())
315315
logger.V(2).Info("starting backfill by creating a backfill request")
@@ -334,7 +334,7 @@ func (s *streamReconciler) startBackfill(ctx context.Context, definition Definit
334334
return s.updateStreamPhase(ctx, definition, backfillRequest, nextPhase, eventFunc)
335335
}
336336

337-
func (s *streamReconciler) reconcileJob(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, nextPhase Phase, eventFunc func()) (reconcile.Result, error) {
337+
func (s *streamReconciler) reconcileJob(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, nextPhase Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
338338
logger := s.getLogger(ctx, definition.NamespacedName())
339339
v1job := batchv1.Job{}
340340
err := s.client.Get(ctx, definition.NamespacedName(), &v1job)
@@ -401,7 +401,7 @@ func (s *streamReconciler) compareConfigurations(ctx context.Context, v1job batc
401401
return jobConfiguration == definitionConfiguration, nil
402402
}
403403

404-
func (s *streamReconciler) completeBackfill(ctx context.Context, job *StreamingJob, definition Definition, request *v1.BackfillRequest, nextStatus Phase, eventFunc func()) (reconcile.Result, error) {
404+
func (s *streamReconciler) completeBackfill(ctx context.Context, job *StreamingJob, definition Definition, request *v1.BackfillRequest, nextStatus Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
405405
if job != nil {
406406
err := s.client.Delete(ctx, job.ToV1Job())
407407
if client.IgnoreNotFound(err) != nil {
@@ -494,7 +494,7 @@ func (s *streamReconciler) getLogger(_ context.Context, request types.Namespaced
494494
WithValues("namespace", request.Namespace, "name", request.Name, "kind", s.gvk.Kind)
495495
}
496496

497-
func (s *streamReconciler) updateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc func()) (reconcile.Result, error) {
497+
func (s *streamReconciler) updateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
498498
logger := s.getLogger(ctx, definition.NamespacedName())
499499
if definition.GetPhase() == next { // coverage-ignore
500500
logger.V(0).Info("Stream phase is already set to", "phase", definition.GetPhase())

services/controllers/stream_class/stream_class.go

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"errors"
66
"fmt"
77
v1 "github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
8+
"github.com/SneaksAndData/arcane-operator/services/controllers"
9+
corev1 "k8s.io/api/core/v1"
810
apierrors "k8s.io/apimachinery/pkg/api/errors"
911
"k8s.io/apimachinery/pkg/types"
12+
"k8s.io/client-go/tools/record"
1013
"k8s.io/klog/v2"
1114
runtime "sigs.k8s.io/controller-runtime"
1215
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -22,14 +25,16 @@ type StreamClassReconciler struct {
2225
streamControllers map[types.NamespacedName]*StreamControllerHandle
2326
streamControllerFactory UnmanagedControllerFactory
2427
reporter StreamClassMetricsReporter
28+
eventRecorder record.EventRecorder
2529
}
2630

27-
func NewStreamClassReconciler(client client.Client, streamControllerFactory UnmanagedControllerFactory, reporter StreamClassMetricsReporter) *StreamClassReconciler {
31+
func NewStreamClassReconciler(client client.Client, streamControllerFactory UnmanagedControllerFactory, reporter StreamClassMetricsReporter, eventRecorder record.EventRecorder) *StreamClassReconciler {
2832
return &StreamClassReconciler{
2933
client: client,
3034
streamControllers: make(map[types.NamespacedName]*StreamControllerHandle),
3135
streamControllerFactory: streamControllerFactory,
3236
reporter: reporter,
37+
eventRecorder: eventRecorder,
3338
}
3439
}
3540

@@ -40,14 +45,14 @@ func (s *StreamClassReconciler) Reconcile(ctx context.Context, request reconcile
4045
sc := &v1.StreamClass{}
4146
err := s.client.Get(ctx, request.NamespacedName, sc)
4247
deleted := apierrors.IsNotFound(err)
43-
if client.IgnoreNotFound(err) != nil {
48+
if client.IgnoreNotFound(err) != nil { // coverage-ignore
4449
logger.V(0).Error(err, "unable to get stream class")
4550
}
4651

4752
return s.moveFsm(ctx, sc, deleted, request.NamespacedName)
4853
}
4954

50-
func (s *StreamClassReconciler) SetupWithManager(mgr runtime.Manager) error {
55+
func (s *StreamClassReconciler) SetupWithManager(mgr runtime.Manager) error { // coverage-ignore (should be tested in e2e)
5156
return runtime.NewControllerManagedBy(mgr).For(&v1.StreamClass{}).Complete(s)
5257
}
5358

@@ -58,13 +63,41 @@ func (s *StreamClassReconciler) getLogger(ctx context.Context, request types.Nam
5863
}
5964

6065
func (s *StreamClassReconciler) moveFsm(ctx context.Context, sc *v1.StreamClass, deleted bool, name types.NamespacedName) (reconcile.Result, error) {
66+
logger := s.getLogger(ctx, name)
6167
switch {
62-
case !deleted && sc.Status.Phase == "":
63-
return s.updatePhase(ctx, sc, name, v1.PhasePending)
64-
case !deleted && (sc.Status.Phase == v1.PhasePending || sc.Status.Phase == v1.PhaseReady):
65-
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady)
6668
case deleted:
67-
return s.tryStopStreamController(ctx, name)
69+
return s.tryStopStreamController(ctx, name, nil)
70+
71+
case sc.Status.Phase == "":
72+
return s.updatePhase(ctx, sc, name, v1.PhasePending, func() {
73+
s.eventRecorder.Event(sc,
74+
corev1.EventTypeNormal,
75+
"StreamClassCreated",
76+
"New StreamClass has been created and is pending processing")
77+
})
78+
case sc.Status.Phase == v1.PhasePending:
79+
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
80+
s.eventRecorder.Event(sc,
81+
corev1.EventTypeNormal,
82+
"StreamClassReady",
83+
"StreamClass is ready and stream controller has been started")
84+
})
85+
case sc.Status.Phase == v1.PhaseFailed:
86+
logger.V(0).Info("Found StreamClass in Failed state, attempting to recover")
87+
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
88+
s.eventRecorder.Event(sc,
89+
corev1.EventTypeWarning,
90+
"StreamClassRecovered",
91+
"StreamClass was found in Failed state and recovery was attempted")
92+
})
93+
94+
case sc.Status.Phase == v1.PhaseReady:
95+
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
96+
s.eventRecorder.Event(sc,
97+
corev1.EventTypeNormal,
98+
"StreamClassReconciled",
99+
"StreamClass is reconciled and stream controller is running")
100+
})
68101
}
69102

70103
return reconcile.Result{}, fmt.Errorf("failed to reconcile StreamClass FSM for %s. Current state: %s",
@@ -73,7 +106,7 @@ func (s *StreamClassReconciler) moveFsm(ctx context.Context, sc *v1.StreamClass,
73106
)
74107
}
75108

76-
func (s *StreamClassReconciler) tryStartStreamController(ctx context.Context, sc *v1.StreamClass, name types.NamespacedName, nextPhase v1.Phase) (reconcile.Result, error) {
109+
func (s *StreamClassReconciler) tryStartStreamController(ctx context.Context, sc *v1.StreamClass, name types.NamespacedName, nextPhase v1.Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
77110
s.rwLock.Lock()
78111
defer s.rwLock.Unlock()
79112

@@ -82,14 +115,14 @@ func (s *StreamClassReconciler) tryStartStreamController(ctx context.Context, sc
82115
_, ok := s.streamControllers[name]
83116
if ok {
84117
logger.V(0).Info("Stream controller is already running")
85-
return s.updatePhase(ctx, sc, name, nextPhase)
118+
return s.updatePhase(ctx, sc, name, nextPhase, eventFunc)
86119
}
87120

88121
controller, err := s.streamControllerFactory.CreateStreamController(ctx, sc.TargetResourceGvk(), sc)
89122

90123
if err != nil {
91124
logger.V(0).Error(err, "unable to create stream reconciler")
92-
return s.updatePhase(ctx, sc, name, v1.PhaseFailed)
125+
return s.updatePhase(ctx, sc, name, v1.PhaseFailed, eventFunc)
93126
}
94127

95128
controllerContext, cancelFunc := context.WithCancel(ctx)
@@ -103,60 +136,71 @@ func (s *StreamClassReconciler) tryStartStreamController(ctx context.Context, sc
103136
logger := s.getLogger(ctx, name)
104137
logger.V(0).Error(err, "stream controller exited with error")
105138

106-
_, err = s.updatePhase(ctx, sc, name, v1.PhaseFailed)
139+
_, err = s.updatePhase(ctx, sc, name, v1.PhaseFailed, func() {
140+
s.eventRecorder.Event(sc,
141+
corev1.EventTypeWarning,
142+
"StreamControllerError",
143+
"Stream controller exited with error, StreamClass moved to Failed state")
144+
})
107145
if err != nil {
108146
logger := s.getLogger(ctx, name)
109147
logger.V(0).Error(err, "unable to update StreamClass phase to Failed after stream controller exited with error")
110148
}
111149
}
112150
}()
113151

114-
logger.V(1).Info("Stream controller is started")
152+
logger.V(0).Info("Stream controller is started")
115153
s.streamControllers[name] = &StreamControllerHandle{
116154
cancelFunc: cancelFunc,
117155
gvk: sc.TargetResourceGvk(),
118156
}
119157
s.reporter.AddStreamClass(sc.TargetResourceGvk().Kind, "stream_class", sc.MetricsTags())
120158

121-
return s.updatePhase(ctx, sc, name, nextPhase)
159+
return s.updatePhase(ctx, sc, name, nextPhase, eventFunc)
122160
}
123161

124-
func (s *StreamClassReconciler) tryStopStreamController(ctx context.Context, name types.NamespacedName) (reconcile.Result, error) {
162+
func (s *StreamClassReconciler) tryStopStreamController(ctx context.Context, name types.NamespacedName, eventFunc controllers.EventFunc) (reconcile.Result, error) {
125163
s.rwLock.Lock()
126164
defer s.rwLock.Unlock()
127165

128166
logger := s.getLogger(ctx, name)
129167
_, ok := s.streamControllers[name]
130168
if !ok {
131-
logger.V(2).Info("Stream controller is not running")
169+
logger.V(0).Info("Stream controller is not running")
132170
return reconcile.Result{}, nil
133171
}
134172
s.streamControllers[name].cancelFunc()
135-
logger.V(2).Info("Stream controller is stopped")
173+
logger.V(0).Info("Stream controller is stopped")
136174
s.reporter.RemoveStreamClass(s.streamControllers[name].gvk.Kind)
137175

138176
delete(s.streamControllers, name)
139-
return s.updatePhase(ctx, nil, name, v1.PhaseStopped)
177+
return s.updatePhase(ctx, nil, name, v1.PhaseStopped, eventFunc)
140178
}
141179

142-
func (s *StreamClassReconciler) updatePhase(ctx context.Context, sc *v1.StreamClass, name types.NamespacedName, nextPhase v1.Phase) (reconcile.Result, error) {
180+
func (s *StreamClassReconciler) updatePhase(ctx context.Context, sc *v1.StreamClass, name types.NamespacedName, nextPhase v1.Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
143181
logger := s.getLogger(ctx, name)
144182
if sc == nil {
145183
logger.V(0).Info("Stream class is deleted, skipping phase update")
146184
return reconcile.Result{}, nil
147185
}
148186

149187
if sc.Status.Phase == nextPhase {
150-
logger.V(2).Info("StreamClass phase is already set to", sc.Status.Phase)
188+
logger.V(0).Info("StreamClass phase is already set to", sc.Status.Phase)
151189
return reconcile.Result{}, nil
152190
}
153191

154-
logger.V(1).Info("Updating StreamClass phase", "from", sc.Status.Phase, "to", nextPhase)
192+
logger.V(0).Info("Updating StreamClass phase", "from", sc.Status.Phase, "to", nextPhase)
155193
sc.Status.Phase = nextPhase
156194
err := s.client.Status().Update(ctx, sc)
195+
196+
if eventFunc != nil {
197+
eventFunc()
198+
}
199+
157200
if client.IgnoreNotFound(err) != nil {
158-
logger.V(1).Error(err, "unable to update Stream Class status")
201+
logger.V(0).Error(err, "unable to update Stream Class status")
159202
return reconcile.Result{}, err
160203
}
204+
161205
return reconcile.Result{}, nil
162206
}

0 commit comments

Comments
 (0)