@@ -29,6 +29,7 @@ import (
2929 "github.com/minio/minio-go/v7"
3030 "github.com/minio/minio-go/v7/pkg/credentials"
3131 "github.com/minio/minio-go/v7/pkg/s3utils"
32+ "google.golang.org/api/option"
3233 corev1 "k8s.io/api/core/v1"
3334 apimeta "k8s.io/apimachinery/pkg/api/meta"
3435 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -46,6 +47,7 @@ import (
4647 "github.com/fluxcd/pkg/runtime/events"
4748 "github.com/fluxcd/pkg/runtime/metrics"
4849 "github.com/fluxcd/pkg/runtime/predicates"
50+ "github.com/fluxcd/source-controller/pkg/gcp"
4951
5052 sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
5153 "github.com/fluxcd/source-controller/pkg/sourceignore"
@@ -176,77 +178,25 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
176178}
177179
178180func (r * BucketReconciler ) reconcile (ctx context.Context , bucket sourcev1.Bucket ) (sourcev1.Bucket , error ) {
179- s3Client , err := r .auth (ctx , bucket )
180- if err != nil {
181- err = fmt .Errorf ("auth error: %w" , err )
182- return sourcev1 .BucketNotReady (bucket , sourcev1 .AuthenticationFailedReason , err .Error ()), err
183- }
184-
185- // create tmp dir
181+ var err error
182+ var sourceBucket sourcev1.Bucket
186183 tempDir , err := os .MkdirTemp ("" , bucket .Name )
187184 if err != nil {
188185 err = fmt .Errorf ("tmp dir error: %w" , err )
189186 return sourcev1 .BucketNotReady (bucket , sourcev1 .StorageOperationFailedReason , err .Error ()), err
190187 }
191188 defer os .RemoveAll (tempDir )
192-
193- ctxTimeout , cancel := context .WithTimeout (ctx , bucket .Spec .Timeout .Duration )
194- defer cancel ()
195-
196- exists , err := s3Client .BucketExists (ctxTimeout , bucket .Spec .BucketName )
197- if err != nil {
198- return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
199- }
200- if ! exists {
201- err = fmt .Errorf ("bucket '%s' not found" , bucket .Spec .BucketName )
202- return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
203- }
204-
205- // Look for file with ignore rules first
206- // NB: S3 has flat filepath keys making it impossible to look
207- // for files in "subdirectories" without building up a tree first.
208- path := filepath .Join (tempDir , sourceignore .IgnoreFile )
209- if err := s3Client .FGetObject (ctxTimeout , bucket .Spec .BucketName , sourceignore .IgnoreFile , path , minio.GetObjectOptions {}); err != nil {
210- if resp , ok := err .(minio.ErrorResponse ); ok && resp .Code != "NoSuchKey" {
211- return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
212- }
213- }
214- ps , err := sourceignore .ReadIgnoreFile (path , nil )
215- if err != nil {
216- return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
217- }
218- // In-spec patterns take precedence
219- if bucket .Spec .Ignore != nil {
220- ps = append (ps , sourceignore .ReadPatterns (strings .NewReader (* bucket .Spec .Ignore ), nil )... )
221- }
222- matcher := sourceignore .NewMatcher (ps )
223-
224- // download bucket content
225- for object := range s3Client .ListObjects (ctxTimeout , bucket .Spec .BucketName , minio.ListObjectsOptions {
226- Recursive : true ,
227- UseV1 : s3utils .IsGoogleEndpoint (* s3Client .EndpointURL ()),
228- }) {
229- if object .Err != nil {
230- err = fmt .Errorf ("listing objects from bucket '%s' failed: %w" , bucket .Spec .BucketName , object .Err )
231- return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
232- }
233-
234- if strings .HasSuffix (object .Key , "/" ) || object .Key == sourceignore .IgnoreFile {
235- continue
236- }
237-
238- if matcher .Match (strings .Split (object .Key , "/" ), false ) {
239- continue
189+ if bucket .Spec .Provider == sourcev1 .GoogleBucketProvider {
190+ sourceBucket , err = r .reconcileWithGCP (ctx , bucket , tempDir )
191+ if err != nil {
192+ return sourceBucket , err
240193 }
241-
242- localPath := filepath .Join (tempDir , object .Key )
243- err := s3Client .FGetObject (ctxTimeout , bucket .Spec .BucketName , object .Key , localPath , minio.GetObjectOptions {})
194+ } else {
195+ sourceBucket , err = r .reconcileWithMinio (ctx , bucket , tempDir )
244196 if err != nil {
245- err = fmt .Errorf ("downloading object from bucket '%s' failed: %w" , bucket .Spec .BucketName , err )
246- return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
197+ return sourceBucket , err
247198 }
248199 }
249-
250200 revision , err := r .checksum (tempDir )
251201 if err != nil {
252202 return sourcev1 .BucketNotReady (bucket , sourcev1 .StorageOperationFailedReason , err .Error ()), err
@@ -315,7 +265,177 @@ func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.
315265 return ctrl.Result {}, nil
316266}
317267
318- func (r * BucketReconciler ) auth (ctx context.Context , bucket sourcev1.Bucket ) (* minio.Client , error ) {
268+ // reconcileWithGCP handles getting objects from a Google Cloud Platform bucket
269+ // using a gcp client
270+ func (r * BucketReconciler ) reconcileWithGCP (ctx context.Context , bucket sourcev1.Bucket , tempDir string ) (sourcev1.Bucket , error ) {
271+ log := logr .FromContext (ctx )
272+ gcpClient , err := r .authGCP (ctx , bucket )
273+ if err != nil {
274+ err = fmt .Errorf ("auth error: %w" , err )
275+ return sourcev1 .BucketNotReady (bucket , sourcev1 .AuthenticationFailedReason , err .Error ()), err
276+ }
277+ defer gcpClient .Close (log )
278+
279+ ctxTimeout , cancel := context .WithTimeout (ctx , bucket .Spec .Timeout .Duration )
280+ defer cancel ()
281+
282+ exists , err := gcpClient .BucketExists (ctxTimeout , bucket .Spec .BucketName )
283+ if err != nil {
284+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
285+ }
286+ if ! exists {
287+ err = fmt .Errorf ("bucket '%s' not found" , bucket .Spec .BucketName )
288+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
289+ }
290+
291+ // Look for file with ignore rules first.
292+ path := filepath .Join (tempDir , sourceignore .IgnoreFile )
293+ if err := gcpClient .FGetObject (ctxTimeout , bucket .Spec .BucketName , sourceignore .IgnoreFile , path ); err != nil {
294+ if err == gcp .ErrorObjectDoesNotExist && sourceignore .IgnoreFile != ".sourceignore" {
295+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
296+ }
297+ }
298+ ps , err := sourceignore .ReadIgnoreFile (path , nil )
299+ if err != nil {
300+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
301+ }
302+ // In-spec patterns take precedence
303+ if bucket .Spec .Ignore != nil {
304+ ps = append (ps , sourceignore .ReadPatterns (strings .NewReader (* bucket .Spec .Ignore ), nil )... )
305+ }
306+ matcher := sourceignore .NewMatcher (ps )
307+ objects := gcpClient .ListObjects (ctxTimeout , bucket .Spec .BucketName , nil )
308+ // download bucket content
309+ for {
310+ object , err := objects .Next ()
311+ if err == gcp .IteratorDone {
312+ break
313+ }
314+ if err != nil {
315+ err = fmt .Errorf ("listing objects from bucket '%s' failed: %w" , bucket .Spec .BucketName , err )
316+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
317+ }
318+
319+ if strings .HasSuffix (object .Name , "/" ) || object .Name == sourceignore .IgnoreFile {
320+ continue
321+ }
322+
323+ if matcher .Match (strings .Split (object .Name , "/" ), false ) {
324+ continue
325+ }
326+
327+ localPath := filepath .Join (tempDir , object .Name )
328+ if err = gcpClient .FGetObject (ctxTimeout , bucket .Spec .BucketName , object .Name , localPath ); err != nil {
329+ err = fmt .Errorf ("downloading object from bucket '%s' failed: %w" , bucket .Spec .BucketName , err )
330+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
331+ }
332+ }
333+ return sourcev1.Bucket {}, nil
334+ }
335+
336+ // reconcileWithMinio handles getting objects from an S3 compatible bucket
337+ // using a minio client
338+ func (r * BucketReconciler ) reconcileWithMinio (ctx context.Context , bucket sourcev1.Bucket , tempDir string ) (sourcev1.Bucket , error ) {
339+ s3Client , err := r .authMinio (ctx , bucket )
340+ if err != nil {
341+ err = fmt .Errorf ("auth error: %w" , err )
342+ return sourcev1 .BucketNotReady (bucket , sourcev1 .AuthenticationFailedReason , err .Error ()), err
343+ }
344+
345+ ctxTimeout , cancel := context .WithTimeout (ctx , bucket .Spec .Timeout .Duration )
346+ defer cancel ()
347+
348+ exists , err := s3Client .BucketExists (ctxTimeout , bucket .Spec .BucketName )
349+ if err != nil {
350+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
351+ }
352+ if ! exists {
353+ err = fmt .Errorf ("bucket '%s' not found" , bucket .Spec .BucketName )
354+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
355+ }
356+
357+ // Look for file with ignore rules first
358+ // NB: S3 has flat filepath keys making it impossible to look
359+ // for files in "subdirectories" without building up a tree first.
360+ path := filepath .Join (tempDir , sourceignore .IgnoreFile )
361+ if err := s3Client .FGetObject (ctxTimeout , bucket .Spec .BucketName , sourceignore .IgnoreFile , path , minio.GetObjectOptions {}); err != nil {
362+ if resp , ok := err .(minio.ErrorResponse ); ok && resp .Code != "NoSuchKey" {
363+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
364+ }
365+ }
366+ ps , err := sourceignore .ReadIgnoreFile (path , nil )
367+ if err != nil {
368+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
369+ }
370+ // In-spec patterns take precedence
371+ if bucket .Spec .Ignore != nil {
372+ ps = append (ps , sourceignore .ReadPatterns (strings .NewReader (* bucket .Spec .Ignore ), nil )... )
373+ }
374+ matcher := sourceignore .NewMatcher (ps )
375+
376+ // download bucket content
377+ for object := range s3Client .ListObjects (ctxTimeout , bucket .Spec .BucketName , minio.ListObjectsOptions {
378+ Recursive : true ,
379+ UseV1 : s3utils .IsGoogleEndpoint (* s3Client .EndpointURL ()),
380+ }) {
381+ if object .Err != nil {
382+ err = fmt .Errorf ("listing objects from bucket '%s' failed: %w" , bucket .Spec .BucketName , object .Err )
383+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
384+ }
385+
386+ if strings .HasSuffix (object .Key , "/" ) || object .Key == sourceignore .IgnoreFile {
387+ continue
388+ }
389+
390+ if matcher .Match (strings .Split (object .Key , "/" ), false ) {
391+ continue
392+ }
393+
394+ localPath := filepath .Join (tempDir , object .Key )
395+ err := s3Client .FGetObject (ctxTimeout , bucket .Spec .BucketName , object .Key , localPath , minio.GetObjectOptions {})
396+ if err != nil {
397+ err = fmt .Errorf ("downloading object from bucket '%s' failed: %w" , bucket .Spec .BucketName , err )
398+ return sourcev1 .BucketNotReady (bucket , sourcev1 .BucketOperationFailedReason , err .Error ()), err
399+ }
400+ }
401+ return sourcev1.Bucket {}, nil
402+ }
403+
404+ // authGCP creates a new Google Cloud Platform storage client
405+ // to interact with the storage service.
406+ func (r * BucketReconciler ) authGCP (ctx context.Context , bucket sourcev1.Bucket ) (* gcp.GCPClient , error ) {
407+ var client * gcp.GCPClient
408+ var err error
409+ if bucket .Spec .SecretRef != nil {
410+ secretName := types.NamespacedName {
411+ Namespace : bucket .GetNamespace (),
412+ Name : bucket .Spec .SecretRef .Name ,
413+ }
414+
415+ var secret corev1.Secret
416+ if err := r .Get (ctx , secretName , & secret ); err != nil {
417+ return nil , fmt .Errorf ("credentials secret error: %w" , err )
418+ }
419+ if err := gcp .ValidateSecret (secret .Data , secret .Name ); err != nil {
420+ return nil , err
421+ }
422+ client , err = gcp .NewClient (ctx , option .WithCredentialsJSON (secret .Data ["serviceaccount" ]))
423+ if err != nil {
424+ return nil , err
425+ }
426+ } else {
427+ client , err = gcp .NewClient (ctx )
428+ if err != nil {
429+ return nil , err
430+ }
431+ }
432+ return client , nil
433+
434+ }
435+
436+ // authMinio creates a new Minio client to interact with S3
437+ // compatible storage services.
438+ func (r * BucketReconciler ) authMinio (ctx context.Context , bucket sourcev1.Bucket ) (* minio.Client , error ) {
319439 opt := minio.Options {
320440 Region : bucket .Spec .Region ,
321441 Secure : ! bucket .Spec .Insecure ,
0 commit comments