@@ -48,6 +48,7 @@ type Source struct {
 
     checkpointer *Checkpointer
     sources.Progress
+    metricsCollector metricsCollector
 
     errorCount *sync.Map
     jobPool    *errgroup.Group
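
The new metricsCollector field is consumed through five methods across the hunks below. For reference, here is a minimal sketch of the interface those call sites imply; the actual definition (and the metricsInstance singleton wired up in Init) lives in the PR's metrics file, which is not part of this diff, so the grouping and parameter names here are inferred, not authoritative.

// Sketch only: interface inferred from the call sites in this diff.
type metricsCollector interface {
    // Role/bucket traversal.
    RecordRoleScanned(roleArn string)
    RecordBucketForRole(roleArn string)

    // Per-object outcomes.
    RecordObjectScanned(bucket string)
    RecordObjectError(bucket string)
    RecordObjectSkipped(bucket, reason string)
}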
@@ -94,6 +95,7 @@ func (s *Source) Init(
     s.conn = &conn
 
     s.checkpointer = NewCheckpointer(ctx, conn.GetEnableResumption(), &s.Progress)
+    s.metricsCollector = metricsInstance
 
     s.setMaxObjectSize(conn.GetMaxObjectSize())
 
@@ -106,11 +108,12 @@ func (s *Source) Init(
 
 func (s *Source) Validate(ctx context.Context) []error {
     var errs []error
-    visitor := func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) {
+    visitor := func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) error {
         roleErrs := s.validateBucketAccess(c, defaultRegionClient, roleArn, buckets)
         if len(roleErrs) > 0 {
             errs = append(errs, roleErrs...)
         }
+        return nil
     }
 
     if err := s.visitRoles(ctx, visitor); err != nil {
@@ -307,6 +310,7 @@ func (s *Source) scanBuckets(
 
     bucketsToScanCount := len(bucketsToScan)
     for bucketIdx := pos.index; bucketIdx < bucketsToScanCount; bucketIdx++ {
+        s.metricsCollector.RecordBucketForRole(role)
         bucket := bucketsToScan[bucketIdx]
         ctx := context.WithValue(ctx, "bucket", bucket)
 
@@ -385,8 +389,9 @@ func (s *Source) scanBuckets(
 
 // Chunks emits chunks of bytes over a channel.
 func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error {
-    visitor := func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) {
+    visitor := func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) error {
         s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan)
+        return nil
     }
 
     return s.visitRoles(ctx, visitor)
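
Both visitor closures now satisfy the new error-returning signature but still return nil unconditionally; in particular, scanBuckets' outcome is not propagated here, so this hunk changes plumbing, not behavior. What the new contract enables is aborting the role walk early, sketched below with a hypothetical visitor that is not part of the PR:

// Hypothetical visitor, for illustration only: returning a non-nil error
// now makes visitRoles stop before visiting the remaining roles.
visitor := func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) error {
    if err := c.Err(); err != nil {
        return err // e.g. stop promptly once the context is canceled
    }
    s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan)
    return nil
}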
@@ -427,6 +432,7 @@ func (s *Source) pageChunker(
 
     for objIdx, obj := range metadata.page.Contents {
         if obj == nil {
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object")
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for nil object")
             }
@@ -442,6 +448,7 @@ func (s *Source) pageChunker(
         // Skip GLACIER and GLACIER_IR objects.
         if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
             ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass)
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class")
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for glacier object")
             }
@@ -451,6 +458,7 @@ func (s *Source) pageChunker(
         // Ignore large files.
         if *obj.Size > s.maxObjectSize {
             ctx.Logger().V(5).Info("Skipping %d byte file (over maxObjectSize limit)")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit")
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for large file")
             }
@@ -460,6 +468,7 @@ func (s *Source) pageChunker(
         // Skip empty files.
         if *obj.Size == 0 {
             ctx.Logger().V(5).Info("Skipping empty file")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file")
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for empty file")
             }
@@ -469,6 +478,7 @@ func (s *Source) pageChunker(
         // Skip incompatible extensions.
         if common.SkipFile(*obj.Key) {
             ctx.Logger().V(5).Info("Skipping file with incompatible extension")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension")
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for incompatible file")
             }
@@ -483,6 +493,7 @@ func (s *Source) pageChunker(
 
             if strings.HasSuffix(*obj.Key, "/") {
                 ctx.Logger().V(5).Info("Skipping directory")
+                s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory")
                 return nil
             }
 
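
The pageChunker hunks above pass each skip reason as an inline string literal. Gathered in one place for reference, including the access_denied label introduced in the next hunk; these constants are purely illustrative, as the PR itself keeps the literals inline:

// Illustration only: the skip-reason labels emitted by pageChunker.
const (
    reasonNilObject    = "nil_object"             // page entry was nil
    reasonStorageClass = "storage_class"          // GLACIER / GLACIER_IR
    reasonSizeLimit    = "size_limit"             // above maxObjectSize
    reasonEmptyFile    = "empty_file"             // zero-byte object
    reasonBadExtension = "incompatible_extension" // common.SkipFile match
    reasonDirectory    = "directory"              // key ends in "/"
    reasonAccessDenied = "access_denied"          // GetObject returned AccessDenied
)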
@@ -508,8 +519,12 @@ func (s *Source) pageChunker(
                 Key:    obj.Key,
             })
             if err != nil {
-                if !strings.Contains(err.Error(), "AccessDenied") {
+                if strings.Contains(err.Error(), "AccessDenied") {
+                    ctx.Logger().Error(err, "could not get S3 object; access denied")
+                    s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied")
+                } else {
                     ctx.Logger().Error(err, "could not get S3 object")
+                    s.metricsCollector.RecordObjectError(metadata.bucket)
                 }
                 // According to the documentation for GetObjectWithContext,
                 // the response can be non-nil even if there was an error.
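
Note the inversion here: the old code logged only non-AccessDenied failures and counted nothing, whereas the new code logs an AccessDenied response and records it as a skip (the object is unreadable, not broken), while every other GetObject failure increments the error metric. If more error kinds ever need distinct handling, the same classification could be pulled into a helper; this is a sketch, not part of the PR:

// Hypothetical helper mirroring the branch above; ctx is trufflehog's
// logger-carrying context type, as used elsewhere in this file.
func (s *Source) recordGetObjectFailure(ctx context.Context, bucket string, err error) {
    if strings.Contains(err.Error(), "AccessDenied") {
        ctx.Logger().Error(err, "could not get S3 object; access denied")
        s.metricsCollector.RecordObjectSkipped(bucket, "access_denied")
        return
    }
    ctx.Logger().Error(err, "could not get S3 object")
    s.metricsCollector.RecordObjectError(bucket)
}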
@@ -563,6 +578,7 @@ func (s *Source) pageChunker(
 
             if err := handlers.HandleFile(ctx, res.Body, chunkSkel, sources.ChanReporter{Ch: chunksChan}); err != nil {
                 ctx.Logger().Error(err, "error handling file")
+                s.metricsCollector.RecordObjectError(metadata.bucket)
                 return nil
             }
 
@@ -580,6 +596,7 @@ func (s *Source) pageChunker(
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for scanned object")
             }
+            s.metricsCollector.RecordObjectScanned(metadata.bucket)
 
             return nil
         })
@@ -633,14 +650,16 @@ func (s *Source) validateBucketAccess(ctx context.Context, client *s3.S3, roleAr
 // If no roles are configured, it will call the function with an empty role ARN.
 func (s *Source) visitRoles(
     ctx context.Context,
-    f func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string),
+    f func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) error,
 ) error {
     roles := s.conn.GetRoles()
     if len(roles) == 0 {
         roles = []string{""}
     }
 
     for _, role := range roles {
+        s.metricsCollector.RecordRoleScanned(role)
+
         client, err := s.newClient(defaultAWSRegion, role)
         if err != nil {
             return fmt.Errorf("could not create s3 client: %w", err)
@@ -651,7 +670,9 @@ func (s *Source) visitRoles(
             return fmt.Errorf("role %q could not list any s3 buckets for scanning: %w", role, err)
         }
 
-        f(ctx, client, role, bucketsToScan)
+        if err := f(ctx, client, role, bucketsToScan); err != nil {
+            return err
+        }
     }
 
     return nil
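
With this last hunk the error value finally flows end to end: a visitor that fails stops the loop, any roles not yet visited are skipped, and the error surfaces from Chunks or Validate through visitRoles. A quick illustration of that contract, using a hypothetical role ARN that is not from the PR:

// Illustration only: the first failing visitor aborts the remaining roles.
err := s.visitRoles(ctx, func(c context.Context, _ *s3.S3, roleArn string, _ []string) error {
    if roleArn == "arn:aws:iam::123456789012:role/example" { // hypothetical ARN
        return fmt.Errorf("bailing out on role %q", roleArn)
    }
    return nil
})
// err != nil here, and roles listed after the failing one were never visited.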