@@ -118,11 +118,7 @@ const (
118118 SQSFormatSNS = "sns"
119119)
120120
121- func (s * S3Source ) newS3Client () error {
122- if s .s3Client != nil {
123- return nil
124- }
125-
121+ func (s * S3Source ) newS3Client () (* s3.Client , error ) {
126122 var loadOpts []func (* config.LoadOptions ) error
127123 if s .Config .AwsProfile != nil && * s .Config .AwsProfile != "" {
128124 loadOpts = append (loadOpts , config .WithSharedConfigProfile (* s .Config .AwsProfile ))
@@ -134,11 +130,14 @@ func (s *S3Source) newS3Client() error {
134130 }
135131
136132 loadOpts = append (loadOpts , config .WithRegion (region ))
137- loadOpts = append (loadOpts , config .WithCredentialsProvider (aws.AnonymousCredentials {}))
138133
139- cfg , err := config .LoadDefaultConfig (s .ctx , loadOpts ... )
134+ if c := defaultCreds (); c != nil {
135+ loadOpts = append (loadOpts , config .WithCredentialsProvider (c ))
136+ }
137+
138+ cfg , err := config .LoadDefaultConfig (context .TODO (), loadOpts ... )
140139 if err != nil {
141- return fmt .Errorf ("failed to load aws config: %w" , err )
140+ return nil , fmt .Errorf ("failed to load aws config: %w" , err )
142141 }
143142
144143 var clientOpts []func (* s3.Options )
@@ -148,16 +147,10 @@ func (s *S3Source) newS3Client() error {
148147 })
149148 }
150149
151- s .s3Client = s3 .NewFromConfig (cfg , clientOpts ... )
152-
153- return nil
150+ return s3 .NewFromConfig (cfg , clientOpts ... ), nil
154151}
155152
156- func (s * S3Source ) newSQSClient () error {
157- if s .sqsClient != nil {
158- return nil
159- }
160-
153+ func (s * S3Source ) newSQSClient () (* sqs.Client , error ) {
161154 var loadOpts []func (* config.LoadOptions ) error
162155 if s .Config .AwsProfile != nil && * s .Config .AwsProfile != "" {
163156 loadOpts = append (loadOpts , config .WithSharedConfigProfile (* s .Config .AwsProfile ))
@@ -169,21 +162,22 @@ func (s *S3Source) newSQSClient() error {
169162 }
170163
171164 loadOpts = append (loadOpts , config .WithRegion (region ))
172- loadOpts = append (loadOpts , config .WithCredentialsProvider (aws.AnonymousCredentials {}))
173165
174- cfg , err := config .LoadDefaultConfig (s .ctx , loadOpts ... )
166+ if c := defaultCreds (); c != nil {
167+ loadOpts = append (loadOpts , config .WithCredentialsProvider (c ))
168+ }
169+
170+ cfg , err := config .LoadDefaultConfig (context .TODO (), loadOpts ... )
175171 if err != nil {
176- return fmt .Errorf ("failed to load aws config: %w" , err )
172+ return nil , fmt .Errorf ("failed to load aws config: %w" , err )
177173 }
178174
179175 var clientOpts []func (* sqs.Options )
180176 if s .Config .AwsEndpoint != "" {
181177 clientOpts = append (clientOpts , func (o * sqs.Options ) { o .BaseEndpoint = aws .String (s .Config .AwsEndpoint ) })
182178 }
183179
184- s .sqsClient = sqs .NewFromConfig (cfg , clientOpts ... )
185-
186- return nil
180+ return sqs .NewFromConfig (cfg , clientOpts ... ), nil
187181}
188182
189183func (s * S3Source ) readManager () {
@@ -207,7 +201,7 @@ func (s *S3Source) readManager() {
207201
208202func (s * S3Source ) getBucketContent () ([]s3types.Object , error ) {
209203 logger := s .logger .WithField ("method" , "getBucketContent" )
210- logger .Debugf ("Getting bucket content for %s" , s . Config . BucketName )
204+ logger .Debugf ("Getting bucket content" )
211205
212206 bucketObjects := make ([]s3types.Object , 0 )
213207
@@ -274,10 +268,17 @@ func (s *S3Source) listPoll() error {
274268
275269 logger .Debugf ("Found new object %s" , * bucketObjects [i ].Key )
276270
277- s . readerChan <- S3Object {
271+ obj := S3Object {
278272 Bucket : s .Config .BucketName ,
279273 Key : * bucketObjects [i ].Key ,
280274 }
275+
276+ select {
277+ case s .readerChan <- obj :
278+ case <- s .t .Dying ():
279+ logger .Debug ("tomb is dying, dropping object send" )
280+ return nil
281+ }
281282 }
282283
283284 if newObject {
@@ -391,6 +392,9 @@ func (s *S3Source) sqsPoll() error {
391392 WaitTimeSeconds : 20 , // Probably no need to make it configurable ?
392393 })
393394 if err != nil {
395+ if errors .Is (err , context .Canceled ) {
396+ return nil
397+ }
394398 logger .Errorf ("Error while polling SQS: %s" , err )
395399 continue
396400 }
@@ -421,7 +425,13 @@ func (s *S3Source) sqsPoll() error {
421425
422426 logger .Debugf ("Received SQS message for object %s/%s" , bucket , key )
423427
424- s .readerChan <- S3Object {Key : key , Bucket : bucket }
428+ // don't block if readManager has quit
429+ select {
430+ case s .readerChan <- S3Object {Key : key , Bucket : bucket }:
431+ case <- s .t .Dying ():
432+ logger .Debug ("tomb is dying, dropping object send" )
433+ return nil
434+ }
425435
426436 _ , err = s .sqsClient .DeleteMessage (s .ctx ,
427437 & sqs.DeleteMessageInput {
@@ -516,7 +526,13 @@ func (s *S3Source) readFile(bucket string, key string) error {
516526 evt := types .MakeEvent (s .Config .UseTimeMachine , types .LOG , true )
517527 evt .Line = l
518528
519- s .out <- evt
529+ // don't block in shutdown
530+ select {
531+ case s .out <- evt :
532+ case <- s .t .Dying ():
533+ s .logger .Infof ("tomb is dying, dropping event for %s/%s" , bucket , key )
534+ return nil
535+ }
520536 }
521537 }
522538
@@ -615,16 +631,20 @@ func (s *S3Source) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel
615631 s .logger .Warning ("Polling method is set to list. This is not recommended as it will not scale well. Consider using SQS instead." )
616632 }
617633
618- err = s .newS3Client ()
634+ client , err : = s .newS3Client ()
619635 if err != nil {
620636 return err
621637 }
622638
639+ s .s3Client = client
640+
623641 if s .Config .PollingMethod == PollMethodSQS {
624- err = s .newSQSClient ()
642+ sqsClient , err : = s .newSQSClient ()
625643 if err != nil {
626644 return err
627645 }
646+
647+ s .sqsClient = sqsClient
628648 }
629649
630650 return nil
@@ -707,11 +727,13 @@ func (s *S3Source) ConfigureByDSN(dsn string, labels map[string]string, logger *
707727 return fmt .Errorf ("invalid DSN %s for S3 source" , dsn )
708728 }
709729
710- err := s .newS3Client ()
730+ client , err := s .newS3Client ()
711731 if err != nil {
712732 return err
713733 }
714734
735+ s .s3Client = client
736+
715737 return nil
716738}
717739
@@ -768,21 +790,11 @@ func (s *S3Source) StreamingAcquisition(ctx context.Context, out chan types.Even
768790
769791 if s .Config .PollingMethod == PollMethodSQS {
770792 t .Go (func () error {
771- err := s .sqsPoll ()
772- if err != nil {
773- return err
774- }
775-
776- return nil
793+ return s .sqsPoll ()
777794 })
778795 } else {
779796 t .Go (func () error {
780- err := s .listPoll ()
781- if err != nil {
782- return err
783- }
784-
785- return nil
797+ return s .listPoll ()
786798 })
787799 }
788800
0 commit comments