@@ -104,7 +104,7 @@ type BucketReconcilerOptions struct {
104
104
105
105
// bucketReconcilerFunc is the function type for all the bucket reconciler
106
106
// functions.
107
- type bucketReconcilerFunc func (ctx context.Context , obj * sourcev1.Bucket , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error )
107
+ type bucketReconcilerFunc func (ctx context.Context , obj * sourcev1.Bucket , index etagIndex , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error )
108
108
109
109
func (r * BucketReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
110
110
return r .SetupWithManagerAndOptions (mgr , BucketReconcilerOptions {})
@@ -199,6 +199,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
199
199
conditions .MarkReconciling (obj , "NewGeneration" , "reconciling new object generation (%d)" , obj .Generation )
200
200
}
201
201
202
+ index := make (etagIndex )
202
203
var artifact sourcev1.Artifact
203
204
204
205
// Create temp working dir
@@ -215,7 +216,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
215
216
var res sreconcile.Result
216
217
var resErr error
217
218
for _ , rec := range reconcilers {
218
- recResult , err := rec (ctx , obj , & artifact , tmpDir )
219
+ recResult , err := rec (ctx , obj , index , & artifact , tmpDir )
219
220
// Exit immediately on ResultRequeue.
220
221
if recResult == sreconcile .ResultRequeue {
221
222
return sreconcile .ResultRequeue , nil
@@ -238,7 +239,8 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
238
239
// All artifacts for the resource except for the current one are garbage collected from the storage.
239
240
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
240
241
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated.
241
- func (r * BucketReconciler ) reconcileStorage (ctx context.Context , obj * sourcev1.Bucket , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error ) {
242
+ func (r * BucketReconciler ) reconcileStorage (ctx context.Context ,
243
+ obj * sourcev1.Bucket , _ etagIndex , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error ) {
242
244
// Garbage collect previous advertised artifact(s) from storage
243
245
_ = r .garbageCollect (ctx , obj )
244
246
@@ -266,7 +268,8 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.B
266
268
// result.
267
269
// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
268
270
// fails, it records v1beta1.FetchFailedCondition=True and returns early.
269
- func (r * BucketReconciler ) reconcileSource (ctx context.Context , obj * sourcev1.Bucket , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error ) {
271
+ func (r * BucketReconciler ) reconcileSource (ctx context.Context ,
272
+ obj * sourcev1.Bucket , index etagIndex , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error ) {
270
273
var secret * corev1.Secret
271
274
if obj .Spec .SecretRef != nil {
272
275
secretName := types.NamespacedName {
@@ -287,9 +290,9 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
287
290
288
291
switch obj .Spec .Provider {
289
292
case sourcev1 .GoogleBucketProvider :
290
- return r .reconcileGCPSource (ctx , obj , artifact , secret , dir )
293
+ return r .reconcileGCPSource (ctx , obj , index , artifact , secret , dir )
291
294
default :
292
- return r .reconcileMinioSource (ctx , obj , artifact , secret , dir )
295
+ return r .reconcileMinioSource (ctx , obj , index , artifact , secret , dir )
293
296
}
294
297
}
295
298
@@ -302,8 +305,8 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
302
305
// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to
303
306
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
304
307
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
305
- func (r * BucketReconciler ) reconcileMinioSource (ctx context.Context , obj * sourcev1. Bucket , artifact * sourcev1. Artifact ,
306
- secret * corev1.Secret , dir string ) (sreconcile.Result , error ) {
308
+ func (r * BucketReconciler ) reconcileMinioSource (ctx context.Context ,
309
+ obj * sourcev1. Bucket , index etagIndex , artifact * sourcev1. Artifact , secret * corev1.Secret , dir string ) (sreconcile.Result , error ) {
307
310
// Build the client with the configuration from the object and secret
308
311
s3Client , err := r .buildMinioClient (obj , secret )
309
312
if err != nil {
@@ -367,7 +370,6 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
367
370
// Build up an index of object keys and their etags
368
371
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
369
372
// detect both structural and file changes
370
- var index = make (etagIndex )
371
373
for object := range s3Client .ListObjects (ctxTimeout , obj .Spec .BucketName , minio.ListObjectsOptions {
372
374
Recursive : true ,
373
375
UseV1 : s3utils .IsGoogleEndpoint (* s3Client .EndpointURL ()),
@@ -438,8 +440,6 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
438
440
conditions .MarkTrue (obj , sourcev1 .FetchFailedCondition , sourcev1 .BucketOperationFailedReason , e .Err .Error ())
439
441
return sreconcile .ResultEmpty , e
440
442
}
441
- r .eventLogf (ctx , obj , events .EventTypeTrace , sourcev1 .BucketOperationSucceededReason ,
442
- "fetched %d files with revision '%s' from '%s'" , len (index ), revision , obj .Spec .BucketName )
443
443
}
444
444
conditions .Delete (obj , sourcev1 .FetchFailedCondition )
445
445
@@ -457,8 +457,8 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
457
457
// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to
458
458
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
459
459
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
460
- func (r * BucketReconciler ) reconcileGCPSource (ctx context.Context , obj * sourcev1. Bucket , artifact * sourcev1. Artifact ,
461
- secret * corev1.Secret , dir string ) (sreconcile.Result , error ) {
460
+ func (r * BucketReconciler ) reconcileGCPSource (ctx context.Context ,
461
+ obj * sourcev1. Bucket , index etagIndex , artifact * sourcev1. Artifact , secret * corev1.Secret , dir string ) (sreconcile.Result , error ) {
462
462
gcpClient , err := r .buildGCPClient (ctx , secret )
463
463
if err != nil {
464
464
e := & serror.Event {
@@ -522,7 +522,6 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
522
522
// Build up an index of object keys and their etags
523
523
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
524
524
// detect both structural and file changes
525
- var index = make (etagIndex )
526
525
objects := gcpClient .ListObjects (ctxTimeout , obj .Spec .BucketName , nil )
527
526
for {
528
527
object , err := objects .Next ()
@@ -593,8 +592,6 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
593
592
conditions .MarkTrue (obj , sourcev1 .FetchFailedCondition , sourcev1 .BucketOperationFailedReason , e .Err .Error ())
594
593
return sreconcile .ResultEmpty , e
595
594
}
596
- r .eventLogf (ctx , obj , events .EventTypeTrace , sourcev1 .BucketOperationSucceededReason ,
597
- "fetched %d files from bucket '%s'" , len (index ), obj .Spec .BucketName )
598
595
}
599
596
conditions .Delete (obj , sourcev1 .FetchFailedCondition )
600
597
@@ -610,7 +607,8 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
610
607
// If the given artifact does not differ from the object's current, it returns early.
611
608
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
612
609
// updated to its path.
613
- func (r * BucketReconciler ) reconcileArtifact (ctx context.Context , obj * sourcev1.Bucket , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error ) {
610
+ func (r * BucketReconciler ) reconcileArtifact (ctx context.Context ,
611
+ obj * sourcev1.Bucket , index etagIndex , artifact * sourcev1.Artifact , dir string ) (sreconcile.Result , error ) {
614
612
// Always restore the Ready condition in case it got removed due to a transient error
615
613
defer func () {
616
614
if obj .GetArtifact ().HasRevision (artifact .Revision ) {
@@ -666,10 +664,10 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
666
664
Reason : sourcev1 .StorageOperationFailedReason ,
667
665
}
668
666
}
669
- r .AnnotatedEventf ( obj , map [string ]string {
667
+ r .annotatedEventLogf ( ctx , obj , map [string ]string {
670
668
"revision" : artifact .Revision ,
671
669
"checksum" : artifact .Checksum ,
672
- }, corev1 .EventTypeNormal , "NewArtifact" , "stored artifact for revision '%s'" , artifact . Revision )
670
+ }, corev1 .EventTypeNormal , "NewArtifact" , "fetched %d files from '%s'" , len ( index ), obj . Spec . BucketName )
673
671
674
672
// Record it on the object
675
673
obj .Status .Artifact = artifact .DeepCopy ()
@@ -803,16 +801,23 @@ func (i etagIndex) Revision() (string, error) {
803
801
return fmt .Sprintf ("%x" , sum .Sum (nil )), nil
804
802
}
805
803
806
- // eventLog records event and logs at the same time. This log is different from
807
- // the debug log in the event recorder in the sense that this is a simple log,
808
- // the event recorder debug log contains complete details about the event.
804
+ // eventLogf records event and logs at the same time.
809
805
func (r * BucketReconciler ) eventLogf (ctx context.Context , obj runtime.Object , eventType string , reason string , messageFmt string , args ... interface {}) {
806
+ r .annotatedEventLogf (ctx , obj , nil , eventType , reason , messageFmt , args ... )
807
+ }
808
+
809
+ // annotatedEventLogf records annotated event and logs at the same time. This
810
+ // log is different from the debug log in the event recorder in the sense that
811
+ // this is a simple log, the event recorder debug log contains complete details
812
+ // about the event.
813
+ func (r * BucketReconciler ) annotatedEventLogf (ctx context.Context ,
814
+ obj runtime.Object , annotations map [string ]string , eventType string , reason string , messageFmt string , args ... interface {}) {
810
815
msg := fmt .Sprintf (messageFmt , args ... )
811
816
// Log and emit event.
812
817
if eventType == corev1 .EventTypeWarning {
813
818
ctrl .LoggerFrom (ctx ).Error (errors .New (reason ), msg )
814
819
} else {
815
820
ctrl .LoggerFrom (ctx ).Info (msg )
816
821
}
817
- r .Eventf (obj , eventType , reason , msg )
822
+ r .AnnotatedEventf (obj , annotations , eventType , reason , msg )
818
823
}
0 commit comments