Skip to content

Commit 1bccef7

Browse files
committed
refactoring multiproc to fix deadlock
1 parent 8e710fa commit 1bccef7

File tree

5 files changed

+237
-347
lines changed

5 files changed

+237
-347
lines changed

nipype/pipeline/plugins/API.rst

Lines changed: 0 additions & 8 deletions
This file was deleted.

nipype/pipeline/plugins/base.py

Lines changed: 28 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -9,188 +9,44 @@
99
from copy import deepcopy
1010
from glob import glob
1111
import os
12-
import getpass
1312
import shutil
14-
from socket import gethostname
1513
import sys
16-
import uuid
17-
from time import strftime, sleep, time
18-
from traceback import format_exception, format_exc
14+
from time import sleep, time
15+
from traceback import format_exc
1916

2017
import numpy as np
2118
import scipy.sparse as ssp
2219

23-
2420
from ... import logging
25-
from ...utils.filemanip import savepkl, loadpkl, crash2txt
21+
from ...utils.filemanip import loadpkl
2622
from ...utils.misc import str2bool
2723
from ..engine.utils import (nx, dfs_preorder, topological_sort)
2824
from ..engine import MapNode
29-
25+
from .tools import report_crash, report_nodes_not_run, create_pyscript
3026

3127
logger = logging.getLogger('workflow')
32-
iflogger = logging.getLogger('interface')
3328

3429

35-
def report_crash(node, traceback=None, hostname=None):
36-
"""Writes crash related information to a file
37-
"""
38-
name = node._id
39-
if node.result and hasattr(node.result, 'runtime') and \
40-
node.result.runtime:
41-
if isinstance(node.result.runtime, list):
42-
host = node.result.runtime[0].hostname
43-
else:
44-
host = node.result.runtime.hostname
45-
else:
46-
if hostname:
47-
host = hostname
48-
else:
49-
host = gethostname()
50-
message = ['Node %s failed to run on host %s.' % (name,
51-
host)]
52-
logger.error(message)
53-
if not traceback:
54-
exc_type, exc_value, exc_traceback = sys.exc_info()
55-
traceback = format_exception(exc_type,
56-
exc_value,
57-
exc_traceback)
58-
timeofcrash = strftime('%Y%m%d-%H%M%S')
59-
login_name = getpass.getuser()
60-
crashfile = 'crash-%s-%s-%s-%s' % (timeofcrash,
61-
login_name,
62-
name,
63-
str(uuid.uuid4()))
64-
crashdir = node.config['execution']['crashdump_dir']
65-
if crashdir is None:
66-
crashdir = os.getcwd()
67-
if not os.path.exists(crashdir):
68-
os.makedirs(crashdir)
69-
crashfile = os.path.join(crashdir, crashfile)
70-
if node.config['execution']['crashfile_format'].lower() in ['text', 'txt']:
71-
crashfile += '.txt'
72-
else:
73-
crashfile += '.pklz'
74-
logger.info('Saving crash info to %s' % crashfile)
75-
logger.info(''.join(traceback))
76-
if node.config['execution']['crashfile_format'].lower() in ['text', 'txt']:
77-
crash2txt(crashfile, dict(node=node, traceback=traceback))
78-
else:
79-
savepkl(crashfile, dict(node=node, traceback=traceback))
80-
return crashfile
81-
82-
83-
def report_nodes_not_run(notrun):
84-
"""List nodes that crashed with crashfile info
85-
86-
Optionally displays dependent nodes that weren't executed as a result of
87-
the crash.
30+
class PluginBase(object):
8831
"""
89-
if notrun:
90-
logger.info("***********************************")
91-
for info in notrun:
92-
logger.error("could not run node: %s" %
93-
'.'.join((info['node']._hierarchy,
94-
info['node']._id)))
95-
logger.info("crashfile: %s" % info['crashfile'])
96-
logger.debug("The following dependent nodes were not run")
97-
for subnode in info['dependents']:
98-
logger.debug(subnode._id)
99-
logger.info("***********************************")
100-
raise RuntimeError(('Workflow did not execute cleanly. '
101-
'Check log for details'))
102-
103-
104-
def create_pyscript(node, updatehash=False, store_exception=True):
105-
# pickle node
106-
timestamp = strftime('%Y%m%d_%H%M%S')
107-
if node._hierarchy:
108-
suffix = '%s_%s_%s' % (timestamp, node._hierarchy, node._id)
109-
batch_dir = os.path.join(node.base_dir,
110-
node._hierarchy.split('.')[0],
111-
'batch')
112-
else:
113-
suffix = '%s_%s' % (timestamp, node._id)
114-
batch_dir = os.path.join(node.base_dir, 'batch')
115-
if not os.path.exists(batch_dir):
116-
os.makedirs(batch_dir)
117-
pkl_file = os.path.join(batch_dir, 'node_%s.pklz' % suffix)
118-
savepkl(pkl_file, dict(node=node, updatehash=updatehash))
119-
mpl_backend = node.config["execution"]["matplotlib_backend"]
120-
# create python script to load and trap exception
121-
cmdstr = """import os
122-
import sys
32+
Base class for plugins
12333
124-
can_import_matplotlib = True #Silently allow matplotlib to be ignored
125-
try:
126-
import matplotlib
127-
matplotlib.use('%s')
128-
except ImportError:
129-
can_import_matplotlib = False
130-
pass
131-
132-
from nipype import config, logging
133-
from nipype.utils.filemanip import loadpkl, savepkl
134-
from socket import gethostname
135-
from traceback import format_exception
136-
info = None
137-
pklfile = '%s'
138-
batchdir = '%s'
139-
from nipype.utils.filemanip import loadpkl, savepkl
140-
try:
141-
if not sys.version_info < (2, 7):
142-
from collections import OrderedDict
143-
config_dict=%s
144-
config.update_config(config_dict)
145-
## Only configure matplotlib if it was successfully imported,
146-
## matplotlib is an optional component to nipype
147-
if can_import_matplotlib:
148-
config.update_matplotlib()
149-
logging.update_logging(config)
150-
traceback=None
151-
cwd = os.getcwd()
152-
info = loadpkl(pklfile)
153-
result = info['node'].run(updatehash=info['updatehash'])
154-
except Exception as e:
155-
etype, eval, etr = sys.exc_info()
156-
traceback = format_exception(etype,eval,etr)
157-
if info is None or not os.path.exists(info['node'].output_dir()):
158-
result = None
159-
resultsfile = os.path.join(batchdir, 'crashdump_%s.pklz')
160-
else:
161-
result = info['node'].result
162-
resultsfile = os.path.join(info['node'].output_dir(),
163-
'result_%%s.pklz'%%info['node'].name)
164-
"""
165-
if store_exception:
166-
cmdstr += """
167-
savepkl(resultsfile, dict(result=result, hostname=gethostname(),
168-
traceback=traceback))
169-
"""
170-
else:
171-
cmdstr += """
172-
if info is None:
173-
savepkl(resultsfile, dict(result=result, hostname=gethostname(),
174-
traceback=traceback))
175-
else:
176-
from nipype.pipeline.plugins.base import report_crash
177-
report_crash(info['node'], traceback, gethostname())
178-
raise Exception(e)
179-
"""
180-
cmdstr = cmdstr % (mpl_backend, pkl_file, batch_dir, node.config, suffix)
181-
pyscript = os.path.join(batch_dir, 'pyscript_%s.py' % suffix)
182-
with open(pyscript, 'wt') as fp:
183-
fp.writelines(cmdstr)
184-
return pyscript
34+
Execution plugin API
35+
====================
18536
37+
Current status::
18638
187-
class PluginBase(object):
188-
"""Base class for plugins"""
39+
class plugin_runner(PluginBase):
40+
41+
def run(graph, config, updatehash)
42+
43+
"""
18944

19045
def __init__(self, plugin_args=None):
19146
if plugin_args is None:
19247
plugin_args = {}
19348
self.plugin_args = plugin_args
49+
self._config = None
19450

19551
self._status_callback = plugin_args.get('status_callback')
19652
return
@@ -226,11 +82,17 @@ def __init__(self, plugin_args=None):
22682
self.proc_pending = None
22783
self.max_jobs = self.plugin_args.get('max_jobs', np.inf)
22884

85+
def _prerun_check(self, graph):
86+
"""Stub."""
87+
22988
def run(self, graph, config, updatehash=False):
230-
"""Executes a pre-defined pipeline using distributed approaches
89+
"""
90+
Executes a pre-defined pipeline using distributed approaches
23191
"""
23292
logger.info("Running in parallel.")
23393
self._config = config
94+
95+
self._prerun_check(graph)
23496
# Generate appropriate structures for worker-manager model
23597
self._generate_dependency_list(graph)
23698
self.pending_tasks = []
@@ -297,7 +159,12 @@ def _submit_job(self, node, updatehash=False):
297159
raise NotImplementedError
298160

299161
def _report_crash(self, node, result=None):
300-
raise NotImplementedError
162+
tb = None
163+
if result is not None:
164+
node._result = getattr(result, 'result')
165+
tb = getattr(result, 'traceback')
166+
node._traceback = tb
167+
return report_crash(node, traceback=tb)
301168

302169
def _clear_task(self, taskid):
303170
raise NotImplementedError
@@ -584,15 +451,6 @@ def _submit_job(self, node, updatehash=False):
584451
fp.writelines(batchscript)
585452
return self._submit_batchtask(batchscriptfile, node)
586453

587-
def _report_crash(self, node, result=None):
588-
if result and result['traceback']:
589-
node._result = result['result']
590-
node._traceback = result['traceback']
591-
return report_crash(node,
592-
traceback=result['traceback'])
593-
else:
594-
return report_crash(node)
595-
596454
def _clear_task(self, taskid):
597455
del self._pending[taskid]
598456

nipype/pipeline/plugins/multiproc.py

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from ...utils.misc import str2bool
2121
from ...utils.profiler import get_system_total_memory_gb
2222
from ..engine import MapNode
23-
from .base import (DistributedPluginBase, report_crash)
23+
from .base import DistributedPluginBase
2424

2525
# Init logger
2626
logger = logging.getLogger('workflow')
@@ -133,37 +133,28 @@ def _async_callback(self, args):
133133
def _get_result(self, taskid):
134134
return self._taskresult.get(taskid)
135135

136-
def _report_crash(self, node, result=None):
137-
if result and result['traceback']:
138-
node._result = result['result']
139-
node._traceback = result['traceback']
140-
return report_crash(node,
141-
traceback=result['traceback'])
142-
else:
143-
return report_crash(node)
144-
145136
def _clear_task(self, taskid):
146137
del self._task_obj[taskid]
147138

148139
def _submit_job(self, node, updatehash=False):
149140
self._taskid += 1
150-
if hasattr(node.inputs, 'terminal_output'):
151-
if node.inputs.terminal_output == 'stream':
152-
node.inputs.terminal_output = 'allatonce'
153-
154-
self._task_obj[self._taskid] = \
155-
self.pool.apply_async(run_node,
156-
(node, updatehash, self._taskid),
157-
callback=self._async_callback)
141+
if getattr(node.inputs, 'terminal_output') == 'stream':
142+
node.inputs.terminal_output = 'allatonce'
143+
144+
self._task_obj[self._taskid] = self.pool.apply_async(
145+
run_node, (node, updatehash, self._taskid),
146+
callback=self._async_callback)
158147
return self._taskid
159148

160149
def _close(self):
161150
self.pool.close()
162151
return True
163152

164153
def _send_procs_to_workers(self, updatehash=False, graph=None):
165-
""" Sends jobs to workers when system resources are available.
166-
Check memory (gb) and cores usage before running jobs.
154+
"""
155+
Sends jobs to workers when system resources are available.
156+
Check memory (gb) and cores usage before running jobs.
157+
167158
"""
168159
executing_now = []
169160

@@ -176,7 +167,6 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
176167
busy_processors = 0
177168
for jobid in currently_running_jobids:
178169
est_mem_gb = self.procs[jobid]._interface.estimated_memory_gb
179-
est_num_th = self.procs[jobid]._interface.num_threads
180170

181171
if est_mem_gb > self.memory_gb:
182172
logger.warning(
@@ -185,6 +175,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
185175
if self.raise_insufficient:
186176
raise RuntimeError('Insufficient resources available for job')
187177

178+
est_num_th = self.procs[jobid]._interface.num_threads
188179
if est_num_th > self.processors:
189180
logger.warning(
190181
'Job %s - Requested %d threads, but only %d are available.',
@@ -232,7 +223,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
232223
except Exception:
233224
etype, eval, etr = sys.exc_info()
234225
traceback = format_exception(etype, eval, etr)
235-
report_crash(self.procs[jobid], traceback=traceback)
226+
self._report_crash(self.procs[jobid], traceback=traceback)
236227
self._clean_queue(jobid, graph)
237228
self.proc_pending[jobid] = False
238229
continue
@@ -267,7 +258,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
267258
except Exception:
268259
etype, eval, etr = sys.exc_info()
269260
traceback = format_exception(etype, eval, etr)
270-
report_crash(self.procs[jobid], traceback=traceback)
261+
self._report_crash(self.procs[jobid], traceback=traceback)
271262
self._clean_queue(jobid, graph)
272263
self.proc_pending[jobid] = False
273264
continue
@@ -282,7 +273,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
282273
except Exception:
283274
etype, eval, etr = sys.exc_info()
284275
traceback = format_exception(etype, eval, etr)
285-
report_crash(self.procs[jobid], traceback=traceback)
276+
self._report_crash(self.procs[jobid], traceback=traceback)
286277
finally:
287278
self._task_finished_cb(jobid)
288279
self._remove_node_dirs()

0 commit comments

Comments
 (0)