Commit 9d19e14

Renamed memory parameters to memory_gb to make their units explicit, used the runtime Bunch object for runtime stats storage instead of the results dictionary, and renamed ResourceMultiProc to MultiProc for backwards compatibility
1 parent: f4b0b73 · commit: 9d19e14
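
Taken together, the renames change how workflows tag and run resource-aware nodes. A minimal sketch of the new spelling (the node, function, and workflow names here are hypothetical; the plugin_args keys follow the test changes below):

    import nipype.pipeline.engine as pe
    import nipype.interfaces.utility as niu

    def double(x):
        return 2 * x

    # Hypothetical one-node workflow illustrating the renamed attributes
    node = pe.Node(niu.Function(input_names=['x'], output_names=['out'],
                                function=double),
                   name='double_node')
    node.interface.estimated_memory_gb = 2   # was: estimated_memory
    node.interface.num_threads = 4

    wf = pe.Workflow(name='demo_wf')
    wf.add_nodes([node])
    # 'ResourceMultiProc' is now simply 'MultiProc'
    wf.run(plugin='MultiProc', plugin_args={'n_procs': 4, 'memory': 8})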

File tree

15 files changed: +73 / -89 lines changed

nipype/interfaces/ants/base.py

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@
 # -Using -1 gives primary responsibilty to ITKv4 to do the correct
 # thread limitings.
 # -Using 1 takes a very conservative approach to avoid overloading
-# the computer (when running ResourceMultiProc) by forcing everything to
+# the computer (when running MultiProc) by forcing everything to
 # single threaded. This can be a severe penalty for registration
 # performance.
 LOCAL_DEFAULT_NUMBER_OF_THREADS = 1

nipype/interfaces/base.py

Lines changed: 2 additions & 2 deletions
@@ -764,7 +764,7 @@ def __init__(self, **inputs):
             raise Exception('No input_spec in class: %s' %
                             self.__class__.__name__)
         self.inputs = self.input_spec(**inputs)
-        self.estimated_memory = 1
+        self.estimated_memory_gb = 1
         self.num_threads = 1

     @classmethod
@@ -1406,7 +1406,7 @@ def _process(drain=0):
            result['stderr'] = []
            result['merged'] = ''

-        setattr(runtime, 'runtime_memory', mem_mb/1024.0)
+        setattr(runtime, 'runtime_memory_gb', mem_mb/1024.0)
         setattr(runtime, 'runtime_threads', num_threads)
         runtime.stderr = '\n'.join(result['stderr'])
         runtime.stdout = '\n'.join(result['stdout'])
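
The second hunk stores the profiled memory on the runtime object in gigabytes. A minimal sketch of the same pattern, assuming a Bunch-like runtime container and hypothetical sampled values:

    from nipype.interfaces.base import Bunch

    runtime = Bunch(stdout='', stderr='')
    mem_mb = 2048.0    # hypothetical sampled peak memory, in MB
    num_threads = 4    # hypothetical sampled thread count

    # Same pattern as above: stats live on the runtime object, memory in GB
    setattr(runtime, 'runtime_memory_gb', mem_mb / 1024.0)
    setattr(runtime, 'runtime_threads', num_threads)
    assert runtime.runtime_memory_gb == 2.0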

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 4 additions & 5 deletions
@@ -47,8 +47,7 @@ class UseResources(CommandLine):
 # Test case for the run function
 class RuntimeProfilerTestCase(unittest.TestCase):
     '''
-    This class is a test case for the ResourceMultiProc plugin runtime
-    profiler
+    This class is a test case for the runtime profiler

     Inherits
     --------
@@ -129,7 +128,7 @@ def _run_workflow(self):

         # Resources used node
         resource_node = pe.Node(UseResources(), name='resource_node')
-        resource_node.interface.estimated_memory = num_gb
+        resource_node.interface.estimated_memory_gb = num_gb
         resource_node.interface.num_threads = num_procs

         # Connect workflow
@@ -140,7 +139,7 @@ def _run_workflow(self):
         plugin_args = {'n_procs' : num_procs,
                        'memory' : num_gb,
                        'status_callback' : log_nodes_cb}
-        wf.run(plugin='ResourceMultiProc', plugin_args=plugin_args)
+        wf.run(plugin='MultiProc', plugin_args=plugin_args)

         # Get runtime stats from log file
         finish_str = open(log_file, 'r').readlines()[1].rstrip('\n')
@@ -169,7 +168,7 @@ def test_wf_logfile(self):
         node_stats = json.loads(finish_str)

         # Read out runtime stats
-        runtime_gb = float(node_stats['runtime_memory'])
+        runtime_gb = float(node_stats['runtime_memory_gb'])
         runtime_procs = int(node_stats['runtime_threads'])

         # Error message formatting
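
The last hunk reads the renamed key back out of the callback log, where each line is a JSON dictionary. A standalone sketch of the same parsing (the log path is hypothetical):

    import json

    log_file = 'callback.log'  # hypothetical path to a log written by log_nodes_cb
    with open(log_file, 'r') as f:
        finish_str = f.readlines()[1].rstrip('\n')

    node_stats = json.loads(finish_str)
    runtime_gb = float(node_stats['runtime_memory_gb'])
    runtime_procs = int(node_stats['runtime_threads'])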

nipype/pipeline/engine/nodes.py

Lines changed: 12 additions & 7 deletions
@@ -737,15 +737,20 @@ def write_report(self, report_type=None, cwd=None):
             fp.close()
             return
         fp.writelines(write_rst_header('Runtime info', level=1))
+        # Init rst dictionary of runtime stats
+        rst_dict = {'hostname' : self.result.runtime.hostname,
+                    'duration' : self.result.runtime.duration}
+        # Try and insert memory/threads usage if available
+        try:
+            rst_dict['runtime_memory_gb'] = self.result.runtime.runtime_memory_gb
+            rst_dict['runtime_threads'] = self.result.runtime.runtime_threads
+        except:
+            logger.info('Runtime memory and threads stats unavailable')
         if hasattr(self.result.runtime, 'cmdline'):
-            fp.writelines(write_rst_dict(
-                {'hostname': self.result.runtime.hostname,
-                 'duration': self.result.runtime.duration,
-                 'command': self.result.runtime.cmdline}))
+            rst_dict['command'] = self.result.runtime.cmdline
+            fp.writelines(write_rst_dict(rst_dict))
         else:
-            fp.writelines(write_rst_dict(
-                {'hostname': self.result.runtime.hostname,
-                 'duration': self.result.runtime.duration}))
+            fp.writelines(write_rst_dict(rst_dict))
         if hasattr(self.result.runtime, 'merged'):
             fp.writelines(write_rst_header('Terminal output', level=2))
             fp.writelines(write_rst_list(self.result.runtime.merged))
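
The new code reaches for runtime_memory_gb inside a bare except, which also swallows unrelated errors. A hypothetical equivalent that guards each attribute explicitly (a sketch only, not part of the commit):

    def build_runtime_rst_dict(runtime):
        # Sketch: same rst_dict as the hunk above, built without a bare except
        rst_dict = {'hostname': runtime.hostname,
                    'duration': runtime.duration}
        for key in ('runtime_memory_gb', 'runtime_threads'):
            if hasattr(runtime, key):
                rst_dict[key] = getattr(runtime, key)
        if hasattr(runtime, 'cmdline'):
            rst_dict['command'] = runtime.cmdline
        return rst_dict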

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 2 additions & 2 deletions
@@ -723,7 +723,7 @@ def func1(in1):
     # test running the workflow on default conditions
     error_raised = False
     try:
-        w1.run(plugin='ResourceMultiProc')
+        w1.run(plugin='MultiProc')
     except Exception as e:
         from nipype.pipeline.engine.base import logger
         logger.info('Exception: %s' % str(e))
@@ -737,7 +737,7 @@ def func1(in1):
     # test running the workflow on serial conditions
     error_raised = False
     try:
-        w1.run(plugin='ResourceMultiProc')
+        w1.run(plugin='MultiProc')
     except Exception as e:
         from nipype.pipeline.engine.base import logger
         logger.info('Exception: %s' % str(e))

nipype/pipeline/engine/tests/test_utils.py

Lines changed: 1 addition & 1 deletion
@@ -214,7 +214,7 @@ def test_function3(arg):

     out_dir = mkdtemp()

-    for plugin in ('Linear',): # , 'ResourceMultiProc'):
+    for plugin in ('Linear',): # , 'MultiProc'):
         n1 = pe.Node(niu.Function(input_names=['arg1'],
                                   output_names=['out_file1', 'out_file2', 'dir'],
                                   function=test_function),

nipype/pipeline/plugins/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 from .sge import SGEPlugin
 from .condor import CondorPlugin
 from .dagman import CondorDAGManPlugin
-from .multiproc import ResourceMultiProcPlugin
+from .multiproc import MultiProcPlugin
 from .ipython import IPythonPlugin
 from .somaflow import SomaFlowPlugin
 from .pbsgraph import PBSGraphPlugin

nipype/pipeline/plugins/base.py

Lines changed: 3 additions & 6 deletions
@@ -246,7 +246,7 @@ def run(self, graph, config, updatehash=False):
                         notrun.append(self._clean_queue(jobid, graph,
                                                         result=result))
                     else:
-                        self._task_finished_cb(jobid, result)
+                        self._task_finished_cb(jobid)
                         self._remove_node_dirs()
                     self._clear_task(taskid)
                 else:
@@ -415,18 +415,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             else:
                 break

-    def _task_finished_cb(self, jobid, result=None):
+    def _task_finished_cb(self, jobid):
         """ Extract outputs and assign to inputs of dependent tasks

         This is called when a job is completed.
         """
         logger.info('[Job finished] jobname: %s jobid: %d' %
                     (self.procs[jobid]._id, jobid))
         if self._status_callback:
-            if result == None:
-                if self._taskresult.has_key(jobid):
-                    result = self._taskresult[jobid].get()
-            self._status_callback(self.procs[jobid], 'end', result)
+            self._status_callback(self.procs[jobid], 'end')
         # Update job and worker queues
         self.proc_pending[jobid] = False
         # update the job dependency structure
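
With the result argument gone, status callbacks now receive only the node and a status string; any runtime stats are read from node.result instead (see callback_log.py below). A sketch of a conforming callback (the function name is hypothetical):

    def my_status_callback(node, status):
        # status is 'start', 'end', or an error marker; stats, if needed,
        # come from node.result rather than a passed-in result dict
        print('%s -> %s' % (node.name, status))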

nipype/pipeline/plugins/callback_log.py

Lines changed: 15 additions & 22 deletions
@@ -8,7 +8,7 @@
 import logging

 # Log node stats function
-def log_nodes_cb(node, status, result=None):
+def log_nodes_cb(node, status):
     """Function to record node run statistics to a log file as json
     dictionaries
     """
@@ -17,47 +17,40 @@ def log_nodes_cb(node, status):
     logger = logging.getLogger('callback')

     # Check runtime profile stats
-    if result is None:
-        runtime_memory = runtime_seconds = runtime_threads = 'N/A'
-    else:
-        try:
-            runtime_memory = result['runtime_memory']
-        except KeyError:
-            runtime_memory = 'Unknown'
+    if node.result is not None:
         try:
-            runtime_seconds = result['runtime_seconds']
-        except KeyError:
-            runtime_seconds = 'Unknown'
-        try:
-            runtime_threads = result['runtime_threads']
+            runtime = node.result.runtime
+            runtime_memory_gb = runtime.runtime_memory_gb
+            runtime_threads = runtime.runtime_threads
         except:
-            runtime_threads = 'Unknown'
+            runtime_memory_gb = runtime_threads = 'Unkown'
+    else:
+        runtime_memory_gb = runtime_threads = 'N/A'

     # Check status and write to log
     # Start
     if status == 'start':
         message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' +\
                   node._id + '"' + ',"start":' + '"' +str(datetime.datetime.now()) +\
-                  '"' + ',"estimated_memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
-                  + str(node._interface.num_threads) + '}'
+                  '"' + ',"estimated_memory_gb":' + str(node._interface.estimated_memory_gb) + \
+                  ',"num_threads":' + str(node._interface.num_threads) + '}'

         logger.debug(message)
     # End
     elif status == 'end':
         message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
                   node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) + \
-                  '"' + ',"estimated_memory":' + '"'+ str(node._interface.estimated_memory) + '"'+ \
-                  ',"num_threads":' + '"'+ str(node._interface.num_threads) + '"'+ \
+                  '"' + ',"estimated_memory_gb":' + '"'+ str(node._interface.estimated_memory_gb) + \
+                  '"'+ ',"num_threads":' + '"'+ str(node._interface.num_threads) + '"'+ \
                   ',"runtime_threads":' + '"'+ str(runtime_threads) + '"'+ \
-                  ',"runtime_memory":' + '"'+ str(runtime_memory) + '"' + \
-                  ',"runtime_seconds":' + '"'+ str(runtime_seconds) + '"'+ '}'
+                  ',"runtime_memory_gb":' + '"'+ str(runtime_memory_gb) + '"' + '}'

         logger.debug(message)
     # Other
     else:
         message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
                   node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
-                  '"' + ',"estimated_memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
-                  + str(node._interface.num_threads) + ',"error":"True"}'
+                  '"' + ',"estimated_memory_gb":' + str(node._interface.estimated_memory_gb) + \
+                  ',"num_threads":' + str(node._interface.num_threads) + ',"error":"True"}'

         logger.debug(message)
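
The log records are built by string concatenation, which breaks if a node name ever contains a quote. A hypothetical equivalent of the 'start' record using json.dumps instead (field names match the hunk; the node object is a stand-in):

    import datetime
    import json

    def start_message(node):
        # Sketch: same fields as the 'start' branch above, serialized safely
        return json.dumps({'name': node.name,
                           'id': node._id,
                           'start': str(datetime.datetime.now()),
                           'estimated_memory_gb': node._interface.estimated_memory_gb,
                           'num_threads': node._interface.num_threads})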

nipype/pipeline/plugins/multiproc.py

Lines changed: 14 additions & 24 deletions
@@ -41,22 +41,12 @@ def run_node(node, updatehash):
         dictionary containing the node runtime results and stats
     """

-    # Import packages
-    import datetime
-
     # Init variables
     result = dict(result=None, traceback=None)

-    #
+    # Try and execute the node via node.run()
     try:
-        start = datetime.datetime.now()
-        retval = node.run(updatehash=updatehash)
-        run_secs = (datetime.datetime.now() - start).total_seconds()
-        result['result'] = retval
-        result['runtime_seconds'] = run_secs
-        if hasattr(retval.runtime, 'get'):
-            result['runtime_memory'] = retval.runtime.get('runtime_memory')
-            result['runtime_threads'] = retval.runtime.get('runtime_threads')
+        result['result'] = node.run(updatehash=updatehash)
     except:
         etype, eval, etr = sys.exc_info()
         result['traceback'] = format_exception(etype, eval, etr)
@@ -88,7 +78,7 @@ def release_lock(args):
     semaphore_singleton.semaphore.release()


-class ResourceMultiProcPlugin(DistributedPluginBase):
+class MultiProcPlugin(DistributedPluginBase):
     """Execute workflow with multiprocessing, not sending more jobs at once
     than the system can support.

@@ -98,7 +88,7 @@ class MultiProcPlugin(DistributedPluginBase):
     the number of threads and memory of the system is used.

     System consuming nodes should be tagged:
-        memory_consuming_node.interface.estimated_memory = 8 #Gb
+        memory_consuming_node.interface.estimated_memory_gb = 8
         thread_consuming_node.interface.num_threads = 16

     The default number of threads and memory for a node is 1.
@@ -107,12 +97,12 @@ class MultiProcPlugin(DistributedPluginBase):

     - non_daemon : boolean flag to execute as non-daemon processes
     - num_threads: maximum number of threads to be executed in parallel
-    - estimated_memory: maximum memory that can be used at once.
+    - estimated_memory_gb: maximum memory (in GB) that can be used at once.

     """

     def __init__(self, plugin_args=None):
-        super(ResourceMultiProcPlugin, self).__init__(plugin_args=plugin_args)
+        super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
         self._taskresult = {}
         self._taskid = 0
         non_daemon = True
@@ -186,8 +176,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         busy_memory = 0
         busy_processors = 0
         for jobid in jobids:
-            busy_memory+= self.procs[jobid]._interface.estimated_memory
-            busy_processors+= self.procs[jobid]._interface.num_threads
+            busy_memory += self.procs[jobid]._interface.estimated_memory_gb
+            busy_processors += self.procs[jobid]._interface.num_threads

         free_memory = self.memory - busy_memory
         free_processors = self.processors - busy_processors
@@ -201,21 +191,21 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         #sort jobs ready to run first by memory and then by number of threads
         #The most resource consuming jobs run first
         jobids = sorted(jobids,
-                        key=lambda item: (self.procs[item]._interface.estimated_memory,
+                        key=lambda item: (self.procs[item]._interface.estimated_memory_gb,
                                           self.procs[item]._interface.num_threads))

-        logger.debug('Free memory: %d, Free processors: %d',
+        logger.debug('Free memory (GB): %d, Free processors: %d',
                      free_memory, free_processors)


         #while have enough memory and processors for first job
         #submit first job on the list
         for jobid in jobids:
-            logger.debug('Next Job: %d, memory: %d, threads: %d' \
-                         % (jobid, self.procs[jobid]._interface.estimated_memory,
+            logger.debug('Next Job: %d, memory (GB): %d, threads: %d' \
+                         % (jobid, self.procs[jobid]._interface.estimated_memory_gb,
                             self.procs[jobid]._interface.num_threads))

-            if self.procs[jobid]._interface.estimated_memory <= free_memory and \
+            if self.procs[jobid]._interface.estimated_memory_gb <= free_memory and \
                self.procs[jobid]._interface.num_threads <= free_processors:
                 logger.info('Executing: %s ID: %d' %(self.procs[jobid]._id, jobid))
                 executing_now.append(self.procs[jobid])
@@ -236,7 +226,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 self.proc_done[jobid] = True
                 self.proc_pending[jobid] = True

-                free_memory -= self.procs[jobid]._interface.estimated_memory
+                free_memory -= self.procs[jobid]._interface.estimated_memory_gb
                 free_processors -= self.procs[jobid]._interface.num_threads

                 # Send job to task manager and add to pending tasks
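
The scheduling hunks sort pending jobs by estimated memory and thread count and submit greedily while the free budget allows (the in-code comment says the most resource-consuming jobs run first). A self-contained sketch of that stated policy, with job tuples standing in for nodes:

    def schedule(jobs, free_memory_gb, free_processors):
        # jobs: list of (jobid, estimated_memory_gb, num_threads) tuples
        submitted = []
        # Most resource-consuming first, per the comment in the hunk above
        for jobid, mem_gb, threads in sorted(jobs, key=lambda j: (j[1], j[2]),
                                             reverse=True):
            if mem_gb <= free_memory_gb and threads <= free_processors:
                free_memory_gb -= mem_gb
                free_processors -= threads
                submitted.append(jobid)
        return submitted

    print(schedule([(0, 4, 2), (1, 2, 1), (2, 8, 4)], 8, 4))  # -> [2]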
