@@ -77,5 +77,51 @@ def data(output_dir):
77
77
print ('done' )
78
78
79
79
80
@click.command()
@click.argument('output_dir')
@click.argument('bucket')
@click.option('--access_key', help="your AWS access key")
@click.option('--secret_key', help="your AWS access secret")
@click.option('--provider', default='s3', help="Cloud storage provider. Only S3 is supported right now.")
@click.option('--subject', default=None, help="Subject id to upload (optional)")
def upload(output_dir, bucket, access_key, secret_key, provider='s3', subject=None):
    """
    Upload derivative files from OUTPUT_DIR to a cloud storage bucket.

    OUTPUT_DIR: The directory where the output files were stored.

    BUCKET: The cloud bucket name to upload data to.

    Raises
    ------
    ValueError
        If ``--subject`` is given but no matching directory exists.
    NotImplementedError
        If ``--provider`` is anything other than S3.
    """
    # Heavy third-party dependencies are imported lazily so that other
    # subcommands of this CLI do not pay their import cost.
    import boto3
    from dask import compute, delayed
    from glob import glob
    from tqdm.auto import tqdm

    # Normalize to an absolute path with a trailing slash so that stripping
    # `output_dir` from walked paths below yields a clean relative key.
    output_dir = os.path.abspath(output_dir)
    if not output_dir.endswith('/'):
        output_dir += '/'

    # Accept 's3' in any letter case; only S3 is implemented.
    if provider.lower() == 's3':
        client = boto3.client('s3',
                              aws_access_key_id=access_key,
                              aws_secret_access_key=secret_key)

        if subject is not None:
            # Validate explicitly rather than with `assert`, which is
            # silently stripped when Python runs with -O.
            if not os.path.exists(os.path.join(output_dir, subject)):
                raise ValueError('this subject id does not exist!')
            subjects = [subject]
        else:
            # Default to every BIDS-style subject directory under output_dir.
            subjects = [os.path.split(s)[1]
                        for s in glob(os.path.join(output_dir, 'sub-*'))]

        def upload_subject(sub, sub_idx):
            # Walk one subject's dmriprep tree and upload every file,
            # preserving the on-disk relative layout as the object key.
            base_dir = os.path.join(output_dir, sub, 'dmriprep')
            for root, dirs, files in os.walk(base_dir):
                if len(files):
                    for f in tqdm(files,
                                  desc=f"Uploading {sub} {root.split('/')[-1]}",
                                  position=sub_idx):
                        filepath = os.path.join(root, f)
                        key = root.replace(output_dir, '')
                        client.upload_file(filepath, bucket, os.path.join(key, f))

        # Run subject uploads concurrently; a thread scheduler is appropriate
        # because the work is network-I/O bound.
        uploads = [delayed(upload_subject)(s, idx)
                   for idx, s in enumerate(subjects)]
        _ = list(compute(*uploads, scheduler="threads"))
    else:
        raise NotImplementedError(
            'S3 is the only supported provider for data uploads at the moment')
80
126
# Script entry point: run the CLI and propagate its exit status to the shell.
if __name__ == "__main__":
    sys.exit(main())  # pragma: no cover
0 commit comments