@@ -26,7 +26,6 @@ import (
2626 "strings"
2727 "time"
2828
29- "github.com/fluxcd/pkg/apis/meta"
3029 "github.com/go-logr/logr"
3130 "github.com/minio/minio-go/v7"
3231 "github.com/minio/minio-go/v7/pkg/credentials"
@@ -42,41 +41,56 @@ import (
4241 "sigs.k8s.io/controller-runtime/pkg/client"
4342 "sigs.k8s.io/controller-runtime/pkg/controller"
4443 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
44+ "sigs.k8s.io/controller-runtime/pkg/predicate"
4545
46+ "github.com/fluxcd/pkg/apis/meta"
4647 "github.com/fluxcd/pkg/runtime/events"
4748 "github.com/fluxcd/pkg/runtime/metrics"
4849 "github.com/fluxcd/pkg/runtime/predicates"
4950
5051 sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
5152)
5253
54+ // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
55+ // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
56+ // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
57+ // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
58+
5359// BucketReconciler reconciles a Bucket object
5460type BucketReconciler struct {
5561 client.Client
56- Log logr.Logger
5762 Scheme * runtime.Scheme
5863 Storage * Storage
5964 EventRecorder kuberecorder.EventRecorder
6065 ExternalEventRecorder * events.Recorder
6166 MetricsRecorder * metrics.Recorder
6267}
6368
64- // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
65- // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
66- // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
67- // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
69+ type BucketReconcilerOptions struct {
70+ MaxConcurrentReconciles int
71+ }
72+
73+ func (r * BucketReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
74+ return r .SetupWithManagerAndOptions (mgr , BucketReconcilerOptions {})
75+ }
76+
77+ func (r * BucketReconciler ) SetupWithManagerAndOptions (mgr ctrl.Manager , opts BucketReconcilerOptions ) error {
78+ return ctrl .NewControllerManagedBy (mgr ).
79+ For (& sourcev1.Bucket {}).
80+ WithEventFilter (predicate .Or (predicate.GenerationChangedPredicate {}, predicates.ReconcilateAtChangedPredicate {})).
81+ WithOptions (controller.Options {MaxConcurrentReconciles : opts .MaxConcurrentReconciles }).
82+ Complete (r )
83+ }
6884
69- func (r * BucketReconciler ) Reconcile (req ctrl.Request ) (ctrl.Result , error ) {
70- ctx := context .Background ()
85+ func (r * BucketReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
7186 start := time .Now ()
87+ log := ctrl .LoggerFrom (ctx )
7288
7389 var bucket sourcev1.Bucket
7490 if err := r .Get (ctx , req .NamespacedName , & bucket ); err != nil {
7591 return ctrl.Result {}, client .IgnoreNotFound (err )
7692 }
7793
78- log := r .Log .WithValues ("controller" , strings .ToLower (sourcev1 .BucketKind ), "request" , req .NamespacedName )
79-
8094 // Add our finalizer if it does not exist
8195 if ! controllerutil .ContainsFinalizer (& bucket , sourcev1 .SourceFinalizer ) {
8296 controllerutil .AddFinalizer (& bucket , sourcev1 .SourceFinalizer )
@@ -113,7 +127,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
113127 log .Error (err , "unable to update status" )
114128 return ctrl.Result {Requeue : true }, err
115129 }
116- r .recordReadiness (bucket )
130+ r .recordReadiness (ctx , bucket )
117131 }
118132
119133 // record the value of the reconciliation request, if any
@@ -139,16 +153,16 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
139153
140154 // if reconciliation failed, record the failure and requeue immediately
141155 if reconcileErr != nil {
142- r .event (reconciledBucket , events .EventSeverityError , reconcileErr .Error ())
143- r .recordReadiness (reconciledBucket )
156+ r .event (ctx , reconciledBucket , events .EventSeverityError , reconcileErr .Error ())
157+ r .recordReadiness (ctx , reconciledBucket )
144158 return ctrl.Result {Requeue : true }, reconcileErr
145159 }
146160
147161 // emit revision change event
148162 if bucket .Status .Artifact == nil || reconciledBucket .Status .Artifact .Revision != bucket .Status .Artifact .Revision {
149- r .event (reconciledBucket , events .EventSeverityInfo , sourcev1 .BucketReadyMessage (reconciledBucket ))
163+ r .event (ctx , reconciledBucket , events .EventSeverityInfo , sourcev1 .BucketReadyMessage (reconciledBucket ))
150164 }
151- r .recordReadiness (reconciledBucket )
165+ r .recordReadiness (ctx , reconciledBucket )
152166
153167 log .Info (fmt .Sprintf ("Reconciliation finished in %s, next run in %s" ,
154168 time .Now ().Sub (start ).String (),
@@ -158,22 +172,6 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
158172 return ctrl.Result {RequeueAfter : bucket .GetInterval ().Duration }, nil
159173}
160174
161- type BucketReconcilerOptions struct {
162- MaxConcurrentReconciles int
163- }
164-
165- func (r * BucketReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
166- return r .SetupWithManagerAndOptions (mgr , BucketReconcilerOptions {})
167- }
168-
169- func (r * BucketReconciler ) SetupWithManagerAndOptions (mgr ctrl.Manager , opts BucketReconcilerOptions ) error {
170- return ctrl .NewControllerManagedBy (mgr ).
171- For (& sourcev1.Bucket {}).
172- WithEventFilter (predicates.ChangePredicate {}).
173- WithOptions (controller.Options {MaxConcurrentReconciles : opts .MaxConcurrentReconciles }).
174- Complete (r )
175- }
176-
177175func (r * BucketReconciler ) reconcile (ctx context.Context , bucket sourcev1.Bucket ) (sourcev1.Bucket , error ) {
178176 s3Client , err := r .auth (ctx , bucket )
179177 if err != nil {
@@ -272,13 +270,14 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket
272270
273271func (r * BucketReconciler ) reconcileDelete (ctx context.Context , bucket sourcev1.Bucket ) (ctrl.Result , error ) {
274272 if err := r .gc (bucket ); err != nil {
275- r .event (bucket , events .EventSeverityError , fmt .Sprintf ("garbage collection for deleted resource failed: %s" , err .Error ()))
273+ r .event (ctx , bucket , events .EventSeverityError ,
274+ fmt .Sprintf ("garbage collection for deleted resource failed: %s" , err .Error ()))
276275 // Return the error so we retry the failed garbage collection
277276 return ctrl.Result {}, err
278277 }
279278
280279 // Record deleted status
281- r .recordReadiness (bucket )
280+ r .recordReadiness (ctx , bucket )
282281
283282 // Remove our finalizer from the list and update it
284283 controllerutil .RemoveFinalizer (& bucket , sourcev1 .SourceFinalizer )
@@ -383,41 +382,33 @@ func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
383382}
384383
385384// event emits a Kubernetes event and forwards the event to notification controller if configured
386- func (r * BucketReconciler ) event (bucket sourcev1.Bucket , severity , msg string ) {
385+ func (r * BucketReconciler ) event (ctx context.Context , bucket sourcev1.Bucket , severity , msg string ) {
386+ log := logr .FromContext (ctx )
387387 if r .EventRecorder != nil {
388388 r .EventRecorder .Eventf (& bucket , "Normal" , severity , msg )
389389 }
390390 if r .ExternalEventRecorder != nil {
391391 objRef , err := reference .GetReference (r .Scheme , & bucket )
392392 if err != nil {
393- r .Log .WithValues (
394- "request" ,
395- fmt .Sprintf ("%s/%s" , bucket .GetNamespace (), bucket .GetName ()),
396- ).Error (err , "unable to send event" )
393+ log .Error (err , "unable to send event" )
397394 return
398395 }
399396
400397 if err := r .ExternalEventRecorder .Eventf (* objRef , nil , severity , severity , msg ); err != nil {
401- r .Log .WithValues (
402- "request" ,
403- fmt .Sprintf ("%s/%s" , bucket .GetNamespace (), bucket .GetName ()),
404- ).Error (err , "unable to send event" )
398+ log .Error (err , "unable to send event" )
405399 return
406400 }
407401 }
408402}
409403
410- func (r * BucketReconciler ) recordReadiness (bucket sourcev1.Bucket ) {
404+ func (r * BucketReconciler ) recordReadiness (ctx context.Context , bucket sourcev1.Bucket ) {
405+ log := logr .FromContext (ctx )
411406 if r .MetricsRecorder == nil {
412407 return
413408 }
414-
415409 objRef , err := reference .GetReference (r .Scheme , & bucket )
416410 if err != nil {
417- r .Log .WithValues (
418- strings .ToLower (bucket .Kind ),
419- fmt .Sprintf ("%s/%s" , bucket .GetNamespace (), bucket .GetName ()),
420- ).Error (err , "unable to record readiness metric" )
411+ log .Error (err , "unable to record readiness metric" )
421412 return
422413 }
423414 if rc := apimeta .FindStatusCondition (bucket .Status .Conditions , meta .ReadyCondition ); rc != nil {
0 commit comments