Skip to content

Commit 427e668

Browse files
committed
improve documentation of DistributedBasePlugin
1 parent 013be14 commit 427e668

File tree

2 files changed

+52
-34
lines changed

2 files changed

+52
-34
lines changed

nipype/pipeline/plugins/base.py

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,40 @@ def run(self, graph, config, updatehash=False):
5757

5858

5959
class DistributedPluginBase(PluginBase):
60-
"""Execute workflow with a distribution engine
60+
"""
61+
Execute workflow with a distribution engine
62+
63+
Relevant class attributes
64+
-------------------------
65+
66+
procs: list (N) of underlying interface elements to be processed
67+
proc_done: a boolean numpy array (N,) signifying whether a process has been
68+
submitted for execution
69+
proc_pending: a boolean numpy array (N,) signifying whether a
70+
process is currently running.
71+
depidx: a boolean matrix (NxN) storing the dependency structure accross
72+
processes. Process dependencies are derived from each column.
73+
74+
Combinations of ``proc_done`` and ``proc_pending``
75+
--------------------------------------------------
76+
77+
+------------+---------------+--------------------------------+
78+
| proc_done | proc_pending | outcome |
79+
+============+===============+================================+
80+
| True | False | Process is finished |
81+
+------------+---------------+--------------------------------+
82+
| True | True | Process is currently being run |
83+
+------------+---------------+--------------------------------+
84+
| False | False | Process is queued |
85+
+------------+---------------+--------------------------------+
86+
| False | True | INVALID COMBINATION |
87+
+------------+---------------+--------------------------------+
6188
"""
6289

6390
def __init__(self, plugin_args=None):
64-
"""Initialize runtime attributes to none
65-
66-
procs: list (N) of underlying interface elements to be processed
67-
proc_done: a boolean numpy array (N,) signifying whether a process has been
68-
executed
69-
proc_pending: a boolean numpy array (N,) signifying whether a
70-
process is currently running. Note: A process is finished only when
71-
both proc_done==True and
72-
proc_pending==False
73-
depidx: a boolean matrix (NxN) storing the dependency structure accross
74-
processes. Process dependencies are derived from each column.
91+
"""
92+
Initialize runtime attributes to none
93+
7594
"""
7695
super(DistributedPluginBase, self).__init__(plugin_args=plugin_args)
7796
self.procs = None
@@ -87,12 +106,16 @@ def __init__(self, plugin_args=None):
87106
def _prerun_check(self, graph):
88107
"""Stub method to validate/massage graph and nodes before running"""
89108

109+
def _postrun_check(self):
110+
"""Stub method to close any open resources"""
111+
90112
def run(self, graph, config, updatehash=False):
91113
"""
92114
Executes a pre-defined pipeline using distributed approaches
93115
"""
94116
logger.info("Running in parallel.")
95117
self._config = config
118+
poll_sleep_secs = float(config['execution']['poll_sleep_duration'])
96119

97120
self._prerun_check(graph)
98121
# Generate appropriate structures for worker-manager model
@@ -107,12 +130,14 @@ def run(self, graph, config, updatehash=False):
107130
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
108131
jobs_ready = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
109132

110-
logger.info('Progress: %d jobs, %d/%d/%d/%d (done/running/pending/ready).',
133+
logger.info('Progress: %d jobs, %d/%d/%d (done/running/ready),'
134+
' %d/%d (pending_tasks/waiting).',
111135
len(self.proc_done),
112-
np.sum(self.proc_done & ~self.proc_pending),
136+
np.sum(self.proc_done ^ self.proc_pending),
113137
np.sum(self.proc_done & self.proc_pending),
138+
len(jobs_ready),
114139
len(self.pending_tasks),
115-
len(jobs_ready))
140+
np.sum(~self.proc_done & ~self.proc_pending))
116141
toappend = []
117142
# trigger callbacks for any pending results
118143
while self.pending_tasks:
@@ -139,27 +164,21 @@ def run(self, graph, config, updatehash=False):
139164
if toappend:
140165
self.pending_tasks.extend(toappend)
141166
num_jobs = len(self.pending_tasks)
142-
logger.debug('Tasks currently running (%d).', num_jobs)
167+
logger.debug('Tasks currently running: %d. Pending: %d.', num_jobs,
168+
np.sum(self.proc_done & self.proc_pending))
143169
if num_jobs < self.max_jobs:
144170
self._send_procs_to_workers(updatehash=updatehash,
145171
graph=graph)
146172
else:
147173
logger.debug('Not submitting (max jobs reached)')
148-
self._wait()
174+
175+
sleep(poll_sleep_secs)
149176

150177
self._remove_node_dirs()
151178
report_nodes_not_run(notrun)
152179

153180
# close any open resources
154-
self._close()
155-
156-
def _wait(self):
157-
sleep(float(self._config['execution']['poll_sleep_duration']))
158-
159-
def _close(self):
160-
# close any open resources, this could raise NotImplementedError
161-
# but I didn't want to break other plugins
162-
return True
181+
self._postrun_check()
163182

164183
def _get_result(self, taskid):
165184
raise NotImplementedError

nipype/pipeline/plugins/multiproc.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ def __init__(self, plugin_args=None):
118118
self._taskresult = {}
119119
self._task_obj = {}
120120
self._taskid = 0
121-
self._timeout = 2.0
122121

123122
# Read in options or set defaults.
124123
non_daemon = self.plugin_args.get('non_daemon', True)
@@ -151,11 +150,8 @@ def _submit_job(self, node, updatehash=False):
151150
callback=self._async_callback)
152151
return self._taskid
153152

154-
def _close(self):
155-
self.pool.close()
156-
return True
157-
158153
def _prerun_check(self, graph):
154+
"""Check if any node exeeds the available resources"""
159155
tasks_mem_gb = []
160156
tasks_num_th = []
161157
for node in graph.nodes():
@@ -176,6 +172,9 @@ def _prerun_check(self, graph):
176172
if self.raise_insufficient:
177173
raise RuntimeError('Insufficient resources available for job')
178174

175+
def _postrun_check(self):
176+
self.pool.close()
177+
179178
def _check_resources(self, running_tasks):
180179
"""
181180
Make sure there are resources available
@@ -241,9 +240,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
241240

242241
free_memory_gb -= next_job_gb
243242
free_processors -= next_job_th
244-
logger.info('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.',
245-
self.procs[jobid]._id, jobid, next_job_gb, next_job_th,
246-
free_memory_gb, free_processors)
243+
logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.',
244+
self.procs[jobid]._id, jobid, next_job_gb, next_job_th,
245+
free_memory_gb, free_processors)
247246

248247
# change job status in appropriate queues
249248
self.proc_done[jobid] = True

0 commit comments

Comments
 (0)