Skip to content

Commit 371a10b

Browse files
committed
Merge remote-tracking branch 'upstream/master' into ref/Node-cleanup
2 parents a0b2b04 + 373bddd commit 371a10b

File tree

4 files changed

+36
-20
lines changed

4 files changed

+36
-20
lines changed

nipype/interfaces/base/core.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,9 @@ def run(self, **inputs):
530530

531531
runtime.prof_dict = {
532532
'time': vals[:, 0].tolist(),
533-
'mem_gb': (vals[:, 1] / 1024).tolist(),
534-
'cpus': vals[:, 2].tolist(),
533+
'cpus': vals[:, 1].tolist(),
534+
'rss_GiB': (vals[:, 2] / 1024).tolist(),
535+
'vms_GiB': (vals[:, 3] / 1024).tolist(),
535536
}
536537

537538
return results

nipype/pipeline/engine/utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1538,7 +1538,8 @@ def write_workflow_resources(graph, filename=None, append=None):
15381538
'time': [],
15391539
'name': [],
15401540
'interface': [],
1541-
'mem_gb': [],
1541+
'rss_GiB': [],
1542+
'vms_GiB': [],
15421543
'cpus': [],
15431544
'mapnode': [],
15441545
'params': [],
@@ -1579,7 +1580,7 @@ def write_workflow_resources(graph, filename=None, append=None):
15791580
'(mapflow %d/%d).', nodename, subidx + 1, len(rt_list))
15801581
continue
15811582

1582-
for key in ['time', 'mem_gb', 'cpus']:
1583+
for key in ['time', 'cpus', 'rss_GiB', 'vms_GiB']:
15831584
big_dict[key] += runtime.prof_dict[key]
15841585

15851586
big_dict['interface'] += [classname] * nsamples

nipype/pipeline/plugins/base.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ def run(self, graph, config, updatehash=False):
128128
old_progress_stats = None
129129
old_presub_stats = None
130130
while not np.all(self.proc_done) or np.any(self.proc_pending):
131-
# Check to see if a job is available (jobs without dependencies not run)
132-
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
133-
jobs_ready = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
131+
loop_start = time()
132+
# Check if a job is available (jobs with all dependencies run)
133+
# https://github.com/nipy/nipype/pull/2200#discussion_r141605722
134+
jobs_ready = np.nonzero(~self.proc_done &
135+
(self.depidx.sum(0) == 0))[1]
134136

135137
progress_stats = (len(self.proc_done),
136138
np.sum(self.proc_done ^ self.proc_pending),
@@ -164,7 +166,8 @@ def run(self, graph, config, updatehash=False):
164166
self._remove_node_dirs()
165167
self._clear_task(taskid)
166168
else:
167-
assert self.proc_done[jobid] and self.proc_pending[jobid]
169+
assert self.proc_done[jobid] and \
170+
self.proc_pending[jobid]
168171
toappend.insert(0, (taskid, jobid))
169172

170173
if toappend:
@@ -183,7 +186,8 @@ def run(self, graph, config, updatehash=False):
183186
elif display_stats:
184187
logger.debug('Not submitting (max jobs reached)')
185188

186-
sleep(poll_sleep_secs)
189+
sleep_til = loop_start + poll_sleep_secs
190+
sleep(max(0, sleep_til - time()))
187191

188192
self._remove_node_dirs()
189193
report_nodes_not_run(notrun)
@@ -271,8 +275,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
271275
if (num_jobs >= self.max_jobs) or (slots == 0):
272276
break
273277

274-
# Check to see if a job is available (jobs without dependencies not run)
275-
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
278+
# Check if a job is available (jobs with all dependencies run)
279+
# https://github.com/nipy/nipype/pull/2200#discussion_r141605722
276280
jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
277281

278282
if len(jobids) > 0:
@@ -325,7 +329,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
325329
break
326330

327331
def _local_hash_check(self, jobid, graph):
328-
if not str2bool(self.procs[jobid].config['execution']['local_hash_check']):
332+
if not str2bool(self.procs[jobid].config['execution'][
333+
'local_hash_check']):
329334
return False
330335

331336
logger.debug('Checking hash (%d) locally', jobid)
@@ -397,8 +402,8 @@ def _remove_node_dirs(self):
397402
"""Removes directories whose outputs have already been used up
398403
"""
399404
if str2bool(self._config['execution']['remove_node_directories']):
400-
for idx in np.nonzero(
401-
(self.refidx.sum(axis=1) == 0).__array__())[0]:
405+
indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]
406+
for idx in indices:
402407
if idx in self.mapnodesubids:
403408
continue
404409
if self.proc_done[idx] and (not self.proc_pending[idx]):
@@ -513,7 +518,8 @@ class GraphPluginBase(PluginBase):
513518

514519
def __init__(self, plugin_args=None):
515520
if plugin_args and plugin_args.get('status_callback'):
516-
logger.warning('status_callback not supported for Graph submission plugins')
521+
logger.warning('status_callback not supported for Graph submission'
522+
' plugins')
517523
super(GraphPluginBase, self).__init__(plugin_args=plugin_args)
518524

519525
def run(self, graph, config, updatehash=False):

nipype/utils/profiler.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,14 @@ def stop(self):
6767

6868
def _sample(self, cpu_interval=None):
6969
cpu = 0.0
70-
mem = 0.0
70+
rss = 0.0
71+
vms = 0.0
7172
try:
7273
with self._process.oneshot():
7374
cpu += self._process.cpu_percent(interval=cpu_interval)
74-
mem += self._process.memory_info().rss
75+
mem_info = self._process.memory_info()
76+
rss += mem_info.rss
77+
vms += mem_info.vms
7578
except psutil.NoSuchProcess:
7679
pass
7780

@@ -85,19 +88,24 @@ def _sample(self, cpu_interval=None):
8588
try:
8689
with child.oneshot():
8790
cpu += child.cpu_percent()
88-
mem += child.memory_info().rss
91+
mem_info = child.memory_info()
92+
rss += mem_info.rss
93+
vms += mem_info.vms
8994
except psutil.NoSuchProcess:
9095
pass
9196

92-
print('%f,%f,%f' % (time(), (mem / _MB), cpu),
97+
print('%f,%f,%f,%f' % (time(), cpu, rss / _MB, vms / _MB),
9398
file=self._logfile)
9499
self._logfile.flush()
95100

96101
def run(self):
97102
"""Core monitoring function, called by start()"""
103+
start_time = time()
104+
wait_til = start_time
98105
while not self._event.is_set():
99106
self._sample()
100-
self._event.wait(self._freq)
107+
wait_til += self._freq
108+
self._event.wait(max(0, wait_til - time()))
101109

102110

103111
# Log node stats function

0 commit comments

Comments
 (0)