@@ -28,7 +28,7 @@ class UseResourcesInputSpec(CommandLineInputSpec):
28
28
# Init attributes
29
29
num_gb = traits .Float (desc = 'Number of GB of RAM to use' ,
30
30
argstr = '-g %f' )
31
- num_procs = traits .Int (desc = 'Number of processors to use' ,
31
+ num_threads = traits .Int (desc = 'Number of threads to use' ,
32
32
argstr = '-p %d' )
33
33
34
34
@@ -52,8 +52,8 @@ class UseResources(CommandLine):
52
52
_cmd = exec_path
53
53
54
54
55
- # Spin multiple processors
56
- def use_resources (num_procs , num_gb ):
55
+ # Spin multiple threads
56
+ def use_resources (num_threads , num_gb ):
57
57
'''
58
58
Function to execute multiple use_gb_ram functions in parallel
59
59
'''
@@ -82,19 +82,19 @@ def _use_gb_ram(num_gb):
82
82
# Init variables
83
83
num_gb = float (num_gb )
84
84
85
- # Build proc list
86
- proc_list = []
87
- for idx in range (num_procs ):
88
- proc = Thread (target = _use_gb_ram , args = (num_gb / num_procs ,), name = str (idx ))
89
- proc_list .append (proc )
85
+ # Build thread list
86
+ thread_list = []
87
+ for idx in range (num_threads ):
88
+ thread = Thread (target = _use_gb_ram , args = (num_gb / num_threads ,), name = str (idx ))
89
+ thread_list .append (thread )
90
90
91
91
# Run multi-threaded
92
- print 'Using %.3f GB of memory over %d sub-threads...' % (num_gb , num_procs )
93
- for idx , proc in enumerate (proc_list ):
94
- proc .start ()
92
+ print 'Using %.3f GB of memory over %d sub-threads...' % (num_gb , num_threads )
93
+ for idx , thread in enumerate (thread_list ):
94
+ thread .start ()
95
95
96
- for proc in proc_list :
97
- proc .join ()
96
+ for thread in thread_list :
97
+ thread .join ()
98
98
99
99
100
100
# Test case for the run function
@@ -134,9 +134,9 @@ def setUp(self):
134
134
self .mem_err_percent = 5
135
135
136
136
# ! Only used for benchmarking the profiler over a range of
137
- # ! processors and RAM usage
138
- # ! Requires a LOT of RAM and PROCS to be tested
139
- def _collect_range_runtime_stats (self ):
137
+ # ! RAM usage
138
+ # ! Requires a LOT of RAM to be tested
139
+ def _collect_range_runtime_stats (self , num_threads ):
140
140
'''
141
141
Function to collect a range of runtime stats
142
142
'''
@@ -147,54 +147,58 @@ def _collect_range_runtime_stats(self):
147
147
import pandas as pd
148
148
149
149
# Init variables
150
- num_procs_range = 8
151
150
ram_gb_range = 10.0
152
151
ram_gb_step = 0.25
153
152
dict_list = []
154
153
155
154
# Iterate through all combos
156
- for num_procs in np .arange (1 , num_procs_range + 1 , 1 ):
157
- for num_gb in np .arange (0.25 , ram_gb_range + ram_gb_step , ram_gb_step ):
158
- # Cmd-level
159
- cmd_fin_str = self ._run_cmdline_workflow (num_gb , num_procs )
160
- cmd_node_stats = json .loads (cmd_fin_str )
161
- cmd_runtime_procs = int (cmd_node_stats ['runtime_threads' ])
162
- cmd_runtime_gb = float (cmd_node_stats ['runtime_memory_gb' ])
163
-
164
- # Func-level
165
- func_fin_str = self ._run_function_workflow (num_gb , num_procs )
166
- func_node_stats = json .loads (func_fin_str )
167
- func_runtime_procs = int (func_node_stats ['runtime_threads' ])
168
- func_runtime_gb = float (func_node_stats ['runtime_memory_gb' ])
169
-
170
- # Calc errors
171
- cmd_procs_err = cmd_runtime_procs - num_procs
172
- cmd_gb_err = cmd_runtime_gb - num_gb
173
- func_procs_err = func_runtime_procs - num_procs
174
- func_gb_err = func_runtime_gb - num_gb
175
-
176
- # Node dictionary
177
- results_dict = {'input_procs' : num_procs ,
178
- 'input_gb' : num_gb ,
179
- 'cmd_runtime_procs' : cmd_runtime_procs ,
180
- 'cmd_runtime_gb' : cmd_runtime_gb ,
181
- 'func_runtime_procs' : func_runtime_procs ,
182
- 'func_runtime_gb' : func_runtime_gb ,
183
- 'cmd_procs_err' : cmd_procs_err ,
184
- 'cmd_gb_err' : cmd_gb_err ,
185
- 'func_procs_err' : func_procs_err ,
186
- 'func_gb_err' : func_gb_err }
187
- # Append to list
188
- dict_list .append (results_dict )
155
+ for num_gb in np .arange (0.25 , ram_gb_range + ram_gb_step , ram_gb_step ):
156
+ # Cmd-level
157
+ cmd_fin_str = self ._run_cmdline_workflow (num_gb , num_threads )
158
+ cmd_node_stats = json .loads (cmd_fin_str )
159
+ cmd_runtime_threads = int (cmd_node_stats ['runtime_threads' ])
160
+ cmd_runtime_gb = float (cmd_node_stats ['runtime_memory_gb' ])
161
+
162
+ # Func-level
163
+ func_fin_str = self ._run_function_workflow (num_gb , num_threads )
164
+ func_node_stats = json .loads (func_fin_str )
165
+ func_runtime_threads = int (func_node_stats ['runtime_threads' ])
166
+ func_runtime_gb = float (func_node_stats ['runtime_memory_gb' ])
167
+
168
+ # Calc errors
169
+ cmd_threads_err = cmd_threads_threads - num_threads
170
+ cmd_gb_err = cmd_runtime_gb - num_gb
171
+ func_threads_err = func_runtime_threads - num_threads
172
+ func_gb_err = func_runtime_gb - num_gb
173
+
174
+ # Node dictionary
175
+ results_dict = {'input_threads' : num_threads ,
176
+ 'input_gb' : num_gb ,
177
+ 'cmd_runtime_threads' : cmd_runtime_threads ,
178
+ 'cmd_runtime_gb' : cmd_runtime_gb ,
179
+ 'func_runtime_threads' : func_runtime_threads ,
180
+ 'func_runtime_gb' : func_runtime_gb ,
181
+ 'cmd_thread_err' : cmd_thread_err ,
182
+ 'cmd_gb_err' : cmd_gb_err ,
183
+ 'func_thread_err' : func_thread_err ,
184
+ 'func_gb_err' : func_gb_err }
185
+ # Append to list
186
+ dict_list .append (results_dict )
189
187
190
188
# Create dataframe
191
189
runtime_results_df = pd .DataFrame (dict_list )
192
190
193
191
# Return dataframe
194
192
return runtime_results_df
195
193
194
+ def tiest_collect_range (self ):
195
+ num_threads = 1
196
+ df = self ._collect_range_suntime_stats (num_threads )
197
+
198
+ df .to_csv ('/root/%d_thread_df.csv' )
199
+
196
200
# Test node
197
- def _run_cmdline_workflow (self , num_gb , num_procs ):
201
+ def _run_cmdline_workflow (self , num_gb , num_threads ):
198
202
'''
199
203
Function to run the use_resources cmdline script in a nipype workflow
200
204
and return the runtime stats recorded by the profiler
@@ -237,22 +241,22 @@ def _run_cmdline_workflow(self, num_gb, num_procs):
237
241
238
242
# Input node
239
243
input_node = pe .Node (util .IdentityInterface (fields = ['num_gb' ,
240
- 'num_procs ' ]),
244
+ 'num_threads ' ]),
241
245
name = 'input_node' )
242
246
input_node .inputs .num_gb = num_gb
243
- input_node .inputs .num_procs = num_procs
247
+ input_node .inputs .num_threads = num_threads
244
248
245
249
# Resources used node
246
250
resource_node = pe .Node (UseResources (), name = 'resource_node' )
247
251
resource_node .interface .estimated_memory_gb = num_gb
248
- resource_node .interface .num_threads = num_procs
252
+ resource_node .interface .num_threads = num_threads
249
253
250
254
# Connect workflow
251
255
wf .connect (input_node , 'num_gb' , resource_node , 'num_gb' )
252
- wf .connect (input_node , 'num_procs ' , resource_node , 'num_procs ' )
256
+ wf .connect (input_node , 'num_threads ' , resource_node , 'num_threads ' )
253
257
254
258
# Run workflow
255
- plugin_args = {'n_procs' : num_procs ,
259
+ plugin_args = {'n_procs' : num_threads ,
256
260
'memory' : num_gb ,
257
261
'status_callback' : log_nodes_cb }
258
262
wf .run (plugin = 'MultiProc' , plugin_args = plugin_args )
@@ -267,7 +271,7 @@ def _run_cmdline_workflow(self, num_gb, num_procs):
267
271
return finish_str
268
272
269
273
# Test node
270
- def _run_function_workflow (self , num_gb , num_procs ):
274
+ def _run_function_workflow (self , num_gb , num_threads ):
271
275
'''
272
276
Function to run the use_resources() function in a nipype workflow
273
277
and return the runtime stats recorded by the profiler
@@ -310,26 +314,26 @@ def _run_function_workflow(self, num_gb, num_procs):
310
314
311
315
# Input node
312
316
input_node = pe .Node (util .IdentityInterface (fields = ['num_gb' ,
313
- 'num_procs ' ]),
317
+ 'num_threads ' ]),
314
318
name = 'input_node' )
315
319
input_node .inputs .num_gb = num_gb
316
- input_node .inputs .num_procs = num_procs
320
+ input_node .inputs .num_threads = num_threads
317
321
318
322
# Resources used node
319
- resource_node = pe .Node (util .Function (input_names = ['num_procs ' ,
323
+ resource_node = pe .Node (util .Function (input_names = ['num_threads ' ,
320
324
'num_gb' ],
321
325
output_names = [],
322
326
function = use_resources ),
323
327
name = 'resource_node' )
324
328
resource_node .interface .estimated_memory_gb = num_gb
325
- resource_node .interface .num_threads = num_procs
329
+ resource_node .interface .num_threads = num_threads
326
330
327
331
# Connect workflow
328
332
wf .connect (input_node , 'num_gb' , resource_node , 'num_gb' )
329
- wf .connect (input_node , 'num_procs ' , resource_node , 'num_procs ' )
333
+ wf .connect (input_node , 'num_threads ' , resource_node , 'num_threads ' )
330
334
331
335
# Run workflow
332
- plugin_args = {'n_procs' : num_procs ,
336
+ plugin_args = {'n_procs' : num_threads ,
333
337
'memory' : num_gb ,
334
338
'status_callback' : log_nodes_cb }
335
339
wf .run (plugin = 'MultiProc' , plugin_args = plugin_args )
@@ -378,12 +382,12 @@ def test_cmdline_profiling(self):
378
382
# Error message formatting
379
383
mem_err = 'Input memory: %f is not within %.1f%% of runtime ' \
380
384
'memory: %f' % (num_gb , self .mem_err_percent , runtime_gb )
381
- procs_err = 'Input threads: %d is not equal to runtime threads: %d' \
385
+ threads_err = 'Input threads: %d is not equal to runtime threads: %d' \
382
386
% (expected_runtime_threads , runtime_threads )
383
387
384
388
# Assert runtime stats are what was input
385
389
self .assertLessEqual (runtime_gb_err , allowed_gb_err , msg = mem_err )
386
- self .assertEqual (expected_runtime_threads , runtime_threads , msg = procs_err )
390
+ self .assertEqual (expected_runtime_threads , runtime_threads , msg = threads_err )
387
391
388
392
# Test resources were used as expected
389
393
@unittest .skipIf (run_profiler == False , skip_profile_msg )
@@ -420,12 +424,12 @@ def test_function_profiling(self):
420
424
# Error message formatting
421
425
mem_err = 'Input memory: %f is not within %.1f%% of runtime ' \
422
426
'memory: %f' % (num_gb , self .mem_err_percent , runtime_gb )
423
- procs_err = 'Input procs : %d is not equal to runtime procs : %d' \
427
+ threads_err = 'Input threads : %d is not equal to runtime threads : %d' \
424
428
% (expected_runtime_threads , runtime_threads )
425
429
426
430
# Assert runtime stats are what was input
427
431
self .assertLessEqual (runtime_gb_err , allowed_gb_err , msg = mem_err )
428
- self .assertEqual (expected_runtime_threads , runtime_threads , msg = procs_err )
432
+ self .assertEqual (expected_runtime_threads , runtime_threads , msg = threads_err )
429
433
430
434
431
435
# Command-line run-able unittest module
0 commit comments