Commit ac19d23
address @effigies' comments
1 parent 983ac37

File tree
19 files changed: +110 / -83 lines

docker/files/run_examples.sh
Lines changed: 1 addition & 1 deletion

@@ -33,6 +33,6 @@ exit_code=$?
 
 # Collect crashfiles and generate xml report
 coverage xml -o ${WORKDIR}/tests/smoketest_${example_id}.xml
-find /work -name "crash-*" -maxdepth 1 -exec mv {} ${WORKDIR}/crashfiles/ \;
+find /work -maxdepth 1 -name "crash-*" -exec mv {} ${WORKDIR}/crashfiles/ \;
 exit $exit_code
 
docker/files/run_pytests.sh
Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ if [[ "${PYTHON_VERSION}" -ge "30" ]]; then
 fi
 
 # Collect crashfiles
-find ${WORKDIR} -name "crash-*" -maxdepth 1 -exec mv {} ${WORKDIR}/crashfiles/ \;
+find ${WORKDIR} -maxdepth 1 -name "crash-*" -exec mv {} ${WORKDIR}/crashfiles/ \;
 
 echo "Unit tests finished with exit code ${exit_code}"
 exit ${exit_code}
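
Both find fixes above reorder arguments only: GNU find treats -maxdepth as a global option, and placing it after a test such as -name triggers a "options are not positional" warning while matching the same files. Putting -maxdepth 1 first silences the warning without changing which crashfiles get moved.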

nipype/interfaces/base.py
Lines changed: 21 additions & 10 deletions

@@ -751,13 +751,26 @@ class BaseInterface(Interface):
 
     This class cannot be instantiated.
 
+
+    Relevant Interface attributes
+    -----------------------------
+
+    ``input_spec`` points to the traited class for the inputs
+    ``output_spec`` points to the traited class for the outputs
+    ``_redirect_x`` should be set to ``True`` when the interface requires
+      connecting to a ``$DISPLAY`` (default is ``False``).
+    ``resource_monitor`` if ``False`` prevents resource-monitoring this
+      interface, if ``True`` monitoring will be enabled IFF the general
+      Nipype config is set on (``resource_monitor = true``).
+
     """
     input_spec = BaseInterfaceInputSpec
     _version = None
     _additional_metadata = []
     _redirect_x = False
     references_ = []
-    resource_monitor = True
+    resource_monitor = True  # Enabled for this interface IFF enabled in the config
 
     def __init__(self, from_file=None, resource_monitor=None, **inputs):
         if not self.input_spec:
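
The new docstring describes a two-level switch for resource monitoring. A minimal usage sketch under that reading (hypothetical example; it assumes the config helper ``enable_resource_monitor()`` introduced alongside this feature and the per-instance ``resource_monitor`` argument visible in ``__init__`` above):

    from nipype import config
    from nipype.interfaces.fsl import BET

    config.enable_resource_monitor()   # global opt-in (resource_monitor = true)
    bet = BET(resource_monitor=False)  # per-instance opt-out despite the global flag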
@@ -1133,7 +1146,7 @@ def run(self, **inputs):
 
             # Read .prof file in and set runtime values
             vals = np.loadtxt(mon_fname, delimiter=',')
-            if vals.tolist():
+            if vals.size:
                 vals = np.atleast_2d(vals)
                 _, mem_peak_mb, nthreads = vals.max(0).astype(float).tolist()
                 runtime.mem_peak_gb = mem_peak_mb / 1024
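
The ``vals.size`` test replaces a truthiness check that misfires on 0-d arrays: ``np.loadtxt`` returns a 0-d array for a one-sample .prof file, and ``.tolist()`` then yields a bare float that is falsy whenever the sample is 0.0. A minimal sketch of the difference:

    import numpy as np

    single = np.array(0.0)        # what loadtxt gives for a one-value file
    print(bool(single.tolist()))  # False: the sample would be silently dropped
    print(bool(single.size))      # True: one value was recorded

    empty = np.array([])          # what loadtxt gives for an empty file
    print(bool(empty.size))       # False: nothing to read, branch is skipped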
@@ -1310,7 +1323,6 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
     """
 
     # Init variables
-    PIPE = sp.PIPE
     cmdline = runtime.cmdline
 
     if redirect_x:

@@ -1338,8 +1350,8 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
                         env=env)
     else:
         proc = sp.Popen(cmdline,
-                        stdout=PIPE,
-                        stderr=PIPE,
+                        stdout=sp.PIPE,
+                        stderr=sp.PIPE,
                         shell=True,
                         cwd=runtime.cwd,
                         env=env)

@@ -1414,17 +1426,16 @@ def get_dependencies(name, environ):
     Uses otool on darwin, ldd on linux. Currently doesn't support windows.
 
     """
-    PIPE = sp.PIPE
     if sys.platform == 'darwin':
         proc = sp.Popen('otool -L `which %s`' % name,
-                        stdout=PIPE,
-                        stderr=PIPE,
+                        stdout=sp.PIPE,
+                        stderr=sp.PIPE,
                         shell=True,
                         env=environ)
     elif 'linux' in sys.platform:
         proc = sp.Popen('ldd `which %s`' % name,
-                        stdout=PIPE,
-                        stderr=PIPE,
+                        stdout=sp.PIPE,
+                        stderr=sp.PIPE,
                         shell=True,
                         env=environ)
     else:

nipype/pipeline/engine/nodes.py
Lines changed: 24 additions & 21 deletions

@@ -55,6 +55,7 @@
 
 logger = logging.getLogger('workflow')
 
+
 class Node(EngineBase):
     """Wraps interface objects for use in pipeline
 

@@ -78,7 +79,7 @@ class Node(EngineBase):
 
     def __init__(self, interface, name, iterables=None, itersource=None,
                  synchronize=False, overwrite=None, needed_outputs=None,
-                 run_without_submitting=False, n_procs=1, mem_gb=0.25,
+                 run_without_submitting=False, n_procs=1, mem_gb=0.20,
                  **kwargs):
         """
         Parameters

@@ -200,6 +201,26 @@ def outputs(self):
         """Return the output fields of the underlying interface"""
         return self._interface._outputs()
 
+    @property
+    def mem_gb(self):
+        """Get estimated memory (GB)"""
+        if hasattr(self._interface, 'estimated_memory_gb'):
+            self._mem_gb = self._interface.estimated_memory_gb
+            logger.warning('Setting "estimated_memory_gb" on Interfaces has been '
+                           'deprecated as of nipype 1.0, please use Node.mem_gb.')
+            del self._interface.estimated_memory_gb
+        return self._mem_gb
+
+    @property
+    def n_procs(self):
+        """Get estimated number of processes"""
+        if hasattr(self._interface, 'num_threads'):
+            self._n_procs = self._interface.num_threads
+            logger.warning('Setting "num_threads" on Interfaces has been '
+                           'deprecated as of nipype 1.0, please use Node.n_procs')
+            del self._interface.num_threads
+        return self._n_procs
+
     def output_dir(self):
         """Return the location of the output directory for the node"""
         if self.base_dir is None:
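
The old get_mem_gb/get_n_procs accessors (removed in the next hunk) become the read-only properties above, so call sites change from node.get_mem_gb() to node.mem_gb. A self-contained sketch of the shim pattern, with names condensed from the real Node:

    import logging

    logger = logging.getLogger('workflow')

    class NodeShim(object):
        """Hypothetical reduction of Node: migrate a deprecated attribute once."""

        def __init__(self, interface, mem_gb=0.20):
            self._interface = interface
            self._mem_gb = mem_gb

        @property
        def mem_gb(self):
            if hasattr(self._interface, 'estimated_memory_gb'):
                self._mem_gb = self._interface.estimated_memory_gb
                logger.warning('"estimated_memory_gb" is deprecated; use Node.mem_gb')
                del self._interface.estimated_memory_gb  # warn only on first access
            return self._mem_gb

    class FakeInterface(object):
        pass

    iface = FakeInterface()
    iface.estimated_memory_gb = 4   # deprecated style, still honored once
    node = NodeShim(iface)
    assert node.mem_gb == 4         # warns and migrates the value
    assert node.mem_gb == 4         # silent on later reads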
@@ -685,24 +706,6 @@ def _copyfiles_to_wd(self, outdir, execute, linksonly=False):
         if execute and linksonly:
             rmtree(outdir)
 
-    def get_mem_gb(self):
-        """Get estimated memory (GB)"""
-        if hasattr(self._interface, 'estimated_memory_gb'):
-            self._mem_gb = self._interface.estimated_memory_gb
-            logger.warning('Setting "estimated_memory_gb" on Interfaces has been '
-                           'deprecated as of nipype 1.0')
-            del self._interface.estimated_memory_gb
-        return self._mem_gb
-
-    def get_n_procs(self):
-        """Get estimated number of processes"""
-        if hasattr(self._interface, 'num_threads'):
-            self._n_procs = self._interface.num_threads
-            logger.warning('Setting "num_threads" on Interfaces has been '
-                           'deprecated as of nipype 1.0')
-            del self._interface.num_threads
-        return self._n_procs
-
     def update(self, **opts):
         self.inputs.update(**opts)

@@ -1129,8 +1132,8 @@ def _make_nodes(self, cwd=None):
         for i in range(nitems):
             nodename = '_' + self.name + str(i)
             node = Node(deepcopy(self._interface),
-                        n_procs=self.get_n_procs(),
-                        mem_gb=self.get_mem_gb(),
+                        n_procs=self.n_procs,
+                        mem_gb=self.mem_gb,
                         overwrite=self.overwrite,
                         needed_outputs=self.needed_outputs,
                         run_without_submitting=self.run_without_submitting,

nipype/pipeline/plugins/base.py
Lines changed: 19 additions & 17 deletions

@@ -31,27 +31,28 @@ class PluginBase(object):
     """
     Base class for plugins
 
-    Execution plugin API
-    ====================
-
-    Current status::
-
-        class plugin_runner(PluginBase):
-
-            def run(graph, config, updatehash)
-
     """
 
     def __init__(self, plugin_args=None):
         if plugin_args is None:
             plugin_args = {}
         self.plugin_args = plugin_args
         self._config = None
-
         self._status_callback = plugin_args.get('status_callback')
-        return
 
     def run(self, graph, config, updatehash=False):
+        """
+        The core plugin member that should be implemented by
+        all plugins.
+
+        graph: a networkx, flattened :abbr:`DAG (Directed Acyclic Graph)`
+            to be executed
+
+        config: a nipype.config object
+
+        updatehash:
+
+        """
         raise NotImplementedError
 
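
The stub's new docstring spells out the plugin contract. A minimal conforming plugin might look like this (hypothetical SerialPlugin; it assumes each node of the flattened graph exposes run(updatehash=...)):

    import networkx as nx
    from nipype.pipeline.plugins.base import PluginBase

    class SerialPlugin(PluginBase):
        """Run the flattened DAG one node at a time, in topological order."""

        def run(self, graph, config, updatehash=False):
            for node in nx.topological_sort(graph):
                node.run(updatehash=updatehash)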

@@ -63,9 +64,9 @@ def __init__(self, plugin_args=None):
         """Initialize runtime attributes to none
 
         procs: list (N) of underlying interface elements to be processed
-        proc_done: a boolean numpy array (N) signifying whether a process has been
+        proc_done: a boolean numpy array (N,) signifying whether a process has been
             executed
-        proc_pending: a boolean numpy array (N) signifying whether a
+        proc_pending: a boolean numpy array (N,) signifying whether a
             process is currently running. Note: A process is finished only when
             both proc_done==True and
             proc_pending==False

@@ -84,7 +85,7 @@ def __init__(self, plugin_args=None):
         self.max_jobs = self.plugin_args.get('max_jobs', np.inf)
 
     def _prerun_check(self, graph):
-        """Stub."""
+        """Stub method to validate/massage graph and nodes before running"""
 
     def run(self, graph, config, updatehash=False):
         """

@@ -227,9 +228,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 logger.debug('Slots available: %s' % slots)
                 if (num_jobs >= self.max_jobs) or (slots == 0):
                     break
-            # Check to see if a job is available
-            jobids = np.flatnonzero(
-                ~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__())
+
+            # Check to see if a job is available (jobs without dependencies not run)
+            # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
+            jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
 
             if len(jobids) > 0:
                 # send all available jobs
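
The linked discussion turns on depidx being a SciPy sparse matrix: its axis-0 sum is an np.matrix of shape (1, N), so np.nonzero(...) returns (row, column) index arrays and [1] picks the column indices, i.e. the ready job ids. A small sketch, assuming a three-job graph where job 1 depends on job 0:

    import numpy as np
    from scipy.sparse import lil_matrix

    depidx = lil_matrix((3, 3))
    depidx[0, 1] = 1                            # job 1 depends on job 0
    proc_done = np.array([True, False, False])  # only job 0 has run

    mask = ~proc_done & (depidx.sum(0) == 0)    # np.matrix of shape (1, 3)
    print(np.nonzero(mask)[1])                  # [2]: job 2 alone is ready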

nipype/pipeline/plugins/condor.py
Lines changed: 2 additions & 1 deletion

@@ -8,9 +8,10 @@
 
 from ...interfaces.base import CommandLine
 from ... import logging
-from .base import (SGELikeBatchManagerBase, logger, logging)
+from .base import SGELikeBatchManagerBase, logger
 iflogger = logging.getLogger('interface')
 
+
 class CondorPlugin(SGELikeBatchManagerBase):
     """Execute using Condor

nipype/pipeline/plugins/dagman.py
Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@
 import time
 from warnings import warn
 
-from .base import (GraphPluginBase, logger)
+from .base import GraphPluginBase, logger
 from ...interfaces.base import CommandLine
 
nipype/pipeline/plugins/lsf.py
Lines changed: 2 additions & 1 deletion

@@ -7,11 +7,12 @@
 import re
 from time import sleep
 
-from .base import (SGELikeBatchManagerBase, logger, logging)
 from ... import logging
 from ...interfaces.base import CommandLine
+from .base import SGELikeBatchManagerBase, logger
 iflogger = logging.getLogger('interface')
 
+
 class LSFPlugin(SGELikeBatchManagerBase):
     """Execute using LSF Cluster Submission
nipype/pipeline/plugins/multiproc.py
Lines changed: 18 additions & 17 deletions

@@ -16,7 +16,7 @@
 from copy import deepcopy
 import numpy as np
 
-from ... import logging, config
+from ... import logging
 from ...utils.misc import str2bool
 from ...utils.profiler import get_system_total_memory_gb
 from ..engine import MapNode

@@ -155,8 +155,8 @@ def _prerun_check(self, graph):
         tasks_mem_gb = []
         tasks_num_th = []
         for node in graph.nodes():
-            tasks_mem_gb.append(node.get_mem_gb())
-            tasks_num_th.append(node.get_n_procs())
+            tasks_mem_gb.append(node.mem_gb)
+            tasks_num_th.append(node.n_procs)
 
         if np.any(np.array(tasks_mem_gb) > self.memory_gb):
             logger.warning(

@@ -179,8 +179,8 @@ def _check_resources(self, running_tasks):
         free_memory_gb = self.memory_gb
         free_processors = self.processors
         for _, jobid in running_tasks:
-            free_memory_gb -= min(self.procs[jobid].get_mem_gb(), self.memory_gb)
-            free_processors -= min(self.procs[jobid].get_n_procs(), self.processors)
+            free_memory_gb -= min(self.procs[jobid].get_mem_gb(), free_memory_gb)
+            free_processors -= min(self.procs[jobid].get_n_procs(), free_processors)
 
         return free_memory_gb, free_processors
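
The _check_resources change caps each running job's usage at what is still free, so a job that over-declares resources can zero the counters but never drive them negative. (The updated lines in this hunk appear to still call the accessor methods this commit removes from Node; later hunks in this file already use the new properties.) A simplified, self-contained sketch, with names condensed from MultiProcPlugin:

    def check_resources(running_jobs, total_memory_gb, total_processors):
        free_memory_gb = total_memory_gb
        free_processors = total_processors
        for job_mem_gb, job_procs in running_jobs:
            free_memory_gb -= min(job_mem_gb, free_memory_gb)
            free_processors -= min(job_procs, free_processors)
        return free_memory_gb, free_processors

    # A job claiming 12 GB on an 8 GB machine leaves 0 GB free, not -4 GB:
    print(check_resources([(12, 2)], 8, 4))   # (0, 2)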

@@ -189,29 +189,30 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         Sends jobs to workers when system resources are available.
         """
 
-        # Check all jobs without dependency not run
+        # Check to see if a job is available (jobs without dependencies not run)
+        # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
+        jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
         jobids = np.flatnonzero(
             ~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__())
 
         # Check available system resources by summing all threads and memory used
         free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
 
         logger.info('Currently running %d tasks, and %d jobs ready. '
-                     'Free memory (GB): %0.2f/%0.2f, Free processors: %d/%d',
-                     len(self.pending_tasks), len(jobids),
-                     free_memory_gb, self.memory_gb, free_processors, self.processors)
+                    'Free memory (GB): %0.2f/%0.2f, Free processors: %d/%d',
+                    len(self.pending_tasks), len(jobids),
+                    free_memory_gb, self.memory_gb, free_processors, self.processors)
 
-
-        if (len(jobids) + len(self.pending_tasks)) == 0:
+        if len(jobids) + len(self.pending_tasks) == 0:
             logger.debug('No tasks are being run, and no jobs can '
                          'be submitted to the queue. Potential deadlock')
             return
 
         # Sort jobs ready to run first by memory and then by number of threads
         # The most resource consuming jobs run first
         # jobids = sorted(jobids,
-        #                 key=lambda item: (self.procs[item]._get_mem_gb(),
-        #                                   self.procs[item]._get_n_procs()))
+        #                 key=lambda item: (self.procs[item]._mem_gb,
+        #                                   self.procs[item]._n_procs))
 
         # While have enough memory and processors for first job
         # Submit first job on the list

@@ -232,8 +233,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 continue
 
             # Check requirements of this job
-            next_job_gb = min(self.procs[jobid].get_mem_gb(), self.memory_gb)
-            next_job_th = min(self.procs[jobid].get_n_procs(), self.processors)
+            next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
+            next_job_th = min(self.procs[jobid].n_procs, self.processors)
 
             # If node does not fit, skip at this moment
             if next_job_th > free_processors or next_job_gb > free_memory_gb:

@@ -257,8 +258,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             hash_exists, _, _, _ = self.procs[jobid].hash_exists()
             overwrite = self.procs[jobid].overwrite
             always_run = self.procs[jobid]._interface.always_run
-            if (hash_exists and (overwrite is False or
-                                 (overwrite is None and not always_run))):
+            if hash_exists and (overwrite is False or
+                                overwrite is None and not always_run):
                 logger.debug('Skipping cached node %s with ID %s.',
                              self.procs[jobid]._id, jobid)
                 self._task_finished_cb(jobid)
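
The last hunk only drops redundant parentheses; since ``and`` binds tighter than ``or``, the simplified condition is equivalent. A quick exhaustive check:

    for hash_exists in (True, False):
        for overwrite in (True, False, None):
            for always_run in (True, False):
                old = (hash_exists and (overwrite is False or
                                        (overwrite is None and not always_run)))
                new = hash_exists and (overwrite is False or
                                       overwrite is None and not always_run)
                assert old == new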

nipype/pipeline/plugins/oar.py
Lines changed: 1 addition & 1 deletion

@@ -10,9 +10,9 @@
 import subprocess
 import simplejson as json
 
-from .base import (SGELikeBatchManagerBase, logger, logging)
 from ... import logging
 from ...interfaces.base import CommandLine
+from .base import SGELikeBatchManagerBase, logger
 iflogger = logging.getLogger('interface')
 
 class OARPlugin(SGELikeBatchManagerBase):
