Skip to content

Commit c83c407

Browse files
committed
collect resource_monitor info after run
1 parent 04adabd commit c83c407

File tree

3 files changed

+58
-2
lines changed

3 files changed

+58
-2
lines changed

nipype/interfaces/base.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,10 +1135,17 @@ def run(self, **inputs):
11351135
# Read .prof file in and set runtime values
11361136
vals = np.loadtxt(mon_fname, delimiter=',')
11371137
if vals.tolist():
1138-
_, mem_peak_gb, nthreads = np.atleast_2d(vals).max(0).astype(float).tolist()
1139-
runtime.mem_peak_gb = mem_peak_gb / 1024
1138+
vals = np.atleast_2d(vals)
1139+
_, mem_peak_mb, nthreads = vals.max(0).astype(float).tolist()
1140+
runtime.mem_peak_gb = mem_peak_mb / 1024
11401141
runtime.nthreads_max = int(nthreads)
11411142

1143+
runtime.prof_dict = {
1144+
'time': vals[:, 0].tolist(),
1145+
'mem_gb': (vals[:, 1] / 1024).tolist(),
1146+
'cpus': vals[:, 2].astype(int).tolist(),
1147+
}
1148+
11421149
return results
11431150

11441151
def _list_outputs(self):

nipype/pipeline/engine/utils.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,51 @@ def write_workflow_prov(graph, filename=None, format='all'):
12961296
return ps.g
12971297

12981298

1299+
def write_workflow_resources(graph, filename=None):
1300+
import simplejson as json
1301+
if not filename:
1302+
filename = os.path.join(os.getcwd(), 'resource_monitor.json')
1303+
1304+
big_dict = {
1305+
'time': [],
1306+
'name': [],
1307+
'interface': [],
1308+
'mem_gb': [],
1309+
'cpus': [],
1310+
'mapnode': [],
1311+
'params': [],
1312+
}
1313+
1314+
for idx, node in enumerate(graph.nodes()):
1315+
nodename = node.fullname
1316+
classname = node._interface.__class__.__name__
1317+
1318+
params = ''
1319+
if node.parameterization:
1320+
params = '_'.join(['{}'.format(p)
1321+
for p in node.parameterization])
1322+
1323+
rt_list = node.result.runtime
1324+
if not isinstance(rt_list, list):
1325+
rt_list = [rt_list]
1326+
1327+
for subidx, runtime in enumerate(rt_list):
1328+
nsamples = len(runtime.prof_dict['time'])
1329+
1330+
for key in ['time', 'mem_gb', 'cpus']:
1331+
big_dict[key] += runtime.prof_dict[key]
1332+
1333+
big_dict['interface'] += [classname] * nsamples
1334+
big_dict['name'] += [nodename] * nsamples
1335+
big_dict['mapnode'] += [subidx] * nsamples
1336+
big_dict['params'] += [params] * nsamples
1337+
1338+
with open(filename, 'w') as rsf:
1339+
json.dump(big_dict, rsf)
1340+
1341+
return filename
1342+
1343+
12991344
def topological_sort(graph, depth_first=False):
13001345
"""Returns a depth first sorted order if depth_first is True
13011346
"""

nipype/pipeline/engine/workflows.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
write_rst_list, to_str)
5252
from .utils import (generate_expanded_graph, modify_paths,
5353
export_graph, make_output_dir, write_workflow_prov,
54+
write_workflow_resources,
5455
clean_working_directory, format_dot, topological_sort,
5556
get_print_name, merge_dict, evaluate_connect_function,
5657
_write_inputs, format_node)
@@ -593,6 +594,9 @@ def run(self, plugin=None, plugin_args=None, updatehash=False):
593594
'workflow_provenance_%s' % datestr)
594595
logger.info('Provenance file prefix: %s' % prov_base)
595596
write_workflow_prov(execgraph, prov_base, format='all')
597+
598+
if str2bool(self.config['execution'].get('resource_monitor', 'false')):
599+
write_workflow_resources(execgraph)
596600
return execgraph
597601

598602
# PRIVATE API AND FUNCTIONS

0 commit comments

Comments
 (0)