Commit 699df35

Merge pull request #728 from BRAINSia/EnhanceQSubProcessing
ENH: Large SGE jobs were overloading on qstat
2 parents: d5aeeaa + e4dca02

2 files changed: +346 additions, -31 deletions
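Only the base.py half of the diff is reproduced below; the second changed file, which presumably carries the remaining additions and the actual qstat handling the commit title refers to, is not part of this excerpt. As a rough illustration of the idea in the title ("large SGE jobs were overloading on qstat"), here is a minimal, hypothetical sketch of throttled qstat polling; the helper name, cache, and interval are assumptions, not the commit's code.

    import subprocess
    import time

    # Hypothetical sketch only: cache qstat output and refresh it at most once
    # per polling interval, so a large job array does not trigger one qstat
    # call per pending job.
    _QSTAT_CACHE = {'stamp': 0.0, 'output': ''}
    _MIN_QSTAT_INTERVAL = 5.0  # seconds between real qstat calls (assumed)


    def is_job_pending(job_id):
        """Return True if job_id still appears in the (cached) qstat listing."""
        now = time.time()
        if now - _QSTAT_CACHE['stamp'] > _MIN_QSTAT_INTERVAL:
            proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            out, _ = proc.communicate()
            _QSTAT_CACHE['output'] = out.decode('utf-8', 'replace')
            _QSTAT_CACHE['stamp'] = now
        return str(job_id) in _QSTAT_CACHE['output']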

nipype/pipeline/plugins/base.py

Lines changed: 25 additions & 13 deletions
@@ -305,8 +305,8 @@ def _submit_mapnode(self, jobid):
             self.mapnodesubids[self.depidx.shape[0] + i] = jobid
         self.procs.extend(mapnodesubids)
         self.depidx = ssp.vstack((self.depidx,
-                                  ssp.lil_matrix(np.zeros((numnodes,
-                                                           self.depidx.shape[1])))),
+                                  ssp.lil_matrix(np.zeros(
+                                      (numnodes, self.depidx.shape[1])))),
                                  'lil')
         self.depidx = ssp.hstack((self.depidx,
                                   ssp.lil_matrix(
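The hunk above only re-wraps the sparse-matrix expansion to satisfy line-length limits. For readers unfamiliar with the pattern, here is a standalone illustration (not nipype code; the example matrix and numnodes value are made up) of growing a lil dependency matrix by rows and columns of zeros with scipy.sparse, as happens when a MapNode is expanded into sub-jobs.

    import numpy as np
    import scipy.sparse as ssp

    depidx = ssp.lil_matrix(np.eye(3))  # small stand-in dependency matrix
    numnodes = 2                        # number of new sub-jobs

    # Append numnodes all-zero rows, then numnodes all-zero columns.
    depidx = ssp.vstack((depidx,
                         ssp.lil_matrix(np.zeros((numnodes, depidx.shape[1])))),
                        'lil')
    depidx = ssp.hstack((depidx,
                         ssp.lil_matrix(np.zeros((depidx.shape[0], numnodes)))),
                        'lil')
    print(depidx.shape)  # (5, 5)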
@@ -351,16 +351,19 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
                     if self._status_callback:
                         self._status_callback(self.procs[jobid], 'start')
                     continue_with_submission = True
-                    if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
+                    if str2bool(self.procs[jobid].config['execution']
+                                ['local_hash_check']):
                         logger.debug('checking hash locally')
                         try:
                             hash_exists, _, _, _ = self.procs[
                                 jobid].hash_exists()
                             logger.debug('Hash exists %s' % str(hash_exists))
                             if (hash_exists and
-                                (self.procs[jobid].overwrite == False or
-                                 (self.procs[jobid].overwrite == None and
-                                  not self.procs[jobid]._interface.always_run))):
+                                (self.procs[jobid].overwrite == False or
+                                 (self.procs[jobid].overwrite == None and
+                                  not self.procs[jobid]._interface.always_run)
+                                 )
+                                ):
                                 continue_with_submission = False
                                 self._task_finished_cb(jobid)
                                 self._remove_node_dirs()
@@ -438,7 +441,8 @@ def _remove_node_dirs(self):
         """Removes directories whose outputs have already been used up
         """
         if str2bool(self._config['execution']['remove_node_directories']):
-            for idx in np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]:
+            for idx in np.nonzero(
+                    (self.refidx.sum(axis=1) == 0).__array__())[0]:
                 if idx in self.mapnodesubids:
                     continue
                 if self.proc_done[idx] and (not self.proc_pending[idx]):
@@ -508,9 +512,13 @@ def _get_result(self, taskid):
                            'traceback': None}
             results_file = None
             try:
-                raise IOError(('Job (%s) finished or terminated, but results file '
-                               'does not exist. Batch dir contains crashdump '
-                               'file if node raised an exception' % node_dir))
+                error_message = ('Job id ({0}) finished or terminated, but '
+                                 'results file does not exist after ({1}) '
+                                 'seconds. Batch dir contains crashdump file '
+                                 'if node raised an exception.\n'
+                                 'Node working directory: ({2}) '.format(
+                                     taskid,timeout,node_dir) )
+                raise IOError(error_message)
             except IOError, e:
                 result_data['traceback'] = format_exc()
             else:
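The rewritten error message interpolates taskid, timeout, and node_dir, implying that _get_result now waits up to timeout seconds for the results file to appear before raising; the wait logic itself lies outside this hunk. A hypothetical sketch of such a wait loop, with assumed names and default values, not the commit's actual implementation:

    import os
    import time


    def wait_for_results_file(results_file, timeout=60.0, poll_interval=2.0):
        """Poll for a results file for up to `timeout` seconds (assumed helper).

        Returns True if the file appeared in time, False otherwise, leaving it
        to the caller to raise an IOError with a descriptive message.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            if os.path.exists(results_file):
                return True
            time.sleep(poll_interval)
        return False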
@@ -584,13 +592,17 @@ def _get_args(self, node, keywords):
             value = getattr(self, "_" + keyword)
             if keyword == "template" and os.path.isfile(value):
                 value = open(value).read()
-            if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args:
-                if keyword == "template" and os.path.isfile(node.plugin_args[keyword]):
+            if (hasattr(node, "plugin_args") and
+                    isinstance(node.plugin_args, dict) and
+                    keyword in node.plugin_args):
+                if (keyword == "template" and
+                        os.path.isfile(node.plugin_args[keyword])):
                     tmp_value = open(node.plugin_args[keyword]).read()
                 else:
                     tmp_value = node.plugin_args[keyword]

-                if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']:
+                if ('overwrite' in node.plugin_args and
+                        node.plugin_args['overwrite']):
                     value = tmp_value
                 else:
                     value += tmp_value
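This hunk is again line-length rewrapping, but the logic it touches is worth spelling out: a per-node plugin_args value either replaces the plugin-level default (when overwrite is truthy) or is appended to it. A standalone sketch of that merge rule, using an assumed helper name rather than nipype's API:

    def merge_plugin_arg(default, plugin_args, keyword):
        """Illustrate the replace-vs-append semantics of per-node plugin_args."""
        value = default
        if isinstance(plugin_args, dict) and keyword in plugin_args:
            tmp_value = plugin_args[keyword]
            if plugin_args.get('overwrite'):
                value = tmp_value   # overwrite: node value replaces the default
            else:
                value += tmp_value  # otherwise: node value is appended
        return value


    # With overwrite=True the per-node qsub_args replace the plugin default;
    # without it they are appended.
    print(merge_plugin_arg('-q long.q ', {'qsub_args': '-l h_vmem=8G',
                                          'overwrite': True}, 'qsub_args'))
    print(merge_plugin_arg('-q long.q ', {'qsub_args': '-l h_vmem=8G'},
                           'qsub_args'))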
