Skip to content

Commit 3ca7b7d

Browse files
Handle deconnections and fix requirements
1 parent a088540 commit 3ca7b7d

File tree

7 files changed

+37
-26
lines changed

7 files changed

+37
-26
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ htmlcov/
7575
.tox/
7676
.coverage
7777
.coverage.*
78+
.coverage-*
7879
.cache
7980
nosetests.xml
8081
coverage.xml

.gitlab-ci.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ include:
1313
file:
1414
- /ci/jobs/auto-release.yml
1515
- /ci/lib/pytest-template.yml
16-
- /ci/jobs/py36.yml
1716
- /ci/jobs/py37.yml
1817
- /ci/jobs/py38.yml
18+
- /ci/jobs/py39.yml
1919
- /ci/jobs/coverage.yml
2020

2121
.tox-template:
2222
variables:
23-
EXTRA_MODULES: archive/2021-05:py-mpi4py
24-
bb5_cpus_per_task: 6
23+
EXTRA_MODULES:
24+
archive/2021-12:py-mpi4py
25+
bb5_cpus_per_task: 8
26+
bb5_memory: 6G

bluepyparallel/database.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,16 @@ def __del__(self):
5858
@property
5959
def connection(self):
6060
"""Get a connection to the database."""
61+
try:
62+
if self._connection.connection.dbapi_connection is None:
63+
self._connection.close()
64+
self._connection = None
65+
except AttributeError:
66+
self._connection = None
67+
6168
if self._connection is None:
6269
self._connection = self.engine.connect()
70+
6371
return self._connection
6472

6573
def get_url(self):

bluepyparallel/parallel.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ def get_mapper(self, batch_size=None, chunk_size=None, **kwargs):
133133
"""Get a map."""
134134

135135
def _mapper(func, iterable, *func_args, **func_kwargs):
136-
func = self.mappable_func(func, *func_args, **func_kwargs)
137-
return self._with_batches(map, func, iterable)
136+
mapped_func = self.mappable_func(func, *func_args, **func_kwargs)
137+
return self._with_batches(map, mapped_func, iterable)
138138

139139
return _mapper
140140

@@ -156,10 +156,10 @@ def get_mapper(self, batch_size=None, chunk_size=None, **kwargs):
156156
self._chunksize_to_kwargs(chunk_size, kwargs, label="chunksize")
157157

158158
def _mapper(func, iterable, *func_args, **func_kwargs):
159-
func = self.mappable_func(func, *func_args, **func_kwargs)
159+
mapped_func = self.mappable_func(func, *func_args, **func_kwargs)
160160
return self._with_batches(
161161
partial(self.pool.imap_unordered, **kwargs),
162-
func,
162+
mapped_func,
163163
iterable,
164164
)
165165

@@ -195,17 +195,19 @@ def get_mapper(self, batch_size=None, chunk_size=None, **kwargs):
195195
self._chunksize_to_kwargs(chunk_size, kwargs)
196196

197197
def _mapper(func, iterable, *func_args, **func_kwargs):
198-
func = self.mappable_func(func, *func_args, **func_kwargs)
198+
mapped_func = self.mappable_func(func, *func_args, **func_kwargs)
199199
return self._with_batches(
200-
partial(self.lview.imap, **kwargs), func, iterable, batch_size=batch_size
200+
partial(self.lview.imap, **kwargs), mapped_func, iterable, batch_size=batch_size
201201
)
202202

203203
return _mapper
204204

205205
def shutdown(self):
206206
"""Remove zmq."""
207-
if self.rc is not None: # pragma: no cover
207+
try:
208208
self.rc.close()
209+
except Exception: # pragma: no cover ; pylint: disable=broad-except
210+
pass
209211

210212

211213
class DaskFactory(ParallelFactory):
@@ -256,13 +258,13 @@ def get_mapper(self, batch_size=None, chunk_size=None, **kwargs):
256258
self._chunksize_to_kwargs(chunk_size, kwargs, label="batch_size")
257259

258260
def _mapper(func, iterable, *func_args, **func_kwargs):
259-
def _dask_mapper(func, iterable):
260-
futures = self.client.map(func, iterable, **kwargs)
261+
def _dask_mapper(in_dask_func, iterable):
262+
futures = self.client.map(in_dask_func, iterable, **kwargs)
261263
for _future, result in dask.distributed.as_completed(futures, with_results=True):
262264
yield result
263265

264-
func = self.mappable_func(func, *func_args, **func_kwargs)
265-
return self._with_batches(_dask_mapper, func, iterable, batch_size=batch_size)
266+
mapped_func = self.mappable_func(func, *func_args, **func_kwargs)
267+
return self._with_batches(_dask_mapper, mapped_func, iterable, batch_size=batch_size)
266268

267269
return _mapper
268270

setup.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,19 @@
11
"""Setup for the BluePyParallel package."""
22
import imp
3-
import sys
43

54
from setuptools import find_packages
65
from setuptools import setup
76

8-
if sys.version_info < (3, 6):
9-
sys.exit("Sorry, Python < 3.6 is not supported")
10-
117
# Read the contents of the README file
128
with open("README.rst", encoding="utf-8") as f:
139
README = f.read()
1410

1511
reqs = [
1612
"pandas",
17-
"ipyparallel",
13+
"ipyparallel<7",
1814
"dask[dataframe, distributed]>=2.30",
1915
"dask-mpi>=2.20",
20-
"sqlalchemy<1.4",
16+
"sqlalchemy>1.4",
2117
"sqlalchemy-utils",
2218
"tqdm",
2319
]
@@ -49,12 +45,12 @@
4945
"docs": doc_reqs,
5046
},
5147
packages=find_packages(exclude=["tests"]),
52-
python_requires=">=3.6",
48+
python_requires=">=3.7",
5349
classifiers=[
5450
"Programming Language :: Python",
5551
"Programming Language :: Python :: 3",
56-
"Programming Language :: Python :: 3.6",
5752
"Programming Language :: Python :: 3.7",
5853
"Programming Language :: Python :: 3.8",
54+
"Programming Language :: Python :: 3.9",
5955
],
6056
)

tests/test_evaluator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ def _interrupting_function(row, *args, **kwargs):
3131
return _evaluation_function(row, *args, **kwargs)
3232

3333

34-
def _slow_function(row, *args, sleep_time=0.02, **kwargs):
34+
def _slow_function(row, *args, **kwargs):
3535
"""Mock evaluation function."""
3636
if "sleep_time" in row:
3737
sleep_time = row["sleep_time"]
38+
else:
39+
sleep_time = kwargs.get("sleep_time", 0.02)
3840
time.sleep(sleep_time)
3941
return _evaluation_function(row, *args, **kwargs)
4042

tox.ini

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ testdeps =
77
pytest-benchmark
88
pytest-cov
99
pytest-html
10-
pytest-xdist
1110

1211
[tox]
1312
envlist =
@@ -23,11 +22,10 @@ setenv =
2322
COVERAGE_FILE = {env:COVERAGE_FILE:.coverage-{envname}}
2423
deps = {[base]testdeps}
2524
commands_pre =
26-
- ipcluster stop --cluster-id={[base]name}_{envname} --debug
25+
- ipcluster stop --cluster-id={[base]name}_{envname} --signal 9 --debug
2726
ipcluster start -n 2 --daemonize --log-to-file --cluster-id={[base]name}_{envname} --debug
2827
commands =
2928
pytest \
30-
-n 2 \
3129
--basetemp={envtmpdir} \
3230
--cov={[base]name} \
3331
--cov-branch \
@@ -77,6 +75,7 @@ commands_pre =
7775
commands_post =
7876

7977
[testenv:lint]
78+
basepython = python3.7
8079
deps =
8180
{[base]testdeps}
8281
pycodestyle
@@ -94,6 +93,7 @@ commands_pre =
9493
commands_post =
9594

9695
[testenv:format]
96+
basepython = python3.7
9797
skip_install = true
9898
deps =
9999
black

0 commit comments

Comments
 (0)