
Commit ca4bed5

Removed all of the ResourceMultiProc plugin so the S3 datasink
1 parent 15f3ced commit ca4bed5

File tree

9 files changed: +14, -675 lines

nipype/interfaces/base.py

Lines changed: 2 additions & 9 deletions
@@ -750,8 +750,6 @@ def __init__(self, **inputs):
             raise Exception('No input_spec in class: %s' %
                             self.__class__.__name__)
         self.inputs = self.input_spec(**inputs)
-        self.estimated_memory = 1
-        self.num_threads = 1
 
     @classmethod
     def help(cls, returnhelp=False):
@@ -1197,11 +1195,9 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
 
     The returned runtime contains a merged stdout+stderr log with timestamps
     """
-
-    # Init variables
     PIPE = subprocess.PIPE
-    cmdline = runtime.cmdline
 
+    cmdline = runtime.cmdline
     if redirect_x:
         exist_xvfb, _ = _exists_in_path('xvfb-run', runtime.environ)
         if not exist_xvfb:
@@ -1230,8 +1226,6 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
     result = {}
     errfile = os.path.join(runtime.cwd, 'stderr.nipype')
     outfile = os.path.join(runtime.cwd, 'stdout.nipype')
-
-
     if output == 'stream':
        streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]
 
@@ -1247,6 +1241,7 @@ def _process(drain=0):
            else:
                for stream in res[0]:
                    stream.read(drain)
+
        while proc.returncode is None:
            proc.poll()
            _process()
@@ -1261,7 +1256,6 @@ def _process(drain=0):
            result[stream._name] = [r[2] for r in rows]
        temp.sort()
        result['merged'] = [r[1] for r in temp]
-
    if output == 'allatonce':
        stdout, stderr = proc.communicate()
        result['stdout'] = stdout.split('\n')
@@ -1279,7 +1273,6 @@ def _process(drain=0):
        result['stdout'] = []
        result['stderr'] = []
        result['merged'] = ''
-
    runtime.stderr = '\n'.join(result['stderr'])
    runtime.stdout = '\n'.join(result['stdout'])
    runtime.merged = result['merged']
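The two attributes deleted at the top of this file, estimated_memory and num_threads, were the per-interface resource hints read by the ResourceMultiProcPlugin scheduler that this commit removes below. A minimal sketch of how a node was tagged before this change, assuming the pre-commit attribute names; the BET interface is only a stand-in:

# Illustrative only: pre-commit per-node resource hints (defaults were 1 and 1).
import nipype.pipeline.engine as pe
import nipype.interfaces.fsl as fsl

node = pe.Node(fsl.BET(), name='skullstrip')
node.interface.estimated_memory = 8   # rough resident-memory hint, in GB
node.interface.num_threads = 4        # rough thread-count hint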

nipype/interfaces/fsl/model.py

Lines changed: 8 additions & 12 deletions
@@ -250,17 +250,14 @@ def _create_ev_files(
                                 element=count,
                                 ctype=ctype, val=val)
                     ev_txt += "\n"
-
-                for fconidx in ftest_idx:
-                    fval=0
-                    if con[0] in con_map.keys() and fconidx in con_map[con[0]]:
-                        fval=1
-                    ev_txt += contrast_ftest_element.substitute(
-                        cnum=ftest_idx.index(fconidx) + 1,
-                        element=tidx,
-                        ctype=ctype,
-                        val=fval)
-                    ev_txt += "\n"
+                if con[0] in con_map.keys():
+                    for fconidx in con_map[con[0]]:
+                        ev_txt += contrast_ftest_element.substitute(
+                            cnum=ftest_idx.index(fconidx) + 1,
+                            element=tidx,
+                            ctype=ctype,
+                            val=1)
+                        ev_txt += "\n"
 
         # add contrast mask info
         ev_txt += contrastmask_header.substitute()
@@ -1959,4 +1956,3 @@ def _list_outputs(self):
                 self.inputs.out_vnscales_name)
 
         return outputs
-
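The rewritten block above changes which F-test rows are written for a t-contrast: the old loop emitted one row for every F-test in ftest_idx with a 0/1 flag, while the new one emits rows only for the F-tests that con_map lists for the current contrast, always with val=1. A toy sketch of the two behaviors, using made-up values for ftest_idx, con_map, con, and tidx:

# Toy comparison of the old and new loops; values are invented for illustration.
ftest_idx = [0, 1]            # two F-tests defined in the design
con_map = {'tcon': [0]}       # only the first F-test involves contrast 'tcon'
con = ('tcon',)
tidx = 1

old_rows = []
for fconidx in ftest_idx:
    fval = 1 if con[0] in con_map and fconidx in con_map[con[0]] else 0
    old_rows.append((ftest_idx.index(fconidx) + 1, tidx, fval))
# old_rows == [(1, 1, 1), (2, 1, 0)]  -> a row for every F-test, flagged 0/1

new_rows = []
if con[0] in con_map:
    for fconidx in con_map[con[0]]:
        new_rows.append((ftest_idx.index(fconidx) + 1, tidx, 1))
# new_rows == [(1, 1, 1)]             -> rows only for F-tests that use the contrast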

nipype/pipeline/plugins/__init__.py

Lines changed: 0 additions & 3 deletions
@@ -9,13 +9,10 @@
 from .condor import CondorPlugin
 from .dagman import CondorDAGManPlugin
 from .multiproc import MultiProcPlugin
-from .multiproc import ResourceMultiProcPlugin
 from .ipython import IPythonPlugin
 from .somaflow import SomaFlowPlugin
 from .pbsgraph import PBSGraphPlugin
 from .sgegraph import SGEGraphPlugin
 from .lsf import LSFPlugin
 from .slurm import SLURMPlugin
 from .slurmgraph import SLURMGraphPlugin
-
-from .callback_log import log_nodes_cb

nipype/pipeline/plugins/base.py

Lines changed: 1 addition & 6 deletions
@@ -260,15 +260,10 @@ def run(self, graph, config, updatehash=False):
                                             graph=graph)
             else:
                 logger.debug('Not submitting')
-            self._wait()
+            sleep(float(self._config['execution']['poll_sleep_duration']))
         self._remove_node_dirs()
         report_nodes_not_run(notrun)
 
-
-
-    def _wait(self):
-        sleep(float(self._config['execution']['poll_sleep_duration']))
-
     def _get_result(self, taskid):
         raise NotImplementedError
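With the _wait() hook gone, the executor now sleeps directly for the configured poll_sleep_duration between scheduling passes. A minimal sketch, assuming the standard nipype workflow config dict, of tuning that interval:

# Illustrative workflow; the name and value are arbitrary.
import nipype.pipeline.engine as pe

wf = pe.Workflow(name='example_wf')
wf.config['execution'] = {'poll_sleep_duration': 2}   # seconds between scheduler polls
# wf.run(plugin='MultiProc', plugin_args={'n_procs': 4})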

nipype/pipeline/plugins/callback_log.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

nipype/pipeline/plugins/multiproc.py

Lines changed: 2 additions & 170 deletions
@@ -12,7 +12,6 @@
 
 from .base import (DistributedPluginBase, report_crash)
 
-
 def run_node(node, updatehash):
     result = dict(result=None, traceback=None)
     try:
@@ -23,7 +22,6 @@ def run_node(node, updatehash):
         result['result'] = node.result
     return result
 
-
 class NonDaemonProcess(Process):
     """A non-daemon process to support internal multiprocessing.
     """
@@ -68,7 +66,6 @@ def __init__(self, plugin_args=None):
         else:
             self.pool = Pool(processes=n_procs)
 
-
     def _get_result(self, taskid):
         if taskid not in self._taskresult:
             raise RuntimeError('Multiproc task %d not found'%taskid)
@@ -84,7 +81,8 @@ def _submit_job(self, node, updatehash=False):
         except:
             pass
         self._taskresult[self._taskid] = self.pool.apply_async(run_node,
-                                                               (node, updatehash,))
+                                                               (node,
+                                                                updatehash,))
         return self._taskid
 
     def _report_crash(self, node, result=None):
@@ -98,169 +96,3 @@ def _report_crash(self, node, result=None):
 
     def _clear_task(self, taskid):
         del self._taskresult[taskid]
-
-
-
-import numpy as np
-from copy import deepcopy
-from ..engine import (MapNode, str2bool)
-import datetime
-import psutil
-from ... import logging
-import semaphore_singleton
-logger = logging.getLogger('workflow')
-
-def release_lock(args):
-    semaphore_singleton.semaphore.release()
-
-class ResourceMultiProcPlugin(MultiProcPlugin):
-    """Execute workflow with multiprocessing not sending more jobs at once
-    than the system can support.
-
-    The plugin_args input to run can be used to control the multiprocessing
-    execution and defining the maximum amount of memory and threads that
-    should be used. When those parameters are not specified,
-    the number of threads and memory of the system is used.
-
-    System consuming nodes should be tagged:
-    memory_consuming_node.interface.memory = 8 #Gb
-    thread_consuming_node.interface.num_threads = 16
-
-    The default number of threads and memory for a node is 1.
-
-    Currently supported options are:
-
-    - num_thread: maximum number of threads to be executed in parallel
-    - memory: maximum memory that can be used at once.
-
-    """
-
-    def __init__(self, plugin_args=None):
-        super(ResourceMultiProcPlugin, self).__init__(plugin_args=plugin_args)
-        self.plugin_args = plugin_args
-        self.processors = cpu_count()
-        memory = psutil.virtual_memory()
-        self.memory = memory.total / (1024*1024*1024)
-        if self.plugin_args:
-            if 'n_procs' in self.plugin_args:
-                self.processors = self.plugin_args['n_procs']
-            if 'memory' in self.plugin_args:
-                self.memory = self.plugin_args['memory']
-
-    def _wait(self):
-        if len(self.pending_tasks) > 0:
-            semaphore_singleton.semaphore.acquire()
-        semaphore_singleton.semaphore.release()
-
-
-    def _submit_job(self, node, updatehash=False):
-        self._taskid += 1
-        try:
-            if node.inputs.terminal_output == 'stream':
-                node.inputs.terminal_output = 'allatonce'
-        except:
-            pass
-        self._taskresult[self._taskid] = self.pool.apply_async(run_node,
-                                                               (node, updatehash,),
-                                                               callback=release_lock)
-        return self._taskid
-
-    def _send_procs_to_workers(self, updatehash=False, graph=None):
-        """ Sends jobs to workers when system resources are available.
-            Check memory (gb) and cores usage before running jobs.
-        """
-        executing_now = []
-
-        # Check to see if a job is available
-        jobids = np.flatnonzero((self.proc_pending == True) & (self.depidx.sum(axis=0) == 0).__array__())
-
-        #check available system resources by summing all threads and memory used
-        busy_memory = 0
-        busy_processors = 0
-        for jobid in jobids:
-            busy_memory+= self.procs[jobid]._interface.estimated_memory
-            busy_processors+= self.procs[jobid]._interface.num_threads
-
-        free_memory = self.memory - busy_memory
-        free_processors = self.processors - busy_processors
-
-
-        #check all jobs without dependency not run
-        jobids = np.flatnonzero((self.proc_done == False) & (self.depidx.sum(axis=0) == 0).__array__())
-
-
-        #sort jobs ready to run first by memory and then by number of threads
-        #The most resource consuming jobs run first
-        jobids = sorted(jobids, key=lambda item: (self.procs[item]._interface.estimated_memory, self.procs[item]._interface.num_threads))
-
-        logger.debug('Free memory: %d, Free processors: %d', free_memory, free_processors)
-
-
-        #while have enough memory and processors for first job
-        #submit first job on the list
-        for jobid in jobids:
-            logger.debug('Next Job: %d, memory: %d, threads: %d' %(jobid, self.procs[jobid]._interface.estimated_memory, self.procs[jobid]._interface.num_threads))
-
-            if self.procs[jobid]._interface.estimated_memory <= free_memory and self.procs[jobid]._interface.num_threads <= free_processors:
-                logger.info('Executing: %s ID: %d' %(self.procs[jobid]._id, jobid))
-                executing_now.append(self.procs[jobid])
-
-                if isinstance(self.procs[jobid], MapNode):
-                    try:
-                        num_subnodes = self.procs[jobid].num_subnodes()
-                    except Exception:
-                        self._clean_queue(jobid, graph)
-                        self.proc_pending[jobid] = False
-                        continue
-                    if num_subnodes > 1:
-                        submit = self._submit_mapnode(jobid)
-                        if not submit:
-                            continue
-
-                # change job status in appropriate queues
-                self.proc_done[jobid] = True
-                self.proc_pending[jobid] = True
-
-                free_memory -= self.procs[jobid]._interface.estimated_memory
-                free_processors -= self.procs[jobid]._interface.num_threads
-
-                # Send job to task manager and add to pending tasks
-                if self._status_callback:
-                    self._status_callback(self.procs[jobid], 'start')
-                if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
-                    logger.debug('checking hash locally')
-                    try:
-                        hash_exists, _, _, _ = self.procs[
-                            jobid].hash_exists()
-                        logger.debug('Hash exists %s' % str(hash_exists))
-                        if (hash_exists and (self.procs[jobid].overwrite == False or (self.procs[jobid].overwrite == None and not self.procs[jobid]._interface.always_run))):
-                            self._task_finished_cb(jobid)
-                            self._remove_node_dirs()
-                            continue
-                    except Exception:
-                        self._clean_queue(jobid, graph)
-                        self.proc_pending[jobid] = False
-                        continue
-                    logger.debug('Finished checking hash')
-
-                if self.procs[jobid].run_without_submitting:
-                    logger.debug('Running node %s on master thread' %self.procs[jobid])
-                    try:
-                        self.procs[jobid].run()
-                    except Exception:
-                        self._clean_queue(jobid, graph)
-                    self._task_finished_cb(jobid)
-                    self._remove_node_dirs()
-
-                else:
-                    logger.debug('submitting', jobid)
-                    tid = self._submit_job(deepcopy(self.procs[jobid]), updatehash=updatehash)
-                    if tid is None:
-                        self.proc_done[jobid] = False
-                        self.proc_pending[jobid] = False
-                    else:
-                        self.pending_tasks.insert(0, (tid, jobid))
-            else:
-                break
-
-        logger.debug('No jobs waiting to execute')
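For reference, the docstring of the deleted class describes how it was meant to be driven: tag nodes with resource hints and cap the whole run through plugin_args. A sketch of that pre-commit usage; the plugin string 'ResourceMultiProc' is an assumption based on nipype's &lt;name&gt;Plugin naming convention, and the plugin_args keys follow the deleted __init__ (n_procs, memory):

# Illustrative pre-commit usage of the removed plugin; FLIRT is a stand-in node.
import nipype.pipeline.engine as pe
import nipype.interfaces.fsl as fsl

wf = pe.Workflow(name='resource_demo')
heavy = pe.Node(fsl.FLIRT(), name='heavy')   # stand-in for a resource-hungry node
heavy.interface.estimated_memory = 8         # GB, read by the deleted scheduler
heavy.interface.num_threads = 16
wf.add_nodes([heavy])

# Cap the run at 8 processes and 16 GB of memory (keys per the deleted __init__).
# wf.run(plugin='ResourceMultiProc', plugin_args={'n_procs': 8, 'memory': 16})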

nipype/pipeline/plugins/semaphore_singleton.py

Lines changed: 0 additions & 2 deletions
This file was deleted.
