
Commit 9dde38a

Update doc, README and author
Change-Id: Ia625406e1c9acaa0601076c9707fc9d5a8e90114
1 parent 592efdf commit 9dde38a

7 files changed: 119 additions & 68 deletions


README.rst

Lines changed: 78 additions & 46 deletions
@@ -1,41 +1,91 @@
-# BluePyParallel: Bluebrain Python Embarassingly Parallel library
+BluePyParallel: Bluebrain Python Embarassingly Parallel library
+===============================================================
 
 
 Introduction
-============
+------------
+
+Provides an embarassingly parallel tool with sql backend, inspired by `BluePyMM <https://github.com/BlueBrain/BluePyMM>`_.
+
+
+Installation
+------------
+
+This package should be installed using pip:
+
+.. code-block:: bash
+
+    pip install bluepyparallel
+
+
+Usage
+-----
+
+General computation
+~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: python
+
+    factory_name = "multiprocessing"  # Can also be None, dask or ipyparallel
+    batch_size = 10  # This value is used to split the data into batches before processing them
+    chunk_size = 1000  # This value is used to gather the elements to process before sending them to the workers
+
+    # Setup the parallel factory
+    parallel_factory = init_parallel_factory(
+        factory_name,
+        batch_size=batch_size,
+        chunk_size=chunk_size,
+        processes=4,  # This parameter is specific to the multiprocessing factory
+    )
+
+    # Get the mapper from the factory
+    mapper = parallel_factory.get_mapper()
+
+    # Use the mapper to map the given function to each element of mapped_data and gather the results
+    result = sorted(mapper(function, mapped_data, *function_args, **function_kwargs))
+
+
+Working with Pandas and SQL backend
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This library provides a specific function working with large :class:`pandas.DataFrame`: :func:`bluepyparallel.evaluator.evaluate`.
+This function converts the DataFrame into a list of dicts (one for each row), then maps a given function to each element and finally gathers the results.
+As it aims at working with time-consuming functions, it also provides a checkpoint and resume mechanism using a SQL backend.
+The SQL backend uses the `SQLAlchemy <https://docs.sqlalchemy.org/en/latest>`_ library, so it can work with a large variety of database types (like SQLite, PostgreSQL, MySQL, ...).
+To activate this feature, just pass a `URL that can be processed by SQLAlchemy <https://docs.sqlalchemy.org/en/latest/core/engines.html?highlight=url#database-urls>`_ to the ``db_url`` parameter of :func:`bluepyparallel.evaluator.evaluate`.
+
+.. note:: A specific driver might have to be installed to access the database (like `psycopg2 <https://www.psycopg.org/docs/>`_ for PostgreSQL, for example).
+
+Example:
+
+.. code-block:: python
+
+    # Use the mapper to map the given function to each element of the DataFrame
+    result_df = evaluate(
+        input_df,  # This is the DataFrame to process
+        evaluation_function,  # This is the function that should be applied to each row of the DataFrame
+        parallel_factory="multiprocessing",  # This could also be a Factory previously defined
+        db_url="sqlite:///db.sql",  # This could also just be "db.sql" and would be automatically turned into a SQLite URL
+    )
+
+Now, if the computation crashes for any reason, the partial result is stored in the ``db.sql`` file.
+If the crash was due to an external cause (therefore executing the code again should work), it is possible to resume the
+computation from the last computed element. Thus, only the missing elements are computed, which can save a lot of time.
 
-Provides an embarassingly parallel tool with sql backend.
 
 Running using Dask
-==================
+------------------
 
-This is an example of a sbatch script that can be adapted to execute the script using multiple nodes and workers.
+This is an example of a `sbatch <https://slurm.schedmd.com/sbatch.html>`_ script that can be adapted to execute the script using multiple nodes and workers.
+In this example, the code called by the ``<command>`` should be parallelized using BluePyParallel.
 
 Dask variables are not strictly required, but highly recommended, and they can be fine tuned.
 
 
 .. code:: bash
 
     #!/bin/bash -l
-    #SBATCH --nodes=2              # Number of nodes
-    #SBATCH --time=24:00:00        # Time limit
-    #SBATCH --partition=prod       # Submit to the production 'partition'
-    #SBATCH --constraint=cpu       # Constraint the job to run on nodes with/without SSDs. If you want SSD, use only "nvme". If you want KNLs then "knl"
-    #SBATCH --exclusive            # only if you need to allocate whole node
-    #SBATCH --mem=0
-    #SBATCH --ntasks-per-node=72   # no of mpi ranks to use per node
-    #SBATCH --account=projXX       # your project number
-    #SBATCH --job-name=myscript
-    #SBATCH --output=myscript_out_%j
-    #SBATCH --error=myscript_err_%j
-    set -e
-
-    module purge
-    module load unstable hpe-mpi
-    module unload unstable
-
-    unset PMI_RANK # for neuron
-
+
     # Dask configuration
     export DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"
     export DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING=False
@@ -48,27 +98,9 @@ Dask variables are not strictly required, but highly recommended, and they can be fine tuned.
     # Reduce dask profile memory usage/leak (see https://github.com/dask/distributed/issues/4091)
     export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms  # Time between statistical profiling queries
     export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms  # Time between starting new profile
 
     # Split tasks to avoid some dask errors (e.g. Event loop was unresponsive in Worker)
     export PARALLEL_BATCH_SIZE=1000
-
-    # Script parameters
-    OUTPUT="/path/to/mecombo_emodel.tsv"
-    CIRCUIT_CONFIG="/gpfs/bbp.cscs.ch/project/proj68/circuits/Isocortex/20190307/CircuitConfig"
-    MORPHOLOGY_PATH="/gpfs/bbp.cscs.ch/project/proj68/circuits/Isocortex/20190307/morphologies"
-    RELEASE_PATH="emodel_release"
-    N_CELLS=100
-    MTYPE="L5_TPC:A"
-
-    # load the virtual env (alternatively, load the required modules)
-    source ~/venv/3.7.4-BluePyEModel/bin/activate
-
-    srun -v \
-        BluePyEModel -v get_me_combos_parameters \
-        --circuit-config "$CIRCUIT_CONFIG" \
-        --morphology-path "$MORPHOLOGY_PATH" \
-        --release-path "$RELEASE_PATH" \
-        --output "$OUTPUT" \
-        --n-cells "$N_CELLS" \
-        --mtype "$MTYPE" \
-        --parallel-lib dask
+
+    srun -v <command>

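For readers trying out the updated README, a minimal end-to-end sketch combining the two snippets above could look as follows. This is not part of the commit: the worker function, the input data and the top-level import of ``init_parallel_factory`` are illustrative assumptions; only ``bluepyparallel.evaluator.evaluate`` is documented with its full path above.

.. code-block:: python

    import pandas as pd

    from bluepyparallel import init_parallel_factory  # assumed import path
    from bluepyparallel.evaluator import evaluate


    def square_row(row):
        # Hypothetical evaluation function: per the README, each row is passed as a dict of
        # column values, and it must return a dict whose keys match the names in new_columns.
        return {"result": float(row["x"]) ** 2}


    input_df = pd.DataFrame({"x": range(10)})

    # Build a multiprocessing factory as in the "General computation" snippet
    factory = init_parallel_factory("multiprocessing", processes=4)

    # Evaluate each row and checkpoint partial results in a SQLite database
    result_df = evaluate(
        input_df,
        square_row,
        new_columns=[["result", 0.0]],  # column name and default value (see the evaluator.py docstring below)
        parallel_factory=factory,
        db_url="sqlite:///db.sql",
    )
    print(result_df[["x", "result"]])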
bluepyparallel/database.py

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ class DataBase:
 
     Args:
         url (str): The URL of the database following the RFC-1738 format (
-            https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls)
+            https://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls)
        create (bool): If set to True, the database will be automatically created by the
            constructor.
        args and kwargs: They will be passed to the :func:`sqlalchemy.create_engine` function.

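As context for the corrected link, SQLAlchemy database URLs in this RFC-1738 form look like the following sketch (illustrative placeholder values only; a driver such as psycopg2 must be installed separately for PostgreSQL).

.. code-block:: python

    # Illustrative URLs accepted by sqlalchemy.create_engine() (placeholder values)
    sqlite_relative = "sqlite:///db.sql"                               # SQLite file relative to the working directory
    sqlite_absolute = "sqlite:////absolute/path/to/db.sql"             # SQLite file with an absolute path
    postgres = "postgresql://user:password@db-host:5432/my_database"   # requires a driver such as psycopg2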
bluepyparallel/evaluator.py

Lines changed: 11 additions & 11 deletions
@@ -40,20 +40,20 @@ def evaluate(
     """Evaluate and save results in a sqlite database on the fly and return dataframe.
 
     Args:
-        df (DataFrame): each row contains information for the computation.
-        evaluation_function (function): function used to evaluate each row,
+        df (pandas.DataFrame): each row contains information for the computation.
+        evaluation_function (callable): function used to evaluate each row,
             should have a single argument as list-like containing values of the rows of df,
             and return a dict with keys corresponding to the names in new_columns.
         new_columns (list): list of names of new column and empty value to save evaluation results,
-            i.e.: [['result', 0.0], ['valid', False]].
-        resume (bool): if True, it will use only compute the empty rows of the database,
-            if False, it will ecrase or generate the database.
-        parallel_factory (ParallelFactory): parallel factory instance.
-        db_url (str): should be DB URL that can be interpreted by SQLAlchemy or can be a file path
-            that is interpreted as a SQLite database. If an URL is given, the SQL backend will be
-            enabled to store results and allowing future resume. Should not be used when
-            evaluations are numerous and fast, in order to avoid the overhead of communication with
-            SQL database.
+            i.e.: :code:`[['result', 0.0], ['valid', False]]`.
+        resume (bool): if :obj:`True` and ``db_url`` is provided, it will only compute the
+            missing rows of the database.
+        parallel_factory (ParallelFactory or str): parallel factory name or instance.
+        db_url (str): should be a DB URL that can be interpreted by :func:`sqlalchemy.create_engine`
+            or can be a file path that is interpreted as a SQLite database. If a URL is given,
+            the SQL backend will be enabled to store results, allowing future resume. Should
+            not be used when evaluations are numerous and fast, in order to avoid the overhead of
+            communication with the SQL database.
         func_args (list): the arguments to pass to the evaluation_function.
         func_kwargs (dict): the keyword arguments to pass to the evaluation_function.
         **mapper_kwargs: the keyword arguments are passed to the get_mapper() method of the

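A short sketch of the clarified ``resume`` behaviour, reusing the hypothetical ``square_row`` function, ``input_df`` and ``db.sql`` file from the README example above (not part of the commit):

.. code-block:: python

    # Re-running the same evaluation with resume=True only computes the rows that are
    # missing from the database written by the first run.
    result_df = evaluate(
        input_df,
        square_row,
        new_columns=[["result", 0.0]],
        resume=True,                         # skip rows already stored in db.sql
        parallel_factory="multiprocessing",  # a factory name is accepted as well as an instance
        db_url="sqlite:///db.sql",
    )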
bluepyparallel/version.py

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 """Package version"""
 # pragma: no cover
-VERSION = "0.0.3"
+VERSION = "0.0.4.dev0"

doc/source/conf.py

Lines changed: 1 addition & 0 deletions
@@ -79,4 +79,5 @@
     "pandas": ("https://pandas.pydata.org/docs", None),
     "dask": ("https://docs.dask.org/en/latest/", None),
     "ipyparallel": ("https://ipyparallel.readthedocs.io/en/latest/", None),
+    "sqlalchemy": ("https://docs.sqlalchemy.org/en/latest/", None),
 }

examples/run_large_dask.sh

Lines changed: 26 additions & 8 deletions
@@ -1,12 +1,30 @@
 #!/bin/bash -l
-#SBATCH --nodes=1               # Number of nodes
-#SBATCH --time=00:10:00         # Time limit
-#SBATCH --partition=prod
-#SBATCH --constraint=cpu
-#SBATCH --mem=0
-#SBATCH --cpus-per-task=1
-#SBATCH --account=proj82        # your project number
-#SBATCH --job-name=test_bpp
+
+# SBATCH --nodes=1               # Number of nodes
+# SBATCH --time=00:10:00         # Time limit
+# SBATCH --partition=prod
+# SBATCH --constraint=cpu
+# SBATCH --mem=0
+# SBATCH --cpus-per-task=1
+# SBATCH --account=proj82        # your project number
+# SBATCH --job-name=test_bpp
+
+# # Dask configuration
+# export DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"
+# export DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING=False
+# export DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=False  # don't spill to disk
+# export DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=False  # don't spill to disk
+# export DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=0.80  # pause execution at 80% memory use
+# export DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=0.95  # restart the worker at 95% use
+# export DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=spawn
+# export DASK_DISTRIBUTED__WORKER__DAEMON=True
+# # Reduce dask profile memory usage/leak (see https://github.com/dask/distributed/issues/4091)
+# export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms  # Time between statistical profiling queries
+# export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms  # Time between starting new profile
+
+# # Split tasks to avoid some dask errors (e.g. Event loop was unresponsive in Worker)
+# export PARALLEL_BATCH_SIZE=1000
+
 set -e
 
 
setup.py

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@
 
 setup(
     name="BluePyParallel",
-    author="BlueBrain cells",
+    author="bbp-ou-cells",
     author_email="[email protected]",
     version=VERSION,
     description="Provides an embarassingly parallel tool with sql backend",
