Skip to content

Commit 9393555

Browse files
committed
Merge pull request #1025 from satra/enh/workflow
Enh/workflow
2 parents e6b46ba + 03a7287 commit 9393555

File tree

6 files changed

+22
-62
lines changed

6 files changed

+22
-62
lines changed

CHANGES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Next release
1212
undefined (issue #994)(https://https://github.com/nipy/nipype/pull/996)
1313
* FIX: OpenfMRI support and FSL 5.0.7 changes (https://github.com/nipy/nipype/pull/1006)
1414
* FIX: Output prefix in SPM Normalize with modulation (https://github.com/nipy/nipype/pull/1023)
15+
* ENH: Usability improvements in cluster environments (https://github.com/nipy/nipype/pull/1025)
1516

1617
Release 0.10.0 (October 10, 2014)
1718
============

doc/users/config_file.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ Execution
120120
characters will be replaced by their hash. (possible values: ``true`` and
121121
``false``; default value: ``true``)
122122

123+
*poll_sleep_duration*
124+
This controls how long the job submission loop will sleep between submitting
125+
all pending jobs and checking for job completion. To be nice to cluster
126+
schedulers the default is set to 60 seconds.
127+
123128
Example
124129
~~~~~~~
125130

nipype/algorithms/tests/test_auto_Overlap.py

Lines changed: 0 additions & 46 deletions
This file was deleted.

nipype/pipeline/plugins/base.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -254,16 +254,11 @@ def run(self, graph, config, updatehash=False):
254254
num_jobs = len(self.pending_tasks)
255255
logger.debug('Number of pending tasks: %d' % num_jobs)
256256
if num_jobs < self.max_jobs:
257-
if np.isinf(self.max_jobs):
258-
slots = None
259-
else:
260-
slots = max(0, self.max_jobs - num_jobs)
261-
logger.debug('Slots available: %s' % slots)
262257
self._send_procs_to_workers(updatehash=updatehash,
263-
slots=slots, graph=graph)
258+
graph=graph)
264259
else:
265260
logger.debug('Not submitting')
266-
sleep(2)
261+
sleep(float(self._config['execution']['poll_sleep_duration']))
267262
self._remove_node_dirs()
268263
report_nodes_not_run(notrun)
269264

@@ -324,16 +319,21 @@ def _submit_mapnode(self, jobid):
324319
np.zeros(numnodes, dtype=bool)))
325320
return False
326321

327-
def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
328-
""" Sends jobs to workers using ipython's taskclient interface
322+
def _send_procs_to_workers(self, updatehash=False, graph=None):
323+
""" Sends jobs to workers
329324
"""
330325
while np.any(self.proc_done == False):
326+
num_jobs = len(self.pending_tasks)
327+
if np.isinf(self.max_jobs):
328+
slots = None
329+
else:
330+
slots = max(0, self.max_jobs - num_jobs)
331+
logger.debug('Slots available: %s' % slots)
332+
if (num_jobs >= self.max_jobs) or (slots == 0):
333+
break
331334
# Check to see if a job is available
332335
jobids = np.flatnonzero((self.proc_done == False) &
333336
(self.depidx.sum(axis=0) == 0).__array__())
334-
num_jobs = len(self.pending_tasks)
335-
if num_jobs >= self.max_jobs:
336-
break
337337
if len(jobids) > 0:
338338
# send all available jobs
339339
logger.info('Submitting %d jobs' % len(jobids[:slots]))
@@ -346,8 +346,6 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
346346
self.proc_pending[jobid] = False
347347
continue
348348
if num_subnodes > 1:
349-
if num_subnodes > (self.max_jobs - len(self.pending_tasks)):
350-
break
351349
submit = self._submit_mapnode(jobid)
352350
if not submit:
353351
continue

nipype/pipeline/utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,11 +1101,12 @@ def write_workflow_prov(graph, filename=None, format='turtle'):
11011101
subresult = InterfaceResult(result.interface[idx],
11021102
runtime, outputs={})
11031103
if result.inputs:
1104-
subresult.inputs = result.inputs[idx]
1104+
if idx < len(result.inputs):
1105+
subresult.inputs = result.inputs[idx]
11051106
if result.outputs:
11061107
for key, value in result.outputs.items():
11071108
values = getattr(result.outputs, key)
1108-
if isdefined(values):
1109+
if isdefined(values) and idx < len(values):
11091110
subresult.outputs[key] = values[idx]
11101111
sub_bundle = ProvStore().add_results(subresult)
11111112
ps.g = merge_bundles(ps.g, sub_bundle)

nipype/utils/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
stop_on_unknown_version = false
4949
write_provenance = false
5050
parameterize_dirs = true
51+
poll_sleep_duration = 60
5152
5253
[check]
5354
interval = 1209600

0 commit comments

Comments
 (0)