Skip to content

Commit de3b298

Browse files
committed
Modified thread-monitoring logic and ensured unit tests pass
1 parent 0189dd8 commit de3b298

File tree

5 files changed

+45
-12
lines changed

5 files changed

+45
-12
lines changed

doc/users/resource_sched_profiler.rst

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,12 @@ Here it can be seen that the number of threads was underestimated while the
117117
amount of memory needed was overestimated. The next time this workflow is run
118118
the user can change the node interface ``num_threads`` and
119119
``estimated_memory_gb`` parameters to reflect this for a higher pipeline
120-
throughput.
120+
throughput. Note, sometimes the "runtime_threads" value is higher than expected,
121+
particularly for multi-threaded applications. Tools can implement
122+
multi-threading in different ways under-the-hood; the profiler merely traverses
123+
the process tree to return all running threads associated with that process,
124+
some of which may include active thread-monitoring daemons or transient
125+
processes.
121126

122127

123128
Visualizing Pipeline Resources

nipype/interfaces/base.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,16 +1216,34 @@ def _get_num_threads(proc):
12161216
# If process is running
12171217
if proc.status() == psutil.STATUS_RUNNING:
12181218
num_threads = proc.num_threads()
1219+
elif proc.num_threads() > 1:
1220+
tprocs = [psutil.Process(thr.id) for thr in proc.threads()]
1221+
alive_tprocs = [tproc for tproc in tprocs if tproc.status() == psutil.STATUS_RUNNING]
1222+
num_threads = len(alive_tprocs)
12191223
else:
1220-
num_threads = 0
1224+
num_threads = 1
12211225

12221226
# Try-block for errors
12231227
try:
12241228
child_threads = 0
12251229
# Iterate through child processes and get number of their threads
12261230
for child in proc.children(recursive=True):
1227-
if child.status() == psutil.STATUS_RUNNING and len(child.children()) == 0:
1228-
child_threads += child.num_threads()
1231+
# Leaf process
1232+
if len(child.children()) == 0:
1233+
# If process is running, get its number of threads
1234+
if child.status() == psutil.STATUS_RUNNING:
1235+
child_thr = child.num_threads()
1236+
# If its not necessarily running, but still multi-threaded
1237+
elif child.num_threads() > 1:
1238+
# Cast each thread as a process and check for only running
1239+
tprocs = [psutil.Process(thr.id) for thr in child.threads()]
1240+
alive_tprocs = [tproc for tproc in tprocs if tproc.status() == psutil.STATUS_RUNNING]
1241+
child_thr = len(alive_tprocs)
1242+
# Otherwise, no threads are running
1243+
else:
1244+
child_thr = 0
1245+
# Increment child threads
1246+
child_threads += child_thr
12291247
# Catch any NoSuchProcess errors
12301248
except psutil.NoSuchProcess:
12311249
pass

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,21 @@ def _use_gb_ram(num_gb):
7878

7979
# Import packages
8080
from threading import Thread
81+
from multiprocessing import Process
8182

8283
# Init variables
8384
num_gb = float(num_gb)
8485

8586
# Build thread list
8687
thread_list = []
8788
for idx in range(num_threads):
88-
thread = Thread(target=_use_gb_ram, args=(num_gb/num_threads,), name=str(idx))
89+
thread = Thread(target=_use_gb_ram, args=(num_gb/num_threads,),
90+
name=str(idx))
8991
thread_list.append(thread)
9092

9193
# Run multi-threaded
92-
print 'Using %.3f GB of memory over %d sub-threads...' % (num_gb, num_threads)
94+
print('Using %.3f GB of memory over %d sub-threads...' % \
95+
(num_gb, num_threads))
9396
for idx, thread in enumerate(thread_list):
9497
thread.start()
9598

@@ -127,11 +130,11 @@ def setUp(self):
127130

128131
# Init parameters
129132
# Input RAM GB to occupy
130-
self.num_gb = 2
133+
self.num_gb = 4
131134
# Input number of sub-threads (not including parent threads)
132135
self.num_threads = 4
133136
# Acceptable percent error for memory profiled against input
134-
self.mem_err_percent = 5
137+
self.mem_err_percent = 10
135138

136139
# ! Only used for benchmarking the profiler over a range of
137140
# ! RAM usage and number of threads

nipype/interfaces/tests/use_resources

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ if __name__ == '__main__':
3535
# Import packages
3636
import argparse
3737
from threading import Thread
38+
from multiprocessing import Process
3839

3940
# Init argparser
4041
parser = argparse.ArgumentParser(description=__doc__)
@@ -55,10 +56,11 @@ if __name__ == '__main__':
5556
# Build thread list
5657
thread_list = []
5758
for idx in range(num_threads):
58-
thread_list.append(Thread(target=use_gb_ram, args=(num_gb/num_threads,)))
59+
thread_list.append(Process(target=use_gb_ram, args=(num_gb/num_threads,)))
5960

6061
# Run multi-threaded
61-
print 'Using %.3f GB of memory over %d sub-threads...' % (num_gb, num_threads)
62+
print('Using %.3f GB of memory over %d sub-threads...' % \
63+
(num_gb, num_threads))
6264
for thread in thread_list:
6365
thread.start()
6466

nipype/utils/draw_gantt_chart.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@
99
from dateutil import parser
1010
import datetime
1111
import random
12-
import pandas as pd
1312
from collections import OrderedDict
14-
13+
# Pandas
14+
try:
15+
import pandas as pd
16+
except ImportError:
17+
print('Pandas not found; in order for full functionality of this module '\
18+
'install the pandas package')
19+
pass
1520

1621
def create_event_dict(start_time, nodes_list):
1722
'''

0 commit comments

Comments
 (0)