Skip to content

Commit 6e3a7b5

Browse files
committed
Merged resource_multiproc into s3_multiproc
2 parents 9cb7a68 + 5dac574 commit 6e3a7b5

File tree

14 files changed

+340
-108
lines changed

14 files changed

+340
-108
lines changed

nipype/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
try:
2121
import faulthandler
2222
faulthandler.enable()
23-
except:
23+
except (ImportError,IOError) as e:
2424
pass
2525

2626

nipype/algorithms/metrics.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,12 @@ def _eucl_min(self, nii1, nii2):
105105
set2_coordinates.T[point2, :])
106106

107107
def _eucl_cog(self, nii1, nii2):
108-
origdata1 = nii1.get_data().astype(np.bool)
109-
cog_t = np.array(center_of_mass(origdata1)).reshape(-1, 1)
108+
origdata1 = np.logical_and(nii1.get_data() != 0, np.logical_not(np.isnan(nii1.get_data())))
109+
cog_t = np.array(center_of_mass(origdata1.copy())).reshape(-1, 1)
110110
cog_t = np.vstack((cog_t, np.array([1])))
111111
cog_t_coor = np.dot(nii1.affine, cog_t)[:3, :]
112112

113-
origdata2 = nii2.get_data().astype(np.bool)
113+
origdata2 = np.logical_and(nii2.get_data() != 0, np.logical_not(np.isnan(nii2.get_data())))
114114
(labeled_data, n_labels) = label(origdata2)
115115

116116
cogs = np.ones((4, n_labels))
@@ -181,8 +181,9 @@ def _eucl_max(self, nii1, nii2):
181181
return np.max(mins)
182182

183183
def _run_interface(self, runtime):
184-
nii1 = nb.load(self.inputs.volume1)
185-
nii2 = nb.load(self.inputs.volume2)
184+
# there is a bug in some scipy ndimage methods that gets tripped by memory mapped objects
185+
nii1 = nb.load(self.inputs.volume1, mmap=False)
186+
nii2 = nb.load(self.inputs.volume2, mmap=False)
186187

187188
if self.inputs.method == "eucl_min":
188189
self._distance, self._point1, self._point2 = self._eucl_min(

nipype/interfaces/ants/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# -Using -1 gives primary responsibility to ITKv4 to do the correct
1313
# thread limiting.
1414
# -Using 1 takes a very conservative approach to avoid overloading
15-
# the computer (when running MultiProc) by forcing everything to
15+
# the computer (when running ResourceMultiProc) by forcing everything to
1616
# single threaded. This can be a severe penalty for registration
1717
# performance.
1818
LOCAL_DEFAULT_NUMBER_OF_THREADS = 1

nipype/interfaces/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1297,6 +1297,7 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12971297
# Init variables for memory profiling
12981298
mem_mb = -1
12991299
num_threads = -1
1300+
interval = 1
13001301

13011302
if output == 'stream':
13021303
streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]
@@ -1346,7 +1347,7 @@ def _process(drain=0):
13461347
try:
13471348
stderr = stderr.decode()
13481349
except UnicodeDecodeError:
1349-
stdout = stdout.decode("ISO-8859-1")
1350+
stderr = stderr.decode("ISO-8859-1")
13501351

13511352
result['stdout'] = str(stdout).split('\n')
13521353
result['stderr'] = str(stderr).split('\n')

nipype/interfaces/io.py

Lines changed: 11 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -134,54 +134,7 @@ def _add_output_traits(self, base):
134134
return base
135135

136136

137-
# Class to track percentage of S3 file upload
138-
class ProgressPercentage(object):
139-
'''
140-
Callable class instance (via __call__ method) that displays
141-
upload percentage of a file to S3
142-
'''
143-
144-
def __init__(self, filename):
145-
'''
146-
'''
147-
148-
# Import packages
149-
import threading
150-
151-
# Initialize data attributes
152-
self._filename = filename
153-
self._size = float(os.path.getsize(filename))
154-
self._seen_so_far = 0
155-
self._lock = threading.Lock()
156-
157-
def __call__(self, bytes_amount):
158-
'''
159-
'''
160-
161-
# Import packages
162-
import sys
163-
164-
# With the lock on, print upload status
165-
with self._lock:
166-
self._seen_so_far += bytes_amount
167-
if self._size != 0:
168-
percentage = (self._seen_so_far / self._size) * 100
169-
else:
170-
percentage = 0
171-
progress_str = '%d / %d (%.2f%%)\r'\
172-
% (self._seen_so_far, self._size, percentage)
173-
174-
# Write to stdout
175-
sys.stdout.write(progress_str)
176-
sys.stdout.flush()
177-
178-
179-
# DataSink inputs
180137
class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
181-
'''
182-
'''
183-
184-
# Init inputspec data attributes
185138
base_directory = Directory(
186139
desc='Path to the base directory for storing data.')
187140
container = traits.Str(
@@ -193,11 +146,11 @@ class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
193146
desc=('List of 2-tuples reflecting string '
194147
'to substitute and string to replace '
195148
'it with'))
196-
regexp_substitutions = \
197-
InputMultiPath(traits.Tuple(traits.Str, traits.Str),
198-
desc=('List of 2-tuples reflecting a pair of a '\
199-
'Python regexp pattern and a replacement '\
200-
'string. Invoked after string `substitutions`'))
149+
regexp_substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
150+
desc=('List of 2-tuples reflecting a pair '
151+
'of a Python regexp pattern and a '
152+
'replacement string. Invoked after '
153+
'string `substitutions`'))
201154

202155
_outputs = traits.Dict(traits.Str, value={}, usedefault=True)
203156
remove_dest_dir = traits.Bool(False, usedefault=True,
@@ -218,7 +171,6 @@ class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
218171

219172
# Set call-able inputs attributes
220173
def __setattr__(self, key, value):
221-
222174
if key not in self.copyable_trait_names():
223175
if not isdefined(value):
224176
super(DataSinkInputSpec, self).__setattr__(key, value)
@@ -229,14 +181,12 @@ def __setattr__(self, key, value):
229181
super(DataSinkInputSpec, self).__setattr__(key, value)
230182

231183

232-
# DataSink outputs
233184
class DataSinkOutputSpec(TraitedSpec):
234185

235186
# Init out file
236187
out_file = traits.Any(desc='datasink output')
237188

238189

239-
# Custom DataSink class
240190
class DataSink(IOBase):
241191
""" Generic datasink module to store structured outputs
242192
@@ -298,12 +248,9 @@ class DataSink(IOBase):
298248
>>> ds.run() # doctest: +SKIP
299249
300250
"""
301-
302-
# Give obj .inputs and .outputs
303251
input_spec = DataSinkInputSpec
304252
output_spec = DataSinkOutputSpec
305253

306-
# Initialization method to set up datasink
307254
def __init__(self, infields=None, force_run=True, **kwargs):
308255
"""
309256
Parameters
@@ -325,7 +272,6 @@ def __init__(self, infields=None, force_run=True, **kwargs):
325272
if force_run:
326273
self._always_run = True
327274

328-
# Get destination paths
329275
def _get_dst(self, src):
330276
# If path is directory with trailing os.path.sep,
331277
# then remove that for a more robust behavior
@@ -349,7 +295,6 @@ def _get_dst(self, src):
349295
dst = dst[1:]
350296
return dst
351297

352-
# Substitute paths in substitutions dictionary parameter
353298
def _substitute(self, pathstr):
354299
pathstr_ = pathstr
355300
if isdefined(self.inputs.substitutions):
@@ -639,9 +584,6 @@ def _upload_to_s3(self, bucket, src, dst):
639584
def _list_outputs(self):
640585
"""Execute this module.
641586
"""
642-
643-
# Init variables
644-
iflogger = logging.getLogger('interface')
645587
outputs = self.output_spec().get()
646588
out_files = []
647589
# Use hardlink
@@ -706,23 +648,17 @@ def _list_outputs(self):
706648
iflogger.debug("key: %s files: %s" % (key, str(files)))
707649
files = filename_to_list(files)
708650
tempoutdir = outdir
709-
if s3_flag:
710-
s3tempoutdir = s3dir
711651
for d in key.split('.'):
712652
if d[0] == '@':
713653
continue
714654
tempoutdir = os.path.join(tempoutdir, d)
715-
if s3_flag:
716-
s3tempoutdir = os.path.join(s3tempoutdir, d)
717655

718656
# flattening list
719657
if isinstance(files, list):
720658
if isinstance(files[0], list):
721659
files = [item for sublist in files for item in sublist]
722660

723-
# Iterate through passed-in source files
724661
for src in filename_to_list(files):
725-
# Format src and dst files
726662
src = os.path.abspath(src)
727663
if not os.path.isfile(src):
728664
src = os.path.join(src, '')
@@ -749,22 +685,12 @@ def _list_outputs(self):
749685
pass
750686
else:
751687
raise(inst)
752-
# If src is a file, copy it to dst
753-
if os.path.isfile(src):
754-
iflogger.debug('copyfile: %s %s' % (src, dst))
755-
copyfile(src, dst, copy=True, hashmethod='content',
756-
use_hardlink=use_hardlink)
757-
out_files.append(dst)
758-
# If src is a directory, copy entire contents to dst dir
759-
elif os.path.isdir(src):
760-
if os.path.exists(dst) and self.inputs.remove_dest_dir:
761-
iflogger.debug('removing: %s' % dst)
762-
shutil.rmtree(dst)
763-
iflogger.debug('copydir: %s %s' % (src, dst))
764-
copytree(src, dst)
765-
out_files.append(dst)
766-
767-
# Return outputs dictionary
688+
if os.path.exists(dst) and self.inputs.remove_dest_dir:
689+
iflogger.debug("removing: %s" % dst)
690+
shutil.rmtree(dst)
691+
iflogger.debug("copydir: %s %s" % (src, dst))
692+
copytree(src, dst)
693+
out_files.append(dst)
768694
outputs['out_file'] = out_files
769695

770696
return outputs

nipype/interfaces/tests/test_io.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import nipype.interfaces.io as nio
1919
from nipype.interfaces.base import Undefined
2020

21-
# Check for boto
2221
noboto = False
2322
try:
2423
import boto
@@ -113,7 +112,7 @@ def test_selectfiles_valueerror():
113112
yield assert_raises, ValueError, sf.run
114113

115114

116-
@skipif(noboto)
115+
@skip
117116
def test_s3datagrabber_communication():
118117
dg = nio.S3DataGrabber(
119118
infields=['subj_id', 'run_num'], outfields=['func', 'struct'])

nipype/interfaces/utility.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -450,14 +450,14 @@ def _run_interface(self, runtime):
450450
args[name] = value
451451

452452
# Record memory of function_handle
453-
#try:
454-
# import memory_profiler
455-
# proc = (function_handle, (), args)
456-
# mem_mb, out = memory_profiler.memory_usage(proc=proc, retval=True, include_children=True, max_usage=True)
457-
# setattr(runtime, 'cmd_memory', mem_mb[0]/1024.0)
453+
try:
454+
import memory_profiler
455+
proc = (function_handle, (), args)
456+
mem_mb, out = memory_profiler.memory_usage(proc=proc, retval=True, include_children=True, max_usage=True)
457+
setattr(runtime, 'cmd_memory', mem_mb[0]/1024.0)
458458
# If no memory_profiler package, run without recording memory
459-
#except:
460-
out = function_handle(**args)
459+
except ImportError:
460+
out = function_handle(**args)
461461

462462
if len(self._output_names) == 1:
463463
self._out[self._output_names[0]] = out

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -714,8 +714,7 @@ def func1(in1):
714714
# set local check
715715
w1.config['execution'] = {'stop_on_first_crash': 'true',
716716
'local_hash_check': 'true',
717-
'crashdump_dir': wd,
718-
'poll_sleep_duration': 2}
717+
'crashdump_dir': wd}
719718

720719
# test output of num_subnodes method when serial is default (False)
721720
yield assert_equal, n1.num_subnodes(), len(n1.inputs.in1)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import datetime
2+
import logging
3+
4+
def log_nodes_cb(node, status, result=None):
5+
logger = logging.getLogger('callback')
6+
try:
7+
node_mem = result['node_memory']
8+
cmd_mem = result['cmd_memory']
9+
run_seconds = result['run_seconds']
10+
cmd_threads = result['cmd_threads']
11+
except Exception as exc:
12+
node_mem = cmd_mem = run_seconds = cmd_threads = 'N/A'
13+
if status == 'start':
14+
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' +\
15+
node._id + '"' + ',"start":' + '"' +str(datetime.datetime.now()) +\
16+
'"' + ',"estimated_memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
17+
+ str(node._interface.num_threads) + '}'
18+
19+
logger.debug(message)
20+
21+
elif status == 'end':
22+
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
23+
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) + \
24+
'"' + ',"estimated_memory":' + '"'+ str(node._interface.estimated_memory) + '"'+ \
25+
',"num_threads":' + '"'+ str(node._interface.num_threads) + '"'+ \
26+
',"cmd-level_threads":' + '"'+ str(cmd_threads) + '"'+ \
27+
',"node-level_memory":' + '"'+ str(node_mem) + '"'+ \
28+
',"cmd-level_memory":' + '"'+ str(cmd_mem) + '"' + \
29+
',"run_seconds":' + '"'+ str(run_seconds) + '"'+ '}'
30+
31+
logger.debug(message)
32+
33+
else:
34+
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
35+
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
36+
'"' + ',"estimated_memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
37+
+ str(node._interface.num_threads) + ',"error":"True"}'
38+
39+
logger.debug(message)

nipype/pipeline/plugins/multiproc.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def run_node(node, updatehash, runtime_profile=False):
3535
# Init variables
3636
result = dict(result=None, traceback=None)
3737
runtime_profile = False
38+
3839
# If we're profiling the run
3940
if runtime_profile:
4041
try:

0 commit comments

Comments
 (0)