
Commit 9be1b7f

shariqiqbal2810 committed
Added support for grabbing and putting data on S3.
* Added S3DataGrabber to nipype/interfaces/io.py
* Added S3DataSink to nipype/interfaces/io.py
* Added tests to nipype/interfaces/tests/test_io.py
* Currently the S3 tests are using a bucket on our lab's account.
1 parent 72a2f71 commit 9be1b7f
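
For orientation, a minimal usage sketch of the two new interfaces. The trait names (bucket, bucket_path, template, template_args, sort_filelist, local_directory, base_directory) come from the diff below; the bucket name, key layout, and local paths are hypothetical.

    # Illustrative sketch only; bucket name and paths are made up.
    from nipype.interfaces.io import S3DataGrabber, S3DataSink

    # Download: 'template' is a regex (not a glob) matched against keys under
    # bucket_path; matching objects are copied into local_directory.
    grabber = S3DataGrabber(infields=['subject_id'], outfields=['func'])
    grabber.inputs.bucket = 'my-lab-bucket'
    grabber.inputs.bucket_path = 'data/'
    grabber.inputs.template = r'sub%03d/func/.*\.nii\.gz'
    grabber.inputs.template_args = {'func': [['subject_id']]}
    grabber.inputs.sort_filelist = True
    grabber.inputs.local_directory = '/tmp/s3data'
    grabber.inputs.subject_id = 1
    res = grabber.run()

    # Upload: behaves like DataSink, then mirrors the written files to S3,
    # with bucket_path playing the role of base_directory on the bucket.
    sink = S3DataSink()
    sink.inputs.bucket = 'my-lab-bucket'
    sink.inputs.bucket_path = 'derivatives/'
    sink.inputs.base_directory = '/tmp/results'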

File tree

2 files changed: +401, -0 lines changed


nipype/interfaces/io.py

Lines changed: 298 additions & 0 deletions
@@ -45,6 +45,11 @@
 except:
     pass
 
+try:
+    import boto3
+except:
+    pass
+
 from nipype.interfaces.base import (TraitedSpec, traits, File, Directory,
                                     BaseInterface, InputMultiPath, isdefined,
                                     OutputMultiPath, DynamicTraitedSpec,
@@ -371,6 +376,299 @@ def _list_outputs(self):
         return outputs
 
 
+class S3DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
+    bucket = traits.Str(mandatory=True,
+                        desc='Amazon S3 bucket where your data is stored')
+    bucket_path = traits.Str('', usedefault=True,
+                             desc='Location within your bucket to store '
+                                  'data.')
+    region = traits.Str('us-east-1', usedefault=True,
+                        desc='Region of s3 bucket')
+    base_directory = Directory(
+        desc='Path to the base directory for storing data.')
+    container = traits.Str(
+        desc='Folder within base directory in which to store output')
+    parameterization = traits.Bool(True, usedefault=True,
+                                   desc='store output in parametrized structure')
+    strip_dir = Directory(desc='path to strip out of filename')
+    substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
+                                   desc=('List of 2-tuples reflecting string '
+                                         'to substitute and string to replace '
+                                         'it with'))
+    regexp_substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
+                                          desc=('List of 2-tuples reflecting a pair '
+                                                'of a Python regexp pattern and a '
+                                                'replacement string. Invoked after '
+                                                'string `substitutions`'))
+
+    _outputs = traits.Dict(traits.Str, value={}, usedefault=True)
+    remove_dest_dir = traits.Bool(False, usedefault=True,
+                                  desc='remove dest directory when copying dirs')
+
+    def __setattr__(self, key, value):
+        if key not in self.copyable_trait_names():
+            if not isdefined(value):
+                super(S3DataSinkInputSpec, self).__setattr__(key, value)
+            self._outputs[key] = value
+        else:
+            if key in self._outputs:
+                self._outputs[key] = value
+            super(S3DataSinkInputSpec, self).__setattr__(key, value)
+
+
+class S3DataSink(DataSink):
+    """ Works exactly like DataSink, except the specified files will
+        also be uploaded to Amazon S3 storage in the specified bucket
+        and location. 'bucket_path' is the s3 analog for
+        'base_directory'.
+
+    """
+    input_spec = S3DataSinkInputSpec
+
+    def _list_outputs(self):
+        """Execute this module.
+        """
+        outputs = super(S3DataSink, self)._list_outputs()
+
+        self.localtos3(outputs['out_file'])
+
+        return outputs
+
+    def localtos3(self, paths):
+        client = boto3.client('s3', self.inputs.region)
+        transfer = boto3.s3.transfer.S3Transfer(client)
+        s3paths = []
+
+        for path in paths:
+            # convert local path to s3 path
+            bd_index = path.find(self.inputs.base_directory)
+            if bd_index != -1:  # base_directory is in path, maintain directory structure
+                s3path = path[bd_index + len(self.inputs.base_directory):]  # cut out base directory
+                if s3path[0] == os.path.sep:
+                    s3path = s3path[1:]
+            else:  # base_directory isn't in path, simply place all files in bucket_path folder
+                s3path = os.path.split(path)[1]  # take filename from path
+                s3path = os.path.join(self.inputs.bucket_path, s3path)
+            s3paths.append(s3path)
+
+            transfer.upload_file(path, self.inputs.bucket, s3path)
+
+        return s3paths
+
+
+class S3DataGrabberInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
+    region = traits.Str('us-east-1', usedefault=True,
+                        desc='Region of s3 bucket')
+    bucket = traits.Str(mandatory=True,
+                        desc='Amazon S3 bucket where your data is stored')
+    bucket_path = traits.Str('', usedefault=True,
+                             desc='Location within your bucket for subject data.')
+    local_directory = Directory(exists=True,
+                                desc='Path to the local directory for subject data to be downloaded '
+                                     'and accessed. Should be on HDFS for Spark jobs.')
+    raise_on_empty = traits.Bool(True, usedefault=True,
+                                 desc='Generate exception if list is empty for a given field')
+    sort_filelist = traits.Bool(mandatory=True,
+                                desc='Sort the filelist that matches the template')
+    template = traits.Str(mandatory=True,
+                          desc='Layout used to get files. Relative to bucket_path if defined. '
+                               'Uses regex rather than glob style formatting.')
+    template_args = traits.Dict(key_trait=traits.Str,
+                                value_trait=traits.List(traits.List),
+                                desc='Information to plug into template')
+
+
+class S3DataGrabber(IOBase):
+    """ Generic datagrabber module that wraps around glob in an
+        intelligent way for neuroimaging tasks to grab files from
+        Amazon S3
+
+        Works exactly like DataGrabber, except you must specify an
+        S3 "bucket" and "bucket_path" to search for your data and a
+        "local_directory" to store the data. "local_directory"
+        should be a location on HDFS for Spark jobs. Additionally,
+        "template" uses regex style formatting, rather than the
+        glob-style found in the original DataGrabber.
+
+    """
+    input_spec = S3DataGrabberInputSpec
+    output_spec = DynamicTraitedSpec
+    _always_run = True
+
+    def __init__(self, infields=None, outfields=None, **kwargs):
+        """
+        Parameters
+        ----------
+        infields : list of str
+            Indicates the input fields to be dynamically created
+
+        outfields: list of str
+            Indicates output fields to be dynamically created
+
+        See class examples for usage
+
+        """
+        if not outfields:
+            outfields = ['outfiles']
+        super(S3DataGrabber, self).__init__(**kwargs)
+        undefined_traits = {}
+        # used for mandatory inputs check
+        self._infields = infields
+        self._outfields = outfields
+        if infields:
+            for key in infields:
+                self.inputs.add_trait(key, traits.Any)
+                undefined_traits[key] = Undefined
+        # add ability to insert field specific templates
+        self.inputs.add_trait('field_template',
+                              traits.Dict(traits.Enum(outfields),
+                                          desc="arguments that fit into template"))
+        undefined_traits['field_template'] = Undefined
+        if not isdefined(self.inputs.template_args):
+            self.inputs.template_args = {}
+        for key in outfields:
+            if key not in self.inputs.template_args:
+                if infields:
+                    self.inputs.template_args[key] = [infields]
+                else:
+                    self.inputs.template_args[key] = []
+
+        self.inputs.trait_set(trait_change_notify=False, **undefined_traits)
+
+    def _add_output_traits(self, base):
+        """
+        S3 specific: Downloads relevant files to a local folder specified
+
+        Using traits.Any instead of OutputMultiPath till add_trait bug
+        is fixed.
+        """
+        return add_traits(base, self.inputs.template_args.keys())
+
+    def _list_outputs(self):
+        # infields are mandatory, however I could not figure out how to set 'mandatory' flag dynamically
+        # hence manual check
+        if self._infields:
+            for key in self._infields:
+                value = getattr(self.inputs, key)
+                if not isdefined(value):
+                    msg = "%s requires a value for input '%s' because it was listed in 'infields'" % \
+                        (self.__class__.__name__, key)
+                    raise ValueError(msg)
+
+        outputs = {}
+        # get list of all files in s3 bucket
+        s3 = boto3.resource('s3')
+        bkt = s3.Bucket(self.inputs.bucket)
+        bkt_files = list(k.key for k in bkt.objects.all())
+
+        # keys are outfields, args are template args for the outfield
+        for key, args in self.inputs.template_args.items():
+            outputs[key] = []
+            template = self.inputs.template
+            if hasattr(self.inputs, 'field_template') and \
+                    isdefined(self.inputs.field_template) and \
+                    key in self.inputs.field_template:
+                template = self.inputs.field_template[key]  # template override for multiple outfields
+            if isdefined(self.inputs.bucket_path):
+                template = os.path.join(self.inputs.bucket_path, template)
+            if not args:
+                filelist = []
+                for fname in bkt_files:
+                    if re.match(template, fname):
+                        filelist.append(fname)
+                if len(filelist) == 0:
+                    msg = 'Output key: %s Template: %s returned no files' % (
+                        key, template)
+                    if self.inputs.raise_on_empty:
+                        raise IOError(msg)
+                    else:
+                        warn(msg)
+                else:
+                    if self.inputs.sort_filelist:
+                        filelist = human_order_sorted(filelist)
+                    outputs[key] = list_to_filename(filelist)
+            for argnum, arglist in enumerate(args):
+                maxlen = 1
+                for arg in arglist:
+                    if isinstance(arg, six.string_types) and hasattr(self.inputs, arg):
+                        arg = getattr(self.inputs, arg)
+                    if isinstance(arg, list):
+                        if (maxlen > 1) and (len(arg) != maxlen):
+                            raise ValueError('incompatible number of arguments for %s' % key)
+                        if len(arg) > maxlen:
+                            maxlen = len(arg)
+                outfiles = []
+                for i in range(maxlen):
+                    argtuple = []
+                    for arg in arglist:
+                        if isinstance(arg, six.string_types) and hasattr(self.inputs, arg):
+                            arg = getattr(self.inputs, arg)
+                        if isinstance(arg, list):
+                            argtuple.append(arg[i])
+                        else:
+                            argtuple.append(arg)
+                    filledtemplate = template
+                    if argtuple:
+                        try:
+                            filledtemplate = template % tuple(argtuple)
+                        except TypeError as e:
+                            raise TypeError(str(e) + ": Template %s failed to convert with args %s" % (template, str(tuple(argtuple))))
+                    outfiles = []
+                    for fname in bkt_files:
+                        if re.match(filledtemplate, fname):
+                            outfiles.append(fname)
+                    if len(outfiles) == 0:
+                        msg = 'Output key: %s Template: %s returned no files' % (key, filledtemplate)
+                        if self.inputs.raise_on_empty:
+                            raise IOError(msg)
+                        else:
+                            warn(msg)
+                        outputs[key].append(None)
+                    else:
+                        if self.inputs.sort_filelist:
+                            outfiles = human_order_sorted(outfiles)
+                        outputs[key].append(list_to_filename(outfiles))
+            if any([val is None for val in outputs[key]]):
+                outputs[key] = []
+            if len(outputs[key]) == 0:
+                outputs[key] = None
+            elif len(outputs[key]) == 1:
+                outputs[key] = outputs[key][0]
+        # Outputs are currently stored as locations on S3.
+        # We must convert to the local location specified
+        # and download the files.
+        for key in outputs:
+            if type(outputs[key]) == list:
+                paths = outputs[key]
+                for i in range(len(paths)):
+                    path = paths[i]
+                    outputs[key][i] = self.s3tolocal(path)
+            elif type(outputs[key]) == str:
+                outputs[key] = self.s3tolocal(outputs[key])
+
+        return outputs
+
+    # Takes an s3 address and downloads the file to a local
+    # directory, returning the local path.
+    def s3tolocal(self, s3path):
+        # path formatting
+        if not os.path.split(self.inputs.local_directory)[1] == '':
+            self.inputs.local_directory += '/'
+        if not os.path.split(self.inputs.bucket_path)[1] == '':
+            self.inputs.bucket_path += '/'
+        if self.inputs.template[0] == '/':
+            self.inputs.template = self.inputs.template[1:]
+
+        localpath = s3path.replace(self.inputs.bucket_path, self.inputs.local_directory)
+        localdir = os.path.split(localpath)[0]
+        if not os.path.exists(localdir):
+            os.makedirs(localdir)
+        client = boto3.client('s3', self.inputs.region)
+        transfer = boto3.s3.transfer.S3Transfer(client)
+        transfer.download_file(self.inputs.bucket, s3path, localpath)
+        return localpath
+
+
 class DataGrabberInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
     base_directory = Directory(exists=True,
                                desc='Path to the base directory consisting of subject data.')
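
For reference, a sketch of the path mapping performed by the two helpers above, using made-up paths: localtos3 strips base_directory from the local path to form the S3 key, and s3tolocal swaps bucket_path for local_directory to pick the download target.

    # Hypothetical paths, for illustration only.
    import os

    # S3DataSink.localtos3: local file -> S3 key
    base_directory = '/tmp/results'
    local_file = '/tmp/results/sub001/func/preproc.nii.gz'
    s3_key = local_file[len(base_directory):].lstrip(os.path.sep)
    # s3_key == 'sub001/func/preproc.nii.gz'

    # S3DataGrabber.s3tolocal: S3 key -> local download path
    bucket_path = 'data/'
    local_directory = '/tmp/s3data/'
    s3_key = 'data/sub001/func/bold.nii.gz'
    local_path = s3_key.replace(bucket_path, local_directory)
    # local_path == '/tmp/s3data/sub001/func/bold.nii.gz'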

0 commit comments
