Skip to content

Commit f4b0b73

Browse files
committed
Cleaned up some of the code to PEP8 and checked for errors
1 parent 250b6d3 commit f4b0b73

File tree

13 files changed

+179
-94
lines changed

13 files changed

+179
-94
lines changed

nipype/interfaces/afni/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
from .base import Info
1010
from .preprocess import (To3D, Refit, Resample, TStat, Automask, Volreg, Merge,
11-
ZCutUp, Calc, TShift, Warp, Detrend, Despike,
12-
Copy, Fourier, Allineate, Maskave, SkullStrip, TCat, Fim,
11+
ZCutUp, Calc, TShift, Warp, Detrend, Despike, Copy,
12+
Fourier, Allineate, Maskave, SkullStrip, TCat, Fim,
1313
BlurInMask, Autobox, TCorrMap, Bandpass, Retroicor,
1414
TCorrelate, TCorr1D, BrickStat, ROIStats, AutoTcorrelate,
1515
AFNItoNIFTI, Eval, Means)

nipype/interfaces/afni/preprocess.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ class RefitInputSpec(CommandLineInputSpec):
180180
' template type, e.g. TLRC, MNI, ORIG')
181181

182182

183-
184183
class Refit(CommandLine):
185184
"""Changes some of the information inside a 3D dataset's header
186185

nipype/interfaces/base.py

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,6 @@ def run(self, **inputs):
10321032
self._check_mandatory_inputs()
10331033
self._check_version_requirements(self.inputs)
10341034
interface = self.__class__
1035-
10361035
# initialize provenance tracking
10371036
env = deepcopy(dict(os.environ))
10381037
runtime = Bunch(cwd=os.getcwd(),
@@ -1207,8 +1206,18 @@ def _read(self, drain):
12071206

12081207
# Get number of threads for process
12091208
def _get_num_threads(proc):
1210-
'''
1211-
'''
1209+
"""Function to get the number of threads a process is using
1210+
1211+
Parameters
1212+
----------
1213+
proc : psutil.Process instance
1214+
the process to evaluate thread usage of
1215+
1216+
Returns
1217+
-------
1218+
num_threads : int
1219+
the number of threads that the process is using
1220+
"""
12121221

12131222
# Import packages
12141223
import psutil
@@ -1223,14 +1232,32 @@ def _get_num_threads(proc):
12231232
except psutil.NoSuchProcess:
12241233
pass
12251234

1235+
# Return the number of threads found
12261236
return num_threads
12271237

12281238

12291239
# Get max resources used for process
12301240
def _get_max_resources_used(proc, mem_mb, num_threads, poll=False):
1231-
'''
1232-
docstring
1233-
'''
1241+
"""Function to get the RAM and threads usage of a process
1242+
1243+
Parameters
1244+
----------
1245+
proc : subprocess.Popen instance
1246+
the process to profile
1247+
mem_mb : float
1248+
the high memory watermark so far during process execution (in MB)
1249+
num_threads: int
1250+
the high thread watermark so far during process execution
1251+
poll : boolean
1252+
whether to poll the process or not
1253+
1254+
Returns
1255+
-------
1256+
mem_mb : float
1257+
the new high memory watermark of process (MB)
1258+
num_threads : float
1259+
the new high thread watermark of process
1260+
"""
12341261

12351262
# Import packages
12361263
from memory_profiler import _get_memory
@@ -1264,9 +1291,8 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12641291
import psutil
12651292
runtime_profile = True
12661293
except ImportError as exc:
1267-
logger.info('Unable to import packages needed for runtime '\
1268-
'profiling. Turning off runtime profiler.\n'\
1269-
'Error: %s' % exc)
1294+
logger.info('Unable to import packages needed for runtime profiling. '\
1295+
'Turning off runtime profiler.\nError: %s' % exc)
12701296
runtime_profile = False
12711297

12721298
# Init variables
@@ -1305,7 +1331,6 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
13051331
# Init variables for memory profiling
13061332
mem_mb = -1
13071333
num_threads = -1
1308-
interval = 1
13091334

13101335
if output == 'stream':
13111336
streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# UseResources inputspec
1515
class UseResourcesInputSpec(CommandLineInputSpec):
1616
'''
17+
use_resources cmd interface inputspec
1718
'''
1819

1920
# Init attributes
@@ -26,6 +27,7 @@ class UseResourcesInputSpec(CommandLineInputSpec):
2627
# UseResources interface
2728
class UseResources(CommandLine):
2829
'''
30+
use_resources cmd interface
2931
'''
3032

3133
# Import packages

nipype/interfaces/tests/use_resources

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def use_gb_ram(num_gb):
1919
# Eat 1 GB of memory for 1 second
2020
gb_str = ' ' * int(num_gb*1024.0**3)
2121

22+
# Spin CPU
2223
ctr = 0
2324
while ctr < 100e6:
2425
ctr+= 1

nipype/pipeline/engine/nodes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252

5353
from ... import config, logging
5454
logger = logging.getLogger('workflow')
55-
5655
from ...interfaces.base import (traits, InputMultiPath, CommandLine,
5756
Undefined, TraitedSpec, DynamicTraitedSpec,
5857
Bunch, InterfaceResult, md5, Interface,

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ def func1(in1):
715715
w1.config['execution'] = {'stop_on_first_crash': 'true',
716716
'local_hash_check': 'true',
717717
'crashdump_dir': wd,
718-
'poll_sleep_duration' : 2}
718+
'poll_sleep_duration': 2}
719719

720720
# test output of num_subnodes method when serial is default (False)
721721
yield assert_equal, n1.num_subnodes(), len(n1.inputs.in1)

nipype/pipeline/plugins/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import numpy as np
2121
import scipy.sparse as ssp
2222

23+
2324
from ...utils.filemanip import savepkl, loadpkl
2425
from ...utils.misc import str2bool
2526
from ..engine.utils import (nx, dfs_preorder, topological_sort)

nipype/pipeline/plugins/callback_log.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1+
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
2+
# vi: set ft=python sts=4 ts=4 sw=4 et:
3+
"""Callback logger for recording workflow and node run stats
4+
"""
5+
6+
# Import packages
17
import datetime
28
import logging
39

10+
# Log node stats function
411
def log_nodes_cb(node, status, result=None):
5-
'''
6-
'''
12+
"""Function to record node run statistics to a log file as json
13+
dictionaries
14+
"""
715

816
# Init variables
917
logger = logging.getLogger('callback')

nipype/pipeline/plugins/multiproc.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from multiprocessing import Process, Pool, cpu_count, pool
1111
from traceback import format_exception
1212
import sys
13+
1314
import numpy as np
1415
from copy import deepcopy
1516
from ..engine import MapNode
@@ -24,7 +25,20 @@
2425

2526
# Run node
2627
def run_node(node, updatehash):
27-
"""docstring
28+
"""Function to execute node.run(), catch and log any errors and
29+
return the result dictionary
30+
31+
Parameters
32+
----------
33+
node : nipype Node instance
34+
the node to run
35+
updatehash : boolean
36+
flag for updating hash
37+
38+
Returns
39+
-------
40+
result : dictionary
41+
dictionary containing the node runtime results and stats
2842
"""
2943

3044
# Import packages
@@ -45,7 +59,7 @@ def run_node(node, updatehash):
4559
result['runtime_threads'] = retval.runtime.get('runtime_threads')
4660
except:
4761
etype, eval, etr = sys.exc_info()
48-
result['traceback'] = format_exception(etype,eval,etr)
62+
result['traceback'] = format_exception(etype, eval, etr)
4963
result['result'] = node.result
5064

5165
# Return the result dictionary
@@ -125,15 +139,13 @@ def _wait(self):
125139
semaphore_singleton.semaphore.acquire()
126140
semaphore_singleton.semaphore.release()
127141

128-
129142
def _get_result(self, taskid):
130143
if taskid not in self._taskresult:
131144
raise RuntimeError('Multiproc task %d not found' % taskid)
132145
if not self._taskresult[taskid].ready():
133146
return None
134147
return self._taskresult[taskid].get()
135148

136-
137149
def _report_crash(self, node, result=None):
138150
if result and result['traceback']:
139151
node._result = result['result']
@@ -167,7 +179,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
167179
executing_now = []
168180

169181
# Check to see if a job is available
170-
jobids = np.flatnonzero((self.proc_pending == True) & (self.depidx.sum(axis=0) == 0).__array__())
182+
jobids = np.flatnonzero((self.proc_pending == True) & \
183+
(self.depidx.sum(axis=0) == 0).__array__())
171184

172185
#check available system resources by summing all threads and memory used
173186
busy_memory = 0
@@ -181,22 +194,29 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
181194

182195

183196
#check all jobs without dependency not run
184-
jobids = np.flatnonzero((self.proc_done == False) & (self.depidx.sum(axis=0) == 0).__array__())
197+
jobids = np.flatnonzero((self.proc_done == False) & \
198+
(self.depidx.sum(axis=0) == 0).__array__())
185199

186200

187201
#sort jobs ready to run first by memory and then by number of threads
188202
#The most resource consuming jobs run first
189-
jobids = sorted(jobids, key=lambda item: (self.procs[item]._interface.estimated_memory, self.procs[item]._interface.num_threads))
203+
jobids = sorted(jobids,
204+
key=lambda item: (self.procs[item]._interface.estimated_memory,
205+
self.procs[item]._interface.num_threads))
190206

191-
logger.debug('Free memory: %d, Free processors: %d', free_memory, free_processors)
207+
logger.debug('Free memory: %d, Free processors: %d',
208+
free_memory, free_processors)
192209

193210

194211
#while have enough memory and processors for first job
195212
#submit first job on the list
196213
for jobid in jobids:
197-
logger.debug('Next Job: %d, memory: %d, threads: %d' %(jobid, self.procs[jobid]._interface.estimated_memory, self.procs[jobid]._interface.num_threads))
214+
logger.debug('Next Job: %d, memory: %d, threads: %d' \
215+
% (jobid, self.procs[jobid]._interface.estimated_memory,
216+
self.procs[jobid]._interface.num_threads))
198217

199-
if self.procs[jobid]._interface.estimated_memory <= free_memory and self.procs[jobid]._interface.num_threads <= free_processors:
218+
if self.procs[jobid]._interface.estimated_memory <= free_memory and \
219+
self.procs[jobid]._interface.num_threads <= free_processors:
200220
logger.info('Executing: %s ID: %d' %(self.procs[jobid]._id, jobid))
201221
executing_now.append(self.procs[jobid])
202222

@@ -228,7 +248,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
228248
hash_exists, _, _, _ = self.procs[
229249
jobid].hash_exists()
230250
logger.debug('Hash exists %s' % str(hash_exists))
231-
if (hash_exists and (self.procs[jobid].overwrite == False or (self.procs[jobid].overwrite == None and not self.procs[jobid]._interface.always_run))):
251+
if (hash_exists and (self.procs[jobid].overwrite == False or \
252+
(self.procs[jobid].overwrite == None and \
253+
not self.procs[jobid]._interface.always_run))):
232254
self._task_finished_cb(jobid)
233255
self._remove_node_dirs()
234256
continue
@@ -239,7 +261,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
239261
logger.debug('Finished checking hash')
240262

241263
if self.procs[jobid].run_without_submitting:
242-
logger.debug('Running node %s on master thread' %self.procs[jobid])
264+
logger.debug('Running node %s on master thread' \
265+
% self.procs[jobid])
243266
try:
244267
self.procs[jobid].run()
245268
except Exception:
@@ -249,7 +272,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
249272

250273
else:
251274
logger.debug('submitting %s' % str(jobid))
252-
tid = self._submit_job(deepcopy(self.procs[jobid]), updatehash=updatehash)
275+
tid = self._submit_job(deepcopy(self.procs[jobid]),
276+
updatehash=updatehash)
253277
if tid is None:
254278
self.proc_done[jobid] = False
255279
self.proc_pending[jobid] = False

0 commit comments

Comments
 (0)