Skip to content

Commit 03e3249

Browse files
committed
Implement Prometheus instrumentation
Signed-off-by: Stefan Prodan <[email protected]>
1 parent c8c2eec commit 03e3249

File tree

5 files changed

+166
-0
lines changed

5 files changed

+166
-0
lines changed

controllers/bucket_controller.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"sigs.k8s.io/controller-runtime/pkg/controller"
4141

4242
"github.com/fluxcd/pkg/runtime/events"
43+
"github.com/fluxcd/pkg/runtime/metrics"
4344
"github.com/fluxcd/pkg/runtime/predicates"
4445

4546
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@@ -53,6 +54,7 @@ type BucketReconciler struct {
5354
Storage *Storage
5455
EventRecorder kuberecorder.EventRecorder
5556
ExternalEventRecorder *events.Recorder
57+
MetricsRecorder *metrics.Recorder
5658
}
5759

5860
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
@@ -91,6 +93,8 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
9193
// Return the error so we retry the failed garbage collection
9294
return ctrl.Result{}, err
9395
}
96+
// Record deleted status
97+
r.recordReadiness(bucket, true)
9498
// Remove our finalizer from the list and update it
9599
bucket.ObjectMeta.Finalizers = removeString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
96100
if err := r.Update(ctx, &bucket); err != nil {
@@ -101,13 +105,23 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
101105
}
102106
}
103107

108+
// record reconciliation duration
109+
if r.MetricsRecorder != nil {
110+
objRef, err := reference.GetReference(r.Scheme, &bucket)
111+
if err != nil {
112+
return ctrl.Result{}, err
113+
}
114+
defer r.MetricsRecorder.RecordDuration(*objRef, start)
115+
}
116+
104117
// set initial status
105118
if resetBucket, ok := r.resetStatus(bucket); ok {
106119
bucket = resetBucket
107120
if err := r.Status().Update(ctx, &bucket); err != nil {
108121
log.Error(err, "unable to update status")
109122
return ctrl.Result{Requeue: true}, err
110123
}
124+
r.recordReadiness(bucket, false)
111125
}
112126

113127
// purge old artifacts from storage
@@ -127,13 +141,15 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
127141
// if reconciliation failed, record the failure and requeue immediately
128142
if reconcileErr != nil {
129143
r.event(reconciledBucket, events.EventSeverityError, reconcileErr.Error())
144+
r.recordReadiness(reconciledBucket, false)
130145
return ctrl.Result{Requeue: true}, reconcileErr
131146
}
132147

133148
// emit revision change event
134149
if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
135150
r.event(reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
136151
}
152+
r.recordReadiness(reconciledBucket, false)
137153

138154
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
139155
time.Now().Sub(start).String(),
@@ -365,3 +381,26 @@ func (r *BucketReconciler) event(bucket sourcev1.Bucket, severity, msg string) {
365381
}
366382
}
367383
}
384+
385+
func (r *BucketReconciler) recordReadiness(bucket sourcev1.Bucket, deleted bool) {
386+
if r.MetricsRecorder == nil {
387+
return
388+
}
389+
390+
objRef, err := reference.GetReference(r.Scheme, &bucket)
391+
if err != nil {
392+
r.Log.WithValues(
393+
strings.ToLower(bucket.Kind),
394+
fmt.Sprintf("%s/%s", bucket.GetNamespace(), bucket.GetName()),
395+
).Error(err, "unable to record readiness metric")
396+
return
397+
}
398+
if rc := meta.GetCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
399+
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
400+
} else {
401+
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
402+
Type: meta.ReadyCondition,
403+
Status: corev1.ConditionUnknown,
404+
}, deleted)
405+
}
406+
}

controllers/gitrepository_controller.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"sigs.k8s.io/controller-runtime/pkg/controller"
3939

4040
"github.com/fluxcd/pkg/runtime/events"
41+
"github.com/fluxcd/pkg/runtime/metrics"
4142
"github.com/fluxcd/pkg/runtime/predicates"
4243

4344
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@@ -52,6 +53,7 @@ type GitRepositoryReconciler struct {
5253
Storage *Storage
5354
EventRecorder kuberecorder.EventRecorder
5455
ExternalEventRecorder *events.Recorder
56+
MetricsRecorder *metrics.Recorder
5557
}
5658

5759
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
@@ -90,6 +92,8 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
9092
// Return the error so we retry the failed garbage collection
9193
return ctrl.Result{}, err
9294
}
95+
// Record deleted status
96+
r.recordReadiness(repository, true)
9397
// Remove our finalizer from the list and update it
9498
repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
9599
if err := r.Update(ctx, &repository); err != nil {
@@ -100,13 +104,23 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
100104
}
101105
}
102106

107+
// record reconciliation duration
108+
if r.MetricsRecorder != nil {
109+
objRef, err := reference.GetReference(r.Scheme, &repository)
110+
if err != nil {
111+
return ctrl.Result{}, err
112+
}
113+
defer r.MetricsRecorder.RecordDuration(*objRef, start)
114+
}
115+
103116
// set initial status
104117
if resetRepository, ok := r.resetStatus(repository); ok {
105118
repository = resetRepository
106119
if err := r.Status().Update(ctx, &repository); err != nil {
107120
log.Error(err, "unable to update status")
108121
return ctrl.Result{Requeue: true}, err
109122
}
123+
r.recordReadiness(repository, false)
110124
}
111125

112126
// purge old artifacts from storage
@@ -126,13 +140,15 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
126140
// if reconciliation failed, record the failure and requeue immediately
127141
if reconcileErr != nil {
128142
r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error())
143+
r.recordReadiness(reconciledRepository, false)
129144
return ctrl.Result{Requeue: true}, reconcileErr
130145
}
131146

132147
// emit revision change event
133148
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
134149
r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
135150
}
151+
r.recordReadiness(reconciledRepository, false)
136152

137153
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
138154
time.Now().Sub(start).String(),
@@ -325,3 +341,26 @@ func (r *GitRepositoryReconciler) event(repository sourcev1.GitRepository, sever
325341
}
326342
}
327343
}
344+
345+
func (r *GitRepositoryReconciler) recordReadiness(repository sourcev1.GitRepository, deleted bool) {
346+
if r.MetricsRecorder == nil {
347+
return
348+
}
349+
350+
objRef, err := reference.GetReference(r.Scheme, &repository)
351+
if err != nil {
352+
r.Log.WithValues(
353+
strings.ToLower(repository.Kind),
354+
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
355+
).Error(err, "unable to record readiness metric")
356+
return
357+
}
358+
if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
359+
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
360+
} else {
361+
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
362+
Type: meta.ReadyCondition,
363+
Status: corev1.ConditionUnknown,
364+
}, deleted)
365+
}
366+
}

controllers/helmchart_controller.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"sigs.k8s.io/controller-runtime/pkg/controller"
4242

4343
"github.com/fluxcd/pkg/runtime/events"
44+
"github.com/fluxcd/pkg/runtime/metrics"
4445
"github.com/fluxcd/pkg/runtime/predicates"
4546
"github.com/fluxcd/pkg/untar"
4647

@@ -57,6 +58,7 @@ type HelmChartReconciler struct {
5758
Getters getter.Providers
5859
EventRecorder kuberecorder.EventRecorder
5960
ExternalEventRecorder *events.Recorder
61+
MetricsRecorder *metrics.Recorder
6062
}
6163

6264
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete
@@ -95,6 +97,8 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
9597
// Return the error so we retry the failed garbage collection
9698
return ctrl.Result{}, err
9799
}
100+
// Record deleted status
101+
r.recordReadiness(chart, true)
98102
// Remove our finalizer from the list and update it
99103
chart.ObjectMeta.Finalizers = removeString(chart.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
100104
if err := r.Update(ctx, &chart); err != nil {
@@ -105,6 +109,15 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
105109
}
106110
}
107111

112+
// record reconciliation duration
113+
if r.MetricsRecorder != nil {
114+
objRef, err := reference.GetReference(r.Scheme, &chart)
115+
if err != nil {
116+
return ctrl.Result{}, err
117+
}
118+
defer r.MetricsRecorder.RecordDuration(*objRef, start)
119+
}
120+
108121
// Conditionally set progressing condition in status
109122
resetChart, changed := r.resetStatus(chart)
110123
if changed {
@@ -113,6 +126,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
113126
log.Error(err, "unable to update status")
114127
return ctrl.Result{Requeue: true}, err
115128
}
129+
r.recordReadiness(chart, false)
116130
}
117131

118132
// Purge all but current artifact from storage
@@ -138,6 +152,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
138152
if err := r.Status().Update(ctx, &chart); err != nil {
139153
log.Error(err, "unable to update status")
140154
}
155+
r.recordReadiness(chart, false)
141156
return ctrl.Result{Requeue: true}, err
142157
}
143158

@@ -164,13 +179,15 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
164179
// If reconciliation failed, record the failure and requeue immediately
165180
if reconcileErr != nil {
166181
r.event(reconciledChart, events.EventSeverityError, reconcileErr.Error())
182+
r.recordReadiness(reconciledChart, false)
167183
return ctrl.Result{Requeue: true}, reconcileErr
168184
}
169185

170186
// Emit an event if we did not have an artifact before, or the revision has changed
171187
if chart.Status.Artifact == nil || reconciledChart.Status.Artifact.Revision != chart.Status.Artifact.Revision {
172188
r.event(reconciledChart, events.EventSeverityInfo, sourcev1.HelmChartReadyMessage(reconciledChart))
173189
}
190+
r.recordReadiness(reconciledChart, false)
174191

175192
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
176193
time.Now().Sub(start).String(),
@@ -527,3 +544,26 @@ func (r *HelmChartReconciler) event(chart sourcev1.HelmChart, severity, msg stri
527544
}
528545
}
529546
}
547+
548+
func (r *HelmChartReconciler) recordReadiness(chart sourcev1.HelmChart, deleted bool) {
549+
if r.MetricsRecorder == nil {
550+
return
551+
}
552+
553+
objRef, err := reference.GetReference(r.Scheme, &chart)
554+
if err != nil {
555+
r.Log.WithValues(
556+
strings.ToLower(chart.Kind),
557+
fmt.Sprintf("%s/%s", chart.GetNamespace(), chart.GetName()),
558+
).Error(err, "unable to record readiness metric")
559+
return
560+
}
561+
if rc := meta.GetCondition(chart.Status.Conditions, meta.ReadyCondition); rc != nil {
562+
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
563+
} else {
564+
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
565+
Type: meta.ReadyCondition,
566+
Status: corev1.ConditionUnknown,
567+
}, deleted)
568+
}
569+
}

controllers/helmrepository_controller.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"sigs.k8s.io/yaml"
3939

4040
"github.com/fluxcd/pkg/runtime/events"
41+
"github.com/fluxcd/pkg/runtime/metrics"
4142
"github.com/fluxcd/pkg/runtime/predicates"
4243

4344
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@@ -53,6 +54,7 @@ type HelmRepositoryReconciler struct {
5354
Getters getter.Providers
5455
EventRecorder kuberecorder.EventRecorder
5556
ExternalEventRecorder *events.Recorder
57+
MetricsRecorder *metrics.Recorder
5658
}
5759

5860
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
@@ -92,6 +94,8 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
9294
// Return the error so we retry the failed garbage collection
9395
return ctrl.Result{}, err
9496
}
97+
// Record deleted status
98+
r.recordReadiness(repository, true)
9599
// Remove our finalizer from the list and update it
96100
repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
97101
if err := r.Update(ctx, &repository); err != nil {
@@ -102,13 +106,23 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
102106
}
103107
}
104108

109+
// record reconciliation duration
110+
if r.MetricsRecorder != nil {
111+
objRef, err := reference.GetReference(r.Scheme, &repository)
112+
if err != nil {
113+
return ctrl.Result{}, err
114+
}
115+
defer r.MetricsRecorder.RecordDuration(*objRef, start)
116+
}
117+
105118
// set initial status
106119
if resetRepository, ok := r.resetStatus(repository); ok {
107120
repository = resetRepository
108121
if err := r.Status().Update(ctx, &repository); err != nil {
109122
log.Error(err, "unable to update status")
110123
return ctrl.Result{Requeue: true}, err
111124
}
125+
r.recordReadiness(repository, false)
112126
}
113127

114128
// purge old artifacts from storage
@@ -128,13 +142,15 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
128142
// if reconciliation failed, record the failure and requeue immediately
129143
if reconcileErr != nil {
130144
r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error())
145+
r.recordReadiness(reconciledRepository, false)
131146
return ctrl.Result{Requeue: true}, reconcileErr
132147
}
133148

134149
// emit revision change event
135150
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
136151
r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(reconciledRepository))
137152
}
153+
r.recordReadiness(reconciledRepository, false)
138154

139155
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
140156
time.Now().Sub(start).String(),
@@ -299,3 +315,26 @@ func (r *HelmRepositoryReconciler) event(repository sourcev1.HelmRepository, sev
299315
}
300316
}
301317
}
318+
319+
func (r *HelmRepositoryReconciler) recordReadiness(repository sourcev1.HelmRepository, deleted bool) {
320+
if r.MetricsRecorder == nil {
321+
return
322+
}
323+
324+
objRef, err := reference.GetReference(r.Scheme, &repository)
325+
if err != nil {
326+
r.Log.WithValues(
327+
strings.ToLower(repository.Kind),
328+
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
329+
).Error(err, "unable to record readiness metric")
330+
return
331+
}
332+
if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
333+
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
334+
} else {
335+
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
336+
Type: meta.ReadyCondition,
337+
Status: corev1.ConditionUnknown,
338+
}, deleted)
339+
}
340+
}

0 commit comments

Comments
 (0)