Commit 7cc0731

Merge branch 's3_multiproc' into resource_multiproc

2 parents 5dac574 + fe0a352

File tree

12 files changed (+688, -291 lines)

doc/users/aws.rst

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
.. _aws:


============================================
Using Nipype with Amazon Web Services (AWS)
============================================

Several groups have been successfully using Nipype on AWS. This procedure
involves setting up a temporary cluster using StarCluster and potentially
transferring files to/from S3. The latter is supported by Nipype through
DataSink and S3DataGrabber.

Using DataSink with S3
======================

The DataSink class now supports sending output data directly to an AWS S3
bucket. It does this through the introduction of several input attributes to
the DataSink interface and by parsing the ``base_directory`` attribute. This
class uses the `boto3 <https://boto3.readthedocs.org/en/latest/>`_ and
`botocore <https://botocore.readthedocs.org/en/latest/>`_ Python packages to
interact with AWS. To configure the DataSink to write data to S3, the user
must set the ``base_directory`` property to an S3-style filepath. For example:

::

    import nipype.interfaces.io as nio
    ds = nio.DataSink()
    ds.inputs.base_directory = 's3://mybucket/path/to/output/dir'

With the "s3://" prefix in the path, the DataSink knows that the output
29+
directory to send files is on S3 in the bucket "mybucket". "path/to/output/dir"
30+
is the relative directory path within the bucket "mybucket" where output data
31+
will be uploaded to (NOTE: if the relative path specified contains folders that
32+
don’t exist in the bucket, the DataSink will create them). The DataSink treats
33+
the S3 base directory exactly as it would a local directory, maintaining support
34+
for containers, substitutions, subfolders, "." notation, etc to route output
35+
data appropriately.
36+
37+
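For instance, a minimal sketch reusing the ``ds`` node from above (the
container name and substitution pair are purely illustrative):

::

    ds.inputs.container = 'sub001'
    ds.inputs.substitutions = [('_realigned', 'realigned')]
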
There are four new attributes introduced with S3 compatibility: ``creds_path``,
``encrypt_bucket_keys``, ``local_copy``, and ``bucket``.

::

    ds.inputs.creds_path = '/home/user/aws_creds/credentials.csv'
    ds.inputs.encrypt_bucket_keys = True
    ds.inputs.local_copy = '/home/user/workflow_outputs/local_backup'

``creds_path`` is the file path where the user's AWS credentials file
(typically a csv) is stored. This credentials file should contain the AWS
access key id and secret access key, and should be formatted as one of the
following (these formats are how Amazon provides the credentials file by
default when first downloaded):

Root-account user:

::

    AWSAccessKeyID=ABCDEFGHIJKLMNOP
    AWSSecretKey=zyx123wvu456/ABC890+gHiJk

IAM-user:

::

    User Name,Access Key Id,Secret Access Key
    "username",ABCDEFGHIJKLMNOP,zyx123wvu456/ABC890+gHiJk

The ``creds_path`` is necessary when writing files to a bucket that has
restricted access (almost no buckets are publicly writable). If ``creds_path``
is not specified, the DataSink will check the ``AWS_ACCESS_KEY_ID`` and
``AWS_SECRET_ACCESS_KEY`` environment variables and use those values for
bucket access.

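For example, a minimal sketch of relying on that fallback (the key values are
the placeholders from above; set them before the workflow runs):

::

    import os
    # Placeholder credentials; the DataSink reads these when creds_path is unset
    os.environ['AWS_ACCESS_KEY_ID'] = 'ABCDEFGHIJKLMNOP'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'zyx123wvu456/ABC890+gHiJk'
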
``encrypt_bucket_keys`` is a boolean flag that indicates whether to encrypt
the output data on S3 using server-side AES-256 encryption. This is useful if
the data being output is sensitive and one desires an extra layer of security.
By default, this is turned off.

``local_copy`` is a filepath string specifying where local copies of the
output data are stored in addition to those sent to S3. This is useful if one
wants to keep a backup version of the data on their local computer. By
default, this is turned off.

``bucket`` is a boto3 Bucket object that the user can use to override the
bucket specified in their ``base_directory``. This can be useful if one has to
manually create a bucket instance using special credentials (or using a mock
server like `fakes3 <https://github.com/jubos/fake-s3>`_). It is typically
used by developers unit-testing the DataSink class; most users do not need
this attribute for actual workflows. It is an optional argument.

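A minimal sketch of building such a Bucket object by hand (the explicit
credentials here are placeholders; a real workflow would usually rely on
``creds_path`` instead):

::

    import boto3
    # Hypothetical session built from explicit placeholder credentials
    session = boto3.session.Session(aws_access_key_id='ABCDEFGHIJKLMNOP',
                                    aws_secret_access_key='zyx123wvu456/ABC890+gHiJk')
    ds.inputs.bucket = session.resource('s3').Bucket('mybucket')
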
Finally, the user needs only to specify the input attributes for any incoming
data to the node, and the outputs will be written to their S3 bucket.

::

    workflow.connect(inputnode, 'subject_id', ds, 'container')
    workflow.connect(realigner, 'realigned_files', ds, 'motion')

So, for example, outputs for sub001's realigned_file1.nii.gz will be in:
s3://mybucket/path/to/output/dir/sub001/motion/realigned_file1.nii.gz


Using S3DataGrabber
======================
Coming soon...

doc/users/index.rst

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@
     spmmcr
     mipav
     nipypecmd
+    aws



nipype/interfaces/base.py

Lines changed: 33 additions & 12 deletions
@@ -1204,6 +1204,7 @@ def _read(self, drain):
         self._lastidx = len(self._rows)


+# Get number of threads for process
 def _get_num_threads(proc):
     '''
     '''
@@ -1223,6 +1224,29 @@ def _get_num_threads(proc):
     return num_threads


+# Get max resources used for process
+def _get_max_resources_used(proc, mem_mb, num_threads, poll=False):
+    '''
+    Update and return the peak memory usage (in MB) and thread count seen
+    so far for the given process, optionally polling its return code
+    '''
+
+    # Import packages
+    from memory_profiler import _get_memory
+    import psutil
+
+    try:
+        mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
+        num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
+        if poll:
+            proc.poll()
+    except Exception as exc:
+        iflogger.info('Could not get resources used by process. Error: %s'
+                      % exc)
+
+    # Return resources
+    return mem_mb, num_threads
+
+
 def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
     """Run a command, read stdout and stderr, prefix with timestamp.
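For context, a minimal standalone sketch of the sampling loop that the hunks
below converge on, assuming ``_get_max_resources_used`` and its imports from
the hunk above are available (``profile_cmd`` is a hypothetical name, not
nipype code):

    import subprocess

    def profile_cmd(cmd):
        # Launch the command and sample its resource usage until it exits
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        mem_mb, num_threads = 0.0, 0
        while proc.returncode is None:
            # poll=True makes the helper refresh proc.returncode each pass
            mem_mb, num_threads = _get_max_resources_used(proc, mem_mb,
                                                          num_threads,
                                                          poll=True)
        proc.communicate()
        return mem_mb, num_threads
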
@@ -1231,7 +1255,7 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):

     # Import packages
     try:
-        from memory_profiler import _get_memory
+        import memory_profiler
         import psutil
         mem_prof = True
     except:
@@ -1292,8 +1316,8 @@ def _process(drain=0):
                 stream.read(drain)
         while proc.returncode is None:
             if mem_prof:
-                mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
-                num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
+                mem_mb, num_threads = \
+                    _get_max_resources_used(proc, mem_mb, num_threads)
             proc.poll()
         _process()
         _process(drain=1)
@@ -1311,9 +1335,8 @@ def _process(drain=0):
     if output == 'allatonce':
         if mem_prof:
             while proc.returncode is None:
-                mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
-                num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
-                proc.poll()
+                mem_mb, num_threads = \
+                    _get_max_resources_used(proc, mem_mb, num_threads, poll=True)
         stdout, stderr = proc.communicate()
         if stdout and isinstance(stdout, bytes):
             try:
@@ -1332,9 +1355,8 @@ def _process(drain=0):
     if output == 'file':
         if mem_prof:
             while proc.returncode is None:
-                mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
-                num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
-                proc.poll()
+                mem_mb, num_threads = \
+                    _get_max_resources_used(proc, mem_mb, num_threads, poll=True)
         ret_code = proc.wait()
         stderr.flush()
         stdout.flush()
@@ -1344,9 +1366,8 @@ def _process(drain=0):
     if output == 'none':
         if mem_prof:
             while proc.returncode is None:
-                mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
-                num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
-                proc.poll()
+                mem_mb, num_threads = \
+                    _get_max_resources_used(proc, mem_mb, num_threads, poll=True)
         proc.communicate()
         result['stdout'] = []
         result['stderr'] = []
