@@ -205,6 +205,11 @@ class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
205
205
'access' )
206
206
encrypt_bucket_keys = traits .Bool (desc = 'Flag indicating whether to use S3 ' \
207
207
'server-side AES-256 encryption' )
208
+ # Set this if user wishes to override the bucket with their own
209
+ bucket = traits .Generic (mandatory = False ,
210
+ desc = 'Boto3 S3 bucket for manual override of bucket' )
211
+ # Set this if user wishes to have local copy of files as well
212
+ local_dir = traits .Str (desc = 'Copy files locally as well as to S3 bucket' )
208
213
209
214
# Set call-able inputs attributes
210
215
def __setattr__ (self , key , value ):
@@ -385,7 +390,6 @@ def _check_s3_base_dir(self):
385
390
386
391
# Init variables
387
392
s3_str = 's3://'
388
- sep = os .path .sep
389
393
base_directory = self .inputs .base_directory
390
394
391
395
# Explicitly lower-case the "s3"
@@ -396,11 +400,16 @@ def _check_s3_base_dir(self):
396
400
397
401
# Check if 's3://' in base dir
398
402
if base_directory .startswith (s3_str ):
403
+ # Attempt to access bucket
399
404
try :
400
405
# Expects bucket name to be 's3://bucket_name/base_dir/..'
401
- bucket_name = base_directory .split (s3_str )[1 ].split (sep )[0 ]
406
+ bucket_name = base_directory .split (s3_str )[1 ].split ('/' )[0 ]
402
407
# Get the actual bucket object
403
- self .bucket = self ._fetch_bucket (bucket_name )
408
+ if self .inputs .bucket :
409
+ self .bucket = self .inputs .bucket
410
+ else :
411
+ self .bucket = self ._fetch_bucket (bucket_name )
412
+ # Report error in case of exception
404
413
except Exception as exc :
405
414
err_msg = 'Unable to access S3 bucket. Error:\n %s. Exiting...' \
406
415
% exc
@@ -566,7 +575,7 @@ def _upload_to_s3(self, src, dst):
566
575
bucket = self .bucket
567
576
iflogger = logging .getLogger ('interface' )
568
577
s3_str = 's3://'
569
- s3_prefix = os . path . join ( s3_str , bucket .name )
578
+ s3_prefix = s3_str + bucket .name
570
579
571
580
# Explicitly lower-case the "s3"
572
581
if dst .lower ().startswith (s3_str ):
@@ -629,41 +638,53 @@ def _list_outputs(self):
629
638
iflogger = logging .getLogger ('interface' )
630
639
outputs = self .output_spec ().get ()
631
640
out_files = []
632
- outdir = self . inputs . base_directory
641
+ # Use hardlink
633
642
use_hardlink = str2bool (config .get ('execution' , 'try_hard_link_datasink' ))
634
643
635
- # If base directory isn't given, assume current directory
636
- if not isdefined (outdir ):
637
- outdir = '.'
644
+ # Set local output directory if specified
645
+ if isdefined (self .inputs .local_copy ):
646
+ outdir = self .inputs .local_copy
647
+ else :
648
+ outdir = self .inputs .base_directory
649
+ # If base directory isn't given, assume current directory
650
+ if not isdefined (outdir ):
651
+ outdir = '.'
638
652
639
- # Check if base directory reflects S3- bucket upload
653
+ # Check if base directory reflects S3 bucket upload
640
654
try :
641
655
s3_flag = self ._check_s3_base_dir ()
656
+ s3dir = self .inputs .base_directory
657
+ if isdefined (self .inputs .container ):
658
+ s3dir = os .path .join (s3dir , self .inputs .container )
642
659
# If encountering an exception during bucket access, set output
643
660
# base directory to a local folder
644
661
except Exception as exc :
645
- local_out_exception = os .path .join (os .path .expanduser ('~' ),
646
- 'data_output' )
662
+ if not isdefined (self .inputs .local_copy ):
663
+ local_out_exception = os .path .join (os .path .expanduser ('~' ),
664
+ 's3_datasink_' + self .bucket .name )
665
+ outdir = local_out_exception
666
+ else :
667
+ outdir = self .inputs .local_copy
668
+ # Log local copying directory
647
669
iflogger .info ('Access to S3 failed! Storing outputs locally at: ' \
648
- '%s\n Error: %s' % (local_out_exception , exc ))
649
- self .inputs .base_directory = local_out_exception
650
-
651
- # If not accessing S3, just set outdir to local absolute path
652
- if not s3_flag :
653
- outdir = os .path .abspath (outdir )
670
+ '%s\n Error: %s' % (outdir , exc ))
654
671
655
672
# If container input is given, append that to outdir
656
673
if isdefined (self .inputs .container ):
657
674
outdir = os .path .join (outdir , self .inputs .container )
658
- # Create the directory if it doesn't exist
659
- if not os .path .exists (outdir ):
660
- try :
661
- os .makedirs (outdir )
662
- except OSError , inst :
663
- if 'File exists' in inst :
664
- pass
665
- else :
666
- raise (inst )
675
+
676
+ # If doing a local output
677
+ if not outdir .lower ().startswith ('s3://' ):
678
+ outdir = os .path .abspath (outdir )
679
+ # Create the directory if it doesn't exist
680
+ if not os .path .exists (outdir ):
681
+ try :
682
+ os .makedirs (outdir )
683
+ except OSError , inst :
684
+ if 'File exists' in inst :
685
+ pass
686
+ else :
687
+ raise (inst )
667
688
668
689
# Iterate through outputs attributes {key : path(s)}
669
690
for key , files in self .inputs ._outputs .items ():
@@ -672,10 +693,14 @@ def _list_outputs(self):
672
693
iflogger .debug ("key: %s files: %s" % (key , str (files )))
673
694
files = filename_to_list (files )
674
695
tempoutdir = outdir
696
+ if s3_flag :
697
+ s3tempoutdir = s3dir
675
698
for d in key .split ('.' ):
676
699
if d [0 ] == '@' :
677
700
continue
678
701
tempoutdir = os .path .join (tempoutdir , d )
702
+ if s3_flag :
703
+ s3tempoutdir = os .path .join (s3tempoutdir , d )
679
704
680
705
# flattening list
681
706
if isinstance (files , list ):
@@ -690,25 +715,26 @@ def _list_outputs(self):
690
715
src = os .path .join (src , '' )
691
716
dst = self ._get_dst (src )
692
717
dst = os .path .join (tempoutdir , dst )
718
+ s3dst = os .path .join (s3tempoutdir , dst )
693
719
dst = self ._substitute (dst )
694
720
path , _ = os .path .split (dst )
695
721
696
- # Create output directory if it doesnt exist
697
- if not os .path .exists (path ):
698
- try :
699
- os .makedirs (path )
700
- except OSError , inst :
701
- if 'File exists' in inst :
702
- pass
703
- else :
704
- raise (inst )
705
-
706
722
# If we're uploading to S3
707
723
if s3_flag :
724
+ dst = dst .replace (outdir , self .inputs .base_directory )
708
725
self ._upload_to_s3 (src , dst )
709
726
out_files .append (dst )
710
727
# Otherwise, copy locally src -> dst
711
728
else :
729
+ # Create output directory if it doesn't exist
730
+ if not os .path .exists (path ):
731
+ try :
732
+ os .makedirs (path )
733
+ except OSError , inst :
734
+ if 'File exists' in inst :
735
+ pass
736
+ else :
737
+ raise (inst )
712
738
# If src is a file, copy it to dst
713
739
if os .path .isfile (src ):
714
740
iflogger .debug ('copyfile: %s %s' % (src , dst ))
0 commit comments