Skip to content

Commit a2f2a96

Browse files
authored
Merge pull request #204 from anilbey/n_processes
Introducing number of processes argument
2 parents cdec8cc + d4ebf55 commit a2f2a96

File tree

11 files changed

+147
-48
lines changed

11 files changed

+147
-48
lines changed

bluepymm/main.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,16 @@ def run(arg_list):
5151

5252
if args.action == "prepare":
5353
prepare_combos.prepare_combos(conf_filename=args.conf_filename,
54-
continu=args.continu)
54+
continu=args.continu,
55+
n_processes=args.n_processes)
5556
elif args.action == "run":
5657
run_combos.run_combos(conf_filename=args.conf_filename,
5758
ipyp=args.ipyp,
58-
ipyp_profile=args.ipyp_profile)
59+
ipyp_profile=args.ipyp_profile,
60+
n_processes=args.n_processes)
5961
elif args.action == "select":
60-
select_combos.select_combos(conf_filename=args.conf_filename)
62+
select_combos.select_combos(conf_filename=args.conf_filename,
63+
n_processes=args.n_processes)
6164
elif args.action == "validate":
6265
validate_output.validate_output(conf_filename=args.conf_filename)
6366

bluepymm/prepare_combos/main.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from . import create_mm_sqlite
3030

3131

32-
def prepare_emodels(conf_dict, continu, scores_db_path):
32+
def prepare_emodels(conf_dict, continu, scores_db_path, n_processes):
3333
"""Prepare emodels"""
3434

3535
tmp_dir = conf_dict['tmp_dir']
@@ -52,7 +52,7 @@ def prepare_emodels(conf_dict, continu, scores_db_path):
5252
# Clone the emodels repo and prepare the dirs for all the emodels
5353
emodel_dirs = prepare_dirs.prepare_emodel_dirs(
5454
final_dict, emodel_etype_map, emodels_dir, opt_dir, emodels_hoc_dir,
55-
emodels_in_repo, continu=continu)
55+
emodels_in_repo, continu=continu, n_processes=n_processes)
5656

5757
if not continu:
5858
print('Creating sqlite db at %s' % scores_db_path)
@@ -97,15 +97,15 @@ def prepare_emodels(conf_dict, continu, scores_db_path):
9797
return final_dict, emodel_dirs
9898

9999

100-
def prepare_combos(conf_filename, continu):
100+
def prepare_combos(conf_filename, continu, n_processes=None):
101101
"""Prepare combos"""
102102

103103
print('Reading configuration at %s' % conf_filename)
104104
conf_dict = tools.load_json(conf_filename)
105105
scores_db_path = os.path.abspath(conf_dict['scores_db'])
106106

107107
final_dict, emodel_dirs = prepare_emodels(
108-
conf_dict, continu, scores_db_path)
108+
conf_dict, continu, scores_db_path, n_processes)
109109

110110
# Save output
111111
# TODO: gather all output business here?
@@ -124,3 +124,5 @@ def add_parser(action):
124124
parser.add_argument('conf_filename')
125125
parser.add_argument('--continu', action='store_true',
126126
help='continue from previous run')
127+
parser.add_argument('--n_processes', help='number of processes',
128+
type=int)

bluepymm/prepare_combos/prepare_emodel_dirs.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -255,13 +255,15 @@ def prepare_emodel_dir(input_args):
255255

256256

257257
def prepare_emodel_dirs(
258-
final_dict,
259-
emodel_etype_map,
260-
emodels_dir,
261-
opt_dir,
262-
emodels_hoc_dir,
263-
emodels_in_repo,
264-
continu=False):
258+
final_dict,
259+
emodel_etype_map,
260+
emodels_dir,
261+
opt_dir,
262+
emodels_hoc_dir,
263+
emodels_in_repo,
264+
continu=False,
265+
n_processes=None,
266+
):
265267
"""Prepare the directories for the emodels.
266268
267269
Args:
@@ -278,6 +280,8 @@ def prepare_emodel_dirs(
278280
into separate subdirectories.
279281
continu: True if this BluePyMM run builds on a previous run, False
280282
otherwise. Default is False.
283+
n_processes: the integer number of processes. If `None`,
284+
all processes are going to be used.
281285
282286
Return:
283287
A dict mapping e-models to prepared e-model directories.
@@ -299,10 +303,23 @@ def prepare_emodel_dirs(
299303
emodels_in_repo,
300304
continu))
301305

302-
print('Parallelising preparation of e-model directories')
303-
pool = multiprocessing.Pool(maxtasksperchild=1)
304306
emodel_dirs = {}
305-
for emodel_dir_dict in pool.map(prepare_emodel_dir, arg_list, chunksize=1):
307+
emodel_dir_dicts = []
308+
309+
if n_processes == 1:
310+
for arg in arg_list:
311+
emodel_dir_dict = prepare_emodel_dir(arg)
312+
emodel_dir_dicts.append(emodel_dir_dict)
313+
else:
314+
print('Parallelising preparation of e-model directories')
315+
pool = multiprocessing.Pool(processes=n_processes,
316+
maxtasksperchild=1)
317+
for emodel_dir_dict in pool.map(prepare_emodel_dir, arg_list,
318+
chunksize=1):
319+
emodel_dir_dicts.append(emodel_dir_dict)
320+
321+
for emodel_dir_dict in emodel_dir_dicts:
306322
for emodel, emodel_dir in emodel_dir_dict.items():
307323
emodel_dirs[emodel] = emodel_dir
324+
308325
return emodel_dirs

bluepymm/run_combos/calculate_scores.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ def expand_scores_to_score_values_table(scores_sqlite_filename):
321321

322322
def calculate_scores(final_dict, emodel_dirs, scores_db_filename,
323323
use_ipyp=False, ipyp_profile=None, timeout=10,
324-
use_apical_points=True):
324+
use_apical_points=True, n_processes=None):
325325
"""Calculate scores of e-model morphology combinations and update the
326326
database accordingly.
327327
@@ -335,6 +335,8 @@ def calculate_scores(final_dict, emodel_dirs, scores_db_filename,
335335
False.
336336
ipyp_profile: path to ipyparallel profile. Default is None.
337337
use_apical_points: boolean to use apical points or not
338+
n_processes: the integer number of processes. If `None`,
339+
all processes are going to be used.
338340
"""
339341

340342
print('Creating argument list for parallelisation')
@@ -344,16 +346,15 @@ def calculate_scores(final_dict, emodel_dirs, scores_db_filename,
344346
use_apical_points=use_apical_points)
345347

346348
print('Parallelising score evaluation of %d me-combos' % len(arg_list))
347-
348349
if use_ipyp:
349350
# use ipyparallel
350351
client = ipyparallel.Client(profile=ipyp_profile, timeout=timeout)
351-
lview = client.load_balanced_view()
352+
lview = client.load_balanced_view(targets=n_processes)
352353
results = lview.imap(run_emodel_morph_isolated,
353354
arg_list, ordered=False)
354355
else:
355356
# use multiprocessing
356-
pool = tools.NestedPool()
357+
pool = tools.NestedPool(processes=n_processes)
357358
results = pool.imap_unordered(run_emodel_morph_isolated, arg_list)
358359

359360
# keep track of the number of received results

bluepymm/run_combos/main.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ def add_parser(action):
4141
help='Path to ipyparallel profile')
4242
parser.add_argument('--timeout',
4343
help='Timeout for ipyparallel clients')
44+
parser.add_argument('--n_processes', help='number of processes',
45+
type=int)
4446

4547

46-
def run_combos_from_conf(conf_dict, ipyp=None, ipyp_profile=None, timeout=10):
48+
def run_combos_from_conf(conf_dict, ipyp=None, ipyp_profile=None, timeout=10,
49+
n_processes=None):
4750
"""Run combos from conf dictionary"""
4851
output_dir = conf_dict['output_dir']
4952
final_dict = tools.load_json(
@@ -69,13 +72,15 @@ def run_combos_from_conf(conf_dict, ipyp=None, ipyp_profile=None, timeout=10):
6972
use_ipyp=ipyp,
7073
ipyp_profile=ipyp_profile,
7174
timeout=timeout,
72-
use_apical_points=use_apical_points)
75+
use_apical_points=use_apical_points,
76+
n_processes=n_processes)
7377

7478

75-
def run_combos(conf_filename, ipyp=None, ipyp_profile=None):
79+
def run_combos(conf_filename, ipyp=None, ipyp_profile=None, n_processes=None):
7680
"""Run combos"""
7781

7882
print('Reading configuration at %s' % conf_filename)
7983
conf_dict = tools.load_json(conf_filename)
8084

81-
run_combos_from_conf(conf_dict, ipyp, ipyp_profile)
85+
run_combos_from_conf(conf_dict, ipyp, ipyp_profile,
86+
n_processes=n_processes)

bluepymm/select_combos/main.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,21 @@
3131
from . import process_megate_config as proc_config
3232

3333

34-
def select_combos(conf_filename):
34+
def select_combos(conf_filename, n_processes):
3535
"""Parse conf file and run select combos"""
3636
# Parse configuration file
3737
conf_dict = tools.load_json(conf_filename)
3838

39-
select_combos_from_conf(conf_dict)
39+
select_combos_from_conf(conf_dict, n_processes)
4040

4141

42-
def select_combos_from_conf(conf_dict):
42+
def select_combos_from_conf(conf_dict, n_processes=None):
4343
"""Compare scores of me-combinations to thresholds, select successful
4444
combinations, and write results out to file.
4545
4646
Args:
4747
conf_filename: filename of configuration (.json file)
48+
n_processes: integer number of processes, `None` will use all of them
4849
"""
4950
scores_db_filename = conf_dict['scores_db']
5051
pdf_filename = conf_dict['pdf_filename']
@@ -85,7 +86,8 @@ def select_combos_from_conf(conf_dict):
8586
scores, score_values,
8687
conf_dict.get('plot_emodels_per_morphology', False),
8788
output_dir,
88-
select_perc_best)
89+
select_perc_best,
90+
n_processes=n_processes)
8991
print('Wrote pdf to %s' % os.path.abspath(pdf_filename))
9092

9193
# write output files
@@ -112,3 +114,5 @@ def add_parser(action):
112114
parser = action.add_parser('select',
113115
help='Select feasible me-combinations')
114116
parser.add_argument('conf_filename')
117+
parser.add_argument('--n_processes', help='number of processes',
118+
type=int)

bluepymm/select_combos/reporting.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,8 @@ def create_final_db_and_write_report(pdf_filename,
330330
score_values,
331331
enable_plot_emodels_per_morphology,
332332
output_dir,
333-
select_perc_best):
333+
select_perc_best,
334+
n_processes=None):
334335
"""Create the final output files and report"""
335336
ext_neurondb = pandas.DataFrame()
336337

@@ -362,7 +363,9 @@ def create_final_db_and_write_report(pdf_filename,
362363
megate_patterns,
363364
skip_repaired_exemplar,
364365
check_opt_scores,
365-
select_perc_best)
366+
select_perc_best,
367+
n_processes=n_processes
368+
)
366369

367370
print("All emodels processed, generating output files")
368371

bluepymm/select_combos/table_processing.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@ def process_emodels(emodels,
269269
megate_patterns,
270270
skip_repaired_exemplar,
271271
enable_check_opt_scores,
272-
select_perc_best):
272+
select_perc_best,
273+
n_processes=None):
273274

274275
arg_list = [(emodel,
275276
scores,
@@ -280,16 +281,22 @@ def process_emodels(emodels,
280281
enable_check_opt_scores,
281282
select_perc_best) for emodel in emodels]
282283

283-
print('Parallelising selection processing of e-models')
284-
pool = multiprocessing.Pool(maxtasksperchild=1)
285284
emodel_infos = {}
286-
for emodel, emodel_info in pool.imap(process_emodel, arg_list,
287-
chunksize=1):
288-
print('Received processed info from e-model %s' % emodel)
289-
emodel_infos[emodel] = emodel_info
290285

291-
pool.terminate()
292-
pool.join()
286+
if n_processes == 1:
287+
for args in arg_list:
288+
emodel, emodel_info = process_emodel(args)
289+
emodel_infos[emodel] = emodel_info
290+
else:
291+
print('Parallelising selection processing of e-models')
292+
pool = multiprocessing.Pool(maxtasksperchild=1, processes=n_processes)
293+
for emodel, emodel_info in pool.imap(process_emodel, arg_list,
294+
chunksize=1):
295+
print('Received processed info from e-model %s' % emodel)
296+
emodel_infos[emodel] = emodel_info
297+
298+
pool.terminate()
299+
pool.join()
293300

294301
return emodel_infos
295302

bluepymm/tests/test_calculate_scores.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,8 @@ def test_calculate_scores():
383383
final_dict,
384384
emodel_dirs,
385385
test_db_filename,
386-
use_ipyp=use_ipyp
386+
use_ipyp=use_ipyp,
387+
n_processes=1
387388
)
388389

389390
if use_ipyp:

bluepymm/tests/test_prepare_emodel_dirs.py

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,65 @@ def test_prepare_emodel_dir():
257257

258258

259259
@attr('unit')
260-
def test_prepare_emodel_dirs():
261-
"""prepare_combos.prepare_emodel_dirs: test prepare_emodel_dirs
262-
based on test example 'simple1'.
260+
def test_prepare_emodel_dirs_single_process():
261+
"""prepare_combos.prepare_emodel_dirs_single_process:
262+
test prepare_emodel_dirs based on test example 'simple1
263+
using a single process'.
264+
"""
265+
# create test directory, where output of this test will be created
266+
test_dir = os.path.join(TMP_DIR, 'test_prepare_emodel_dirs')
267+
tools.makedirs(test_dir)
268+
269+
# input parameters
270+
final_dict = {'emodel1': {'main_path': '.',
271+
'seed': 2,
272+
'rank': 0,
273+
'notes': '',
274+
'branch': 'emodel1',
275+
'params': {'cm': 1.0},
276+
'fitness': {'Step1.SpikeCount': 20.0},
277+
'score': 104.72906197480131,
278+
'morph_path': 'morphologies/morph1.asc'},
279+
'emodel2': {'main_path': '.',
280+
'seed': 2,
281+
'rank': 0,
282+
'notes': '',
283+
'branch': 'emodel2',
284+
'params': {'cm': 0.5},
285+
'fitness': {'Step1.SpikeCount': 20.0},
286+
'score': 104.72906197480131,
287+
'morph_path': 'morphologies/morph2.asc'}}
288+
emodel_etype_map = {'emodel1': {'mm_recipe': 'emodel1',
289+
'etype': 'etype1',
290+
'layer': ['1', 'str1']},
291+
'emodel2': {'mm_recipe': 'emodel2',
292+
'etype': 'etype2',
293+
'layer': ['1', '2']}}
294+
emodels_dir = os.path.join(test_dir, 'tmp/emodels/')
295+
opt_dir = os.path.join(TEST_DATA_DIR, 'data/emodels_dir/subdir/')
296+
emodels_hoc_dir = os.path.join(test_dir, './output/emodels_hoc/')
297+
emodels_in_repo = False
298+
continu = False
299+
300+
# run function
301+
with tools.cd(TEST_DATA_DIR):
302+
ret = prepare_emodel_dirs.prepare_emodel_dirs(
303+
final_dict, emodel_etype_map, emodels_dir, opt_dir,
304+
emodels_hoc_dir, emodels_in_repo, continu, n_processes=1)
305+
306+
# verify output
307+
expected_ret = {emodel: os.path.join(
308+
emodels_dir, emodel) for emodel in final_dict}
309+
nt.assert_dict_equal(ret, expected_ret)
310+
nt.assert_true(os.path.isdir(emodels_dir))
311+
nt.assert_true(os.path.isdir(emodels_hoc_dir))
312+
313+
314+
@attr('unit')
315+
def test_prepare_emodel_dirs_multi_process():
316+
"""prepare_combos.prepare_emodel_dirs_multi_process:
317+
test prepare_emodel_dirs based on test example 'simple1'
318+
using multiprocessing.
263319
"""
264320
# create test directory, where output of this test will be created
265321
test_dir = os.path.join(TMP_DIR, 'test_prepare_emodel_dirs')
@@ -300,7 +356,7 @@ def test_prepare_emodel_dirs():
300356
with tools.cd(TEST_DATA_DIR):
301357
ret = prepare_emodel_dirs.prepare_emodel_dirs(
302358
final_dict, emodel_etype_map, emodels_dir, opt_dir,
303-
emodels_hoc_dir, emodels_in_repo, continu)
359+
emodels_hoc_dir, emodels_in_repo, continu, n_processes=None)
304360

305361
# verify output
306362
expected_ret = {emodel: os.path.join(

0 commit comments

Comments
 (0)