Skip to content

Commit be1ec62

Browse files
committed
Added global watcher
1 parent 7a8383b commit be1ec62

File tree

4 files changed

+52
-15
lines changed

4 files changed

+52
-15
lines changed

nipype/interfaces/base.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,19 +1222,32 @@ def _get_num_threads(proc):
12221222

12231223
# Import packages
12241224
import psutil
1225+
import logging
12251226

12261227
# Init variables
1227-
num_threads = proc.num_threads()
1228-
alive_procs = 0
1228+
cb_log = logging.getLogger('callback')
1229+
cb_log.propogate = False
1230+
cb_log.debug('proc pid: %d, parent pid: %d, name: %s, exe: %s, cmdline: %s, status: %s, num_threads: %d' \
1231+
% (proc.pid, proc.ppid(), proc.name(), proc.exe(), proc.cmdline(), proc.status(), proc.num_threads()))
1232+
if proc.status() == psutil.STATUS_RUNNING:
1233+
num_threads = proc.num_threads()
1234+
else:
1235+
num_threads = 0
1236+
child_threads = 0
12291237
# Iterate through child processes and get number of their threads
12301238
try:
1231-
#num_children = len(proc.children())
12321239
for child in proc.children(recursive=True):
12331240
if child.status() == psutil.STATUS_RUNNING:
1234-
alive_procs += 1
1235-
num_threads += max(alive_procs, child.num_threads()) #child.num_threads()
1241+
# If leaf child process
1242+
if len(child.children()) == 0:
1243+
child_threads += child.num_threads()
1244+
#num_threads = max(num_threads, child.num_threads()) #child.num_threads()
1245+
cb_log.debug('child pid: %d, parent pid: %d, name: %s, exe: %s, cmdline: %s, status: %s, num_threads: %d' \
1246+
% (proc.pid, proc.ppid(), proc.name(), proc.exe(), proc.cmdline(), proc.status(), child.num_threads()))
1247+
12361248
#num_threads = max(num_threads, num_children,
12371249
# child.num_threads(), len(child.children()))
1250+
num_threads = max(child_threads, num_threads)
12381251
except psutil.NoSuchProcess:
12391252
pass
12401253

@@ -1380,8 +1393,8 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
13801393
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
13811394

13821395
# Init variables for memory profiling
1383-
mem_mb = -1
1384-
num_threads = -1
1396+
mem_mb = 0
1397+
num_threads = 0
13851398
interval = .5
13861399

13871400
if output == 'stream':
@@ -1405,7 +1418,7 @@ def _process(drain=0):
14051418
get_max_resources_used(proc.pid, mem_mb, num_threads)
14061419
proc.poll()
14071420
_process()
1408-
#time.sleep(interval)
1421+
time.sleep(interval)
14091422
_process(drain=1)
14101423

14111424
# collect results, merge and return
@@ -1424,7 +1437,7 @@ def _process(drain=0):
14241437
mem_mb, num_threads = \
14251438
get_max_resources_used(proc.pid, mem_mb, num_threads)
14261439
proc.poll()
1427-
#time.sleep(interval)
1440+
time.sleep(interval)
14281441
stdout, stderr = proc.communicate()
14291442
if stdout and isinstance(stdout, bytes):
14301443
try:
@@ -1446,7 +1459,7 @@ def _process(drain=0):
14461459
mem_mb, num_threads = \
14471460
get_max_resources_used(proc.pid, mem_mb, num_threads)
14481461
proc.poll()
1449-
#time.sleep(interval)
1462+
time.sleep(interval)
14501463
ret_code = proc.wait()
14511464
stderr.flush()
14521465
stdout.flush()
@@ -1459,7 +1472,7 @@ def _process(drain=0):
14591472
mem_mb, num_threads = \
14601473
get_max_resources_used(proc.pid, mem_mb, num_threads)
14611474
proc.poll()
1462-
#time.sleep(interval)
1475+
time.sleep(interval)
14631476
proc.communicate()
14641477
result['stdout'] = []
14651478
result['stderr'] = []

nipype/interfaces/utility.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,8 @@ def _function_handle_wrapper(queue, **kwargs):
484484
args=(queue,), kwargs=args)
485485

486486
# Init memory and threads before profiling
487-
mem_mb = -1
488-
num_threads = -1
487+
mem_mb = 0
488+
num_threads = 0
489489

490490
# Start process and profile while it's alive
491491
proc.start()

nipype/pipeline/plugins/base.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,22 @@ def __init__(self, plugin_args=None):
223223
def run(self, graph, config, updatehash=False):
224224
"""Executes a pre-defined pipeline using distributed approaches
225225
"""
226+
# Global watcher inits
227+
from nipype.interfaces.base import get_max_resources_used
228+
gpid = os.getpid()
229+
num_threads = 0
230+
memory_mb = 0
231+
# Init logger
232+
import logging as lg
233+
gw_log = lg.getLogger('global_watcher')
234+
gw_log.setLevel(lg.INFO)
235+
formatter = lg.Formatter('%(asctime)s : %(message)s')
236+
237+
# Write logs to file
238+
file_handler = lg.FileHandler('/home/dclark/work-dir/gw.log')
239+
file_handler.setFormatter(formatter)
240+
gw_log.addHandler(file_handler)
241+
226242
logger.info("Running in parallel.")
227243
self._config = config
228244
# Generate appropriate structures for worker-manager model
@@ -235,6 +251,11 @@ def run(self, graph, config, updatehash=False):
235251
notrun = []
236252
while np.any(self.proc_done == False) | \
237253
np.any(self.proc_pending == True):
254+
255+
mem_mb, num_thr = get_max_resources_used(gpid, memory_mb, num_threads)
256+
memory_mb = max(mem_mb, memory_mb)
257+
num_threads = max(num_thr, num_threads)
258+
gw_log.info('Memory GB usage: %.4f, Threads usage: %d' % (memory_mb/1024.0, num_threads))
238259
toappend = []
239260
# trigger callbacks for any pending results
240261
while self.pending_tasks:
@@ -266,6 +287,7 @@ def run(self, graph, config, updatehash=False):
266287
else:
267288
logger.debug('Not submitting')
268289
self._wait()
290+
269291
self._remove_node_dirs()
270292
report_nodes_not_run(notrun)
271293

nipype/utils/draw_gantt_chart.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ def calculate_resource_timeseries(events, resource):
183183

184184
# Iterate through the events
185185
for tdelta, event in sorted(events.items()):
186+
if tdelta > 70.7:
187+
print 'hi'
186188
if event['event'] == "start":
187189
if resource in event and event[resource] != 'Unkown':
188190
all_res += float(event[resource])
@@ -520,8 +522,8 @@ def generate_gantt_chart(logfile, cores, minute_scale=10,
520522
close_header = '''
521523
</div>
522524
<div style="display:inline-block;margin-left:60px;vertical-align: top;">
523-
<p><span><div class="label" style="background-color:#03969D;"></div> Estimated Resource</span></p>
524-
<p><span><div class="label" style="background-color:#90BBD7;"></div> Actual Resource</span></p>
525+
<p><span><div class="label" style="background-color:#90BBD7;"></div> Estimated Resource</span></p>
526+
<p><span><div class="label" style="background-color:#03969D;"></div> Actual Resource</span></p>
525527
<p><span><div class="label" style="background-color:#f00;"></div> Failed Node</span></p>
526528
</div>
527529
'''

0 commit comments

Comments
 (0)