@@ -20,31 +20,33 @@ import (
2020 "fmt"
2121 "strings"
2222
23+ "google.golang.org/grpc/codes"
24+ "google.golang.org/grpc/status"
25+ v1 "k8s.io/api/core/v1"
26+ kubeerrors "k8s.io/apimachinery/pkg/api/errors"
2327 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24- utilversion "k8s.io/apimachinery/pkg/util/version "
28+ "k8s.io/apimachinery/pkg/runtime "
2529 kube "k8s.io/client-go/kubernetes"
30+ "k8s.io/client-go/tools/record"
2631 "k8s.io/klog/v2"
27-
2832 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
2933 buckets "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
3034 bucketapi "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1"
35+ "sigs.k8s.io/container-object-storage-interface-api/controller/events"
3136 "sigs.k8s.io/container-object-storage-interface-provisioner-sidecar/pkg/consts"
3237 cosi "sigs.k8s.io/container-object-storage-interface-spec"
3338 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
34-
35- "github.com/pkg/errors"
36- "google.golang.org/grpc/codes"
37- "google.golang.org/grpc/status"
3839)
3940
4041// BucketListener manages Bucket objects
4142type BucketListener struct {
4243 provisionerClient cosi.ProvisionerClient
4344 driverName string
4445
46+ eventRecorder record.EventRecorder
47+
4548 kubeClient kube.Interface
4649 bucketClient buckets.Interface
47- kubeVersion * utilversion.Version
4850}
4951
5052// NewBucketListener returns a resource handler for Bucket objects
@@ -58,9 +60,10 @@ func NewBucketListener(driverName string, client cosi.ProvisionerClient) *Bucket
5860}
5961
6062// Add attempts to create a bucket for a given bucket. This function must be idempotent
63+ //
6164// Return values
62- // nil - Bucket successfully provisioned
63- // non-nil err - Internal error [requeue'd with exponential backoff]
65+ // - nil - Bucket successfully provisioned
66+ // - non-nil err - Internal error [requeue'd with exponential backoff]
6467func (b * BucketListener ) Add (ctx context.Context , inputBucket * v1alpha1.Bucket ) error {
6568 bucket := inputBucket .DeepCopy ()
6669
@@ -70,9 +73,7 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
7073 "name" , bucket .ObjectMeta .Name )
7174
7275 if bucket .Spec .BucketClassName == "" {
73- err = errors .New (fmt .Sprintf ("BucketClassName not defined for bucket %s" , bucket .ObjectMeta .Name ))
74- klog .V (3 ).ErrorS (err , "BucketClassName not defined" )
75- return err
76+ return b .recordError (inputBucket , v1 .EventTypeWarning , events .FailedCreateBucket , fmt .Errorf ("%w for Bucket %v" , consts .ErrUndefinedBucketClassName , bucket .Name ))
7677 }
7778
7879 if ! strings .EqualFold (bucket .Spec .DriverName , b .driverName ) {
@@ -100,15 +101,17 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
100101 bucketID = bucket .Spec .ExistingBucketID
101102 if bucket .Spec .Parameters == nil {
102103 bucketClass , err := b .bucketClasses ().Get (ctx , bucket .Spec .BucketClassName , metav1.GetOptions {})
103- if err != nil {
104+ if kubeerrors .IsNotFound (err ) {
105+ return b .recordError (inputBucket , v1 .EventTypeWarning , events .FailedCreateBucket , err )
106+ } else if err != nil {
104107 klog .V (3 ).ErrorS (err , "Error fetching bucketClass" ,
105108 "bucketClass" , bucket .Spec .BucketClassName ,
106109 "bucket" , bucket .ObjectMeta .Name )
107- return err
110+ return b . recordError ( inputBucket , v1 . EventTypeWarning , events . FailedCreateBucket , err )
108111 }
109112
110113 if bucketClass .Parameters != nil {
111- var param map [string ]string
114+ param := make ( map [string ]string )
112115 for k , v := range bucketClass .Parameters {
113116 param [k ] = v
114117 }
@@ -125,18 +128,15 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
125128 rsp , err := b .provisionerClient .DriverCreateBucket (ctx , req )
126129 if err != nil {
127130 if status .Code (err ) != codes .AlreadyExists {
128- klog .V (3 ).ErrorS (err , "Driver failed to create bucket" ,
129- "bucket" , bucket .ObjectMeta .Name )
130- return errors .Wrap (err , "Failed to create bucket" )
131+ return b .recordError (inputBucket , v1 .EventTypeWarning , events .FailedCreateBucket , fmt .Errorf ("failed to create bucket: %w" , err ))
131132 }
132-
133133 }
134134
135135 if rsp == nil {
136- err = errors . New ( fmt . Sprintf ( "DriverCreateBucket returned a nil response for bucket: %s" , bucket . ObjectMeta . Name ))
136+ err = consts . ErrInternal
137137 klog .V (3 ).ErrorS (err , "Internal Error from driver" ,
138138 "bucket" , bucket .ObjectMeta .Name )
139- return err
139+ return fmt . Errorf ( "%w for Bucket %v" , err , bucket . Name )
140140 }
141141
142142 if rsp .BucketId != "" {
@@ -145,8 +145,7 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
145145 } else {
146146 klog .V (3 ).ErrorS (err , "DriverCreateBucket returned an empty bucketID" ,
147147 "bucket" , bucket .ObjectMeta .Name )
148- err = errors .New (fmt .Sprintf ("DriverCreateBucket returned an empty bucketID for bucket: %s" , bucket .ObjectMeta .Name ))
149- return err
148+ return fmt .Errorf ("%w for Bucket %v" , consts .ErrEmptyBucketID , bucket .Name )
150149 }
151150
152151 // Now we update the BucketReady status of BucketClaim
@@ -157,15 +156,15 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
157156 klog .V (3 ).ErrorS (err , "Failed to get bucketClaim" ,
158157 "bucketClaim" , ref .Name ,
159158 "bucket" , bucket .ObjectMeta .Name )
160- return err
159+ return b . recordError ( bucket , v1 . EventTypeWarning , events . FailedCreateBucket , err )
161160 }
162161
163162 bucketClaim .Status .BucketReady = true
164163 if _ , err = b .bucketClaims (bucketClaim .Namespace ).UpdateStatus (ctx , bucketClaim , metav1.UpdateOptions {}); err != nil {
165164 klog .V (3 ).ErrorS (err , "Failed to update bucketClaim" ,
166165 "bucketClaim" , ref .Name ,
167166 "bucket" , bucket .ObjectMeta .Name )
168- return err
167+ return b . recordError ( bucket , v1 . EventTypeWarning , events . FailedCreateBucket , err )
169168 }
170169
171170 klog .V (5 ).Infof ("Successfully updated status of bucketClaim: %s, bucket: %s" , bucketClaim .ObjectMeta .Name , bucket .ObjectMeta .Name )
@@ -175,7 +174,8 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
175174 controllerutil .AddFinalizer (bucket , consts .BucketFinalizer )
176175 if bucket , err = b .buckets ().Update (ctx , bucket , metav1.UpdateOptions {}); err != nil {
177176 klog .V (3 ).ErrorS (err , "Failed to update bucket finalizers" , "bucket" , bucket .ObjectMeta .Name )
178- return errors .Wrap (err , "Failed to update bucket finalizers" )
177+ return b .recordError (bucket , v1 .EventTypeWarning , events .FailedCreateBucket ,
178+ fmt .Errorf ("failed to update bucket finalizers: %w" , err ))
179179 }
180180
181181 klog .V (5 ).Infof ("Successfully added finalizer to bucket: %s" , bucket .ObjectMeta .Name )
@@ -188,7 +188,8 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
188188 if _ , err = b .buckets ().UpdateStatus (ctx , bucket , metav1.UpdateOptions {}); err != nil {
189189 klog .V (3 ).ErrorS (err , "Failed to update bucket status" ,
190190 "bucket" , bucket .ObjectMeta .Name )
191- return errors .Wrap (err , "Failed to update bucket status" )
191+ return b .recordError (bucket , v1 .EventTypeWarning , events .FailedCreateBucket ,
192+ fmt .Errorf ("failed to update bucket status: %w" , err ))
192193 }
193194
194195 klog .V (3 ).InfoS ("Add Bucket success" ,
@@ -201,8 +202,8 @@ func (b *BucketListener) Add(ctx context.Context, inputBucket *v1alpha1.Bucket)
201202
202203// Update attempts to reconcile changes to a given bucket. This function must be idempotent
203204// Return values
204- // nil - Bucket successfully reconciled
205- // non-nil err - Internal error [requeue'd with exponential backoff]
205+ // - nil - Bucket successfully reconciled
206+ // - non-nil err - Internal error [requeue'd with exponential backoff]
206207func (b * BucketListener ) Update (ctx context.Context , old , new * v1alpha1.Bucket ) error {
207208 klog .V (3 ).InfoS ("Update Bucket" ,
208209 "name" , old .Name )
@@ -216,6 +217,11 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
216217 bucketClaimNs := bucket .Spec .BucketClaim .Namespace
217218 bucketClaimName := bucket .Spec .BucketClaim .Name
218219 bucketAccessList , err := b .bucketAccesses (bucketClaimNs ).List (ctx , metav1.ListOptions {})
220+ if err != nil {
221+ klog .V (3 ).ErrorS (err , "Error fetching BucketAccessList" ,
222+ "bucket" , bucket .ObjectMeta .Name )
223+ return err
224+ }
219225
220226 for _ , bucketAccess := range bucketAccessList .Items {
221227 if strings .EqualFold (bucketAccess .Spec .BucketClaimName , bucketClaimName ) {
@@ -238,7 +244,7 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
238244 if controllerutil .ContainsFinalizer (bucket , consts .BucketFinalizer ) {
239245 err = b .deleteBucketOp (ctx , bucket )
240246 if err != nil {
241- return err
247+ return b . recordError ( bucket , v1 . EventTypeWarning , events . FailedDeleteBucket , err )
242248 }
243249
244250 controllerutil .RemoveFinalizer (bucket , consts .BucketFinalizer )
@@ -263,8 +269,8 @@ func (b *BucketListener) Update(ctx context.Context, old, new *v1alpha1.Bucket)
263269// Delete function is called when the bucket was not able to add finalizers while creation.
264270// Hence we will take care of removing the BucketClaim finalizer before deleting the Bucket object.
265271// Return values
266- // nil - Bucket successfully deleted
267- // non-nil err - Internal error [requeue'd with exponential backoff]
272+ // - nil - Bucket successfully deleted
273+ // - non-nil err - Internal error [requeue'd with exponential backoff]
268274func (b * BucketListener ) Delete (ctx context.Context , inputBucket * v1alpha1.Bucket ) error {
269275 klog .V (3 ).InfoS ("Delete Bucket" ,
270276 "name" , inputBucket .ObjectMeta .Name ,
@@ -294,26 +300,23 @@ func (b *BucketListener) Delete(ctx context.Context, inputBucket *v1alpha1.Bucke
294300 }
295301
296302 return nil
297-
298303}
299304
300305// InitializeKubeClient initializes the kubernetes client
301306func (b * BucketListener ) InitializeKubeClient (k kube.Interface ) {
302307 b .kubeClient = k
303-
304- serverVersion , err := k .Discovery ().ServerVersion ()
305- if err != nil {
306- klog .V (3 ).ErrorS (err , "Cannot determine server version" )
307- } else {
308- b .kubeVersion = utilversion .MustParseSemantic (serverVersion .GitVersion )
309- }
310308}
311309
312310// InitializeBucketClient initializes the object storage bucket client
313311func (b * BucketListener ) InitializeBucketClient (bc buckets.Interface ) {
314312 b .bucketClient = bc
315313}
316314
315+ // InitializeEventRecorder initializes the event recorder
316+ func (b * BucketListener ) InitializeEventRecorder (er record.EventRecorder ) {
317+ b .eventRecorder = er
318+ }
319+
317320func (b * BucketListener ) deleteBucketOp (ctx context.Context , bucket * v1alpha1.Bucket ) error {
318321 if ! strings .EqualFold (bucket .Spec .DriverName , b .driverName ) {
319322 klog .V (5 ).InfoS ("Skipping bucket for provisioner" ,
@@ -333,10 +336,7 @@ func (b *BucketListener) deleteBucketOp(ctx context.Context, bucket *v1alpha1.Bu
333336
334337 if _ , err := b .provisionerClient .DriverDeleteBucket (ctx , req ); err != nil {
335338 if status .Code (err ) != codes .NotFound {
336- klog .V (3 ).ErrorS (err , "Failed to delete bucket" ,
337- "bucket" , bucket .ObjectMeta .Name ,
338- )
339- return err
339+ return fmt .Errorf ("failed to delete bucket: %w" , err )
340340 }
341341 }
342342
@@ -396,3 +396,21 @@ func (b *BucketListener) bucketAccesses(namespace string) bucketapi.BucketAccess
396396 }
397397 panic ("uninitialized listener" )
398398}
399+
400+ // recordError during the processing of the objects
401+ func (b * BucketListener ) recordError (subject runtime.Object , eventtype , reason string , err error ) error {
402+ if b .eventRecorder == nil {
403+ return err
404+ }
405+ b .eventRecorder .Event (subject , eventtype , reason , err .Error ())
406+
407+ return err
408+ }
409+
410+ // recordEvent during the processing of the objects
411+ func (b * BucketListener ) recordEvent (subject runtime.Object , eventtype , reason , message string , args ... any ) {
412+ if b .eventRecorder == nil {
413+ return
414+ }
415+ b .eventRecorder .Event (subject , eventtype , reason , fmt .Sprintf (message , args ... ))
416+ }
0 commit comments