Skip to content

Commit cee7479

Browse files
GitHKAndrei Neagu
andauthored
🐛addresses blocking Thread/Process pool executors when shutting down (ITISFoundation#2397)
* refactor, no destruction is necessary * avoids destroying the pool and blocking * using non blocking process pool * replaces blocking processpool * replaces blocking processpoolexecutor * refactored * using try finally to ensure cleanup * refactored * pylint * fixing error * fixing pool creation * extracted pools module * marked as duplicate * refactored to use new shared code * transformed int a generator * fix error * it can be waited here * using system default threadpool * removing unused * removing unused * added testing dependency * cpython issue was casing a hang * added regression tests for pools * updated comments * fix pylint * cleanup comments * downgraded * fixed tests * update codestyle * updated non_blocking_process_pool_executor in catalog * adds tests and more comments * added fixme notes Co-authored-by: Andrei Neagu <[email protected]>
1 parent 3ab3bf6 commit cee7479

File tree

22 files changed

+172
-54
lines changed

22 files changed

+172
-54
lines changed

packages/service-library/requirements/_base.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# This file is autogenerated by pip-compile
2+
# This file is autogenerated by pip-compile with python 3.8
33
# To update, run:
44
#
55
# pip-compile --output-file=requirements/_base.txt requirements/_base.in
@@ -67,10 +67,10 @@ pydantic==1.8.2
6767
# via
6868
# -c requirements/../../../requirements/constraints.txt
6969
# -r requirements/_base.in
70-
pyinstrument-cext==0.2.4
71-
# via pyinstrument
7270
pyinstrument==3.4.2
7371
# via -r requirements/_base.in
72+
pyinstrument-cext==0.2.4
73+
# via pyinstrument
7474
pyrsistent==0.17.3
7575
# via jsonschema
7676
pyyaml==5.4.1

packages/service-library/requirements/_test.txt

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# This file is autogenerated by pip-compile
2+
# This file is autogenerated by pip-compile with python 3.8
33
# To update, run:
44
#
55
# pip-compile --output-file=requirements/_test.txt requirements/_test.in
@@ -49,12 +49,12 @@ cryptography==3.4.7
4949
# paramiko
5050
distro==1.5.0
5151
# via docker-compose
52+
docker[ssh]==5.0.0
53+
# via docker-compose
5254
docker-compose==1.29.1
5355
# via
5456
# -c requirements/../../../requirements/constraints.txt
5557
# pytest-docker
56-
docker[ssh]==5.0.0
57-
# via docker-compose
5858
dockerpty==0.4.1
5959
# via docker-compose
6060
docopt==0.6.2
@@ -68,7 +68,7 @@ idna==2.10
6868
# yarl
6969
iniconfig==1.1.1
7070
# via pytest
71-
isort==5.9.0
71+
isort==5.9.1
7272
# via pylint
7373
jsonschema==3.2.0
7474
# via
@@ -107,6 +107,15 @@ pyrsistent==0.17.3
107107
# via
108108
# -c requirements/_base.txt
109109
# jsonschema
110+
pytest==6.2.4
111+
# via
112+
# -r requirements/_test.in
113+
# pytest-aiohttp
114+
# pytest-cov
115+
# pytest-docker
116+
# pytest-instafail
117+
# pytest-mock
118+
# pytest-sugar
110119
pytest-aiohttp==0.3.0
111120
# via -r requirements/_test.in
112121
pytest-cov==2.12.1
@@ -121,15 +130,6 @@ pytest-runner==5.3.1
121130
# via -r requirements/_test.in
122131
pytest-sugar==0.9.4
123132
# via -r requirements/_test.in
124-
pytest==6.2.4
125-
# via
126-
# -r requirements/_test.in
127-
# pytest-aiohttp
128-
# pytest-cov
129-
# pytest-docker
130-
# pytest-instafail
131-
# pytest-mock
132-
# pytest-sugar
133133
python-dotenv==0.18.0
134134
# via docker-compose
135135
pyyaml==5.4.1

packages/service-library/requirements/_tools.txt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# This file is autogenerated by pip-compile
2+
# This file is autogenerated by pip-compile with python 3.8
33
# To update, run:
44
#
55
# pip-compile --output-file=requirements/_tools.txt requirements/_tools.in
@@ -24,7 +24,7 @@ filelock==3.0.12
2424
# via virtualenv
2525
identify==2.2.10
2626
# via pre-commit
27-
isort==5.9.0
27+
isort==5.9.1
2828
# via
2929
# -c requirements/_test.txt
3030
# -r requirements/../../../requirements/devenv.txt
@@ -36,7 +36,7 @@ pathspec==0.8.1
3636
# via black
3737
pep517==0.10.0
3838
# via pip-tools
39-
pip-tools==6.1.0
39+
pip-tools==6.2.0
4040
# via -r requirements/../../../requirements/devenv.txt
4141
pre-commit==2.13.0
4242
# via -r requirements/../../../requirements/devenv.txt
@@ -61,6 +61,9 @@ toml==0.10.2
6161
# pre-commit
6262
virtualenv==20.4.7
6363
# via pre-commit
64+
wheel==0.36.2
65+
# via pip-tools
6466

6567
# The following packages are considered to be unsafe in a requirements file:
6668
# pip
69+
# setuptools

packages/service-library/src/servicelib/archiving_utils.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import asyncio
22
import logging
33
import zipfile
4-
from concurrent.futures import ProcessPoolExecutor
54
from pathlib import Path
65
from typing import Iterator, List, Set
76

7+
from servicelib.pools import non_blocking_process_pool_executor
8+
89
MAX_UNARCHIVING_WORKER_COUNT = 2
910

1011
log = logging.getLogger(__name__)
@@ -81,7 +82,7 @@ async def unarchive_dir(
8182
all tree leafs, which might include files or empty folders
8283
"""
8384
with zipfile.ZipFile(archive_to_extract, mode="r") as zip_file_handler:
84-
with ProcessPoolExecutor(max_workers=max_workers) as pool:
85+
with non_blocking_process_pool_executor(max_workers=max_workers) as pool:
8586
loop = asyncio.get_event_loop()
8687

8788
# running in process poll is not ideal for concurrency issues
@@ -141,7 +142,7 @@ async def archive_dir(
141142
dir_to_compress: Path, destination: Path, compress: bool, store_relative_path: bool
142143
) -> bool:
143144
"""Returns True if successuly archived"""
144-
with ProcessPoolExecutor(max_workers=1) as pool:
145+
with non_blocking_process_pool_executor(max_workers=1) as pool:
145146
return await asyncio.get_event_loop().run_in_executor(
146147
pool,
147148
_serial_add_to_archive,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from concurrent.futures import ProcessPoolExecutor
2+
from contextlib import contextmanager
3+
4+
# only gets created on use and is guaranteed to be the s
5+
# ame for the entire lifetime of the application
6+
__shared_process_pool_executor = {}
7+
8+
9+
def get_shared_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
10+
# sometimes a pool requires a specific configuration
11+
# the key helps to distinguish between them in the same application
12+
key = "".join(sorted(["_".join((k, str(v))) for k, v in kwargs.items()]))
13+
14+
if key not in __shared_process_pool_executor:
15+
# pylint: disable=consider-using-with
16+
__shared_process_pool_executor[key] = ProcessPoolExecutor(**kwargs)
17+
18+
return __shared_process_pool_executor[key]
19+
20+
21+
@contextmanager
22+
def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
23+
"""
24+
Avoids default context manger behavior which calls
25+
shutdown with wait=True an blocks.
26+
"""
27+
executor = get_shared_process_pool_executor(**kwargs)
28+
try:
29+
yield executor
30+
finally:
31+
# due to an issue in cpython https://bugs.python.org/issue34073
32+
# bypassing shutdown and using a shared pool
33+
# remove call to get_shared_process_pool_executor and replace with
34+
# a new instance when the issue is fixed
35+
# FIXME: uncomment below line when the issue is fixed
36+
# executor.shutdown(wait=False)
37+
pass

packages/service-library/src/servicelib/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import asyncio
88
import logging
99
import os
10+
1011
from pathlib import Path
1112
from typing import Any, Awaitable, Coroutine, List, Optional, Union
1213

packages/service-library/tests/test_application_setup.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import pytest
99
from aiohttp import web
10-
1110
from servicelib.application_keys import APP_CONFIG_KEY
1211
from servicelib.application_setup import (
1312
APP_SETUP_KEY,
@@ -37,7 +36,12 @@ def setup_zee(app: web.Application, arg1, kargs=55):
3736

3837

3938
@app_module_setup(
40-
"package.needs_foo", ModuleCategory.SYSTEM, depends=["package.foo",], logger=log
39+
"package.needs_foo",
40+
ModuleCategory.SYSTEM,
41+
depends=[
42+
"package.foo",
43+
],
44+
logger=log,
4145
)
4246
def setup_needs_foo(app: web.Application, arg1, kargs=55):
4347
return True

packages/service-library/tests/test_archiving_utils_extra.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def print_tree(path: Path, level=0):
2323

2424
@pytest.fixture
2525
def state_dir(tmp_path) -> Path:
26-
""" Folder with some data, representing a given state"""
26+
"""Folder with some data, representing a given state"""
2727
base_dir = tmp_path / "original"
2828
base_dir.mkdir()
2929
(base_dir / "empty").mkdir()
@@ -55,7 +55,7 @@ def state_dir(tmp_path) -> Path:
5555

5656
@pytest.fixture
5757
def new_state_dir(tmp_path) -> Path:
58-
""" Folder AFTER updated with new data """
58+
"""Folder AFTER updated with new data"""
5959
base_dir = tmp_path / "updated"
6060
base_dir.mkdir()
6161
(base_dir / "d1").mkdir()

packages/service-library/tests/test_incidents_monitoring.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import time
77

88
import pytest
9-
109
from servicelib import monitor_slow_callbacks
1110
from servicelib.aiopg_utils import (
1211
DatabaseError,

packages/service-library/tests/test_incidents_utils.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import operator
66

77
import attr
8-
98
from servicelib.incidents import BaseIncident, LimitedOrderedStack
109

1110

0 commit comments

Comments
 (0)