Skip to content

Commit 92e6120

Browse files
authored
Init celery application in app once + run call to blocking call in executor (ITISFoundation#2372)
* move celery client where it is used --> activity manager * use pytest-celery to mock celery app
1 parent dc4c691 commit 92e6120

File tree

10 files changed

+145
-32
lines changed

10 files changed

+145
-32
lines changed

services/web/server/requirements/_test.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
coverage
1212
pytest
1313
pytest-aiohttp # incompatible with pytest-asyncio. See https://github.com/pytest-dev/pytest-asyncio/issues/76
14+
pytest-celery
1415
pytest-cov
1516
pytest-docker
1617
pytest-icdiff

services/web/server/requirements/_test.txt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ aioresponses==0.7.2
1313
# via -r requirements/_test.in
1414
alembic==1.6.2
1515
# via -r requirements/_test.in
16+
amqp==5.0.6
17+
# via
18+
# -c requirements/_base.txt
19+
# kombu
1620
astroid==2.5
1721
# via pylint
1822
async-timeout==3.0.1
@@ -28,8 +32,16 @@ attrs==20.3.0
2832
# pytest-docker
2933
bcrypt==3.2.0
3034
# via paramiko
35+
billiard==3.6.4.0
36+
# via
37+
# -c requirements/_base.txt
38+
# celery
3139
cached-property==1.5.2
3240
# via docker-compose
41+
celery[redis]==5.0.5
42+
# via
43+
# -c requirements/_base.txt
44+
# pytest-celery
3345
certifi==2020.12.5
3446
# via requests
3547
cffi==1.14.5
@@ -43,10 +55,26 @@ chardet==3.0.4
4355
# -c requirements/_base.txt
4456
# aiohttp
4557
# requests
58+
click-didyoumean==0.0.3
59+
# via
60+
# -c requirements/_base.txt
61+
# celery
62+
click-plugins==1.1.1
63+
# via
64+
# -c requirements/_base.txt
65+
# celery
66+
click-repl==0.1.6
67+
# via
68+
# -c requirements/_base.txt
69+
# celery
4670
click==7.1.2
4771
# via
4872
# -c requirements/_base.txt
4973
# -r requirements/_test.in
74+
# celery
75+
# click-didyoumean
76+
# click-plugins
77+
# click-repl
5078
codecov==2.1.11
5179
# via -r requirements/_test.in
5280
coverage[toml]==5.5
@@ -96,6 +124,7 @@ importlib-metadata==4.0.1
96124
# via
97125
# -c requirements/_base.txt
98126
# jsonschema
127+
# kombu
99128
# pluggy
100129
# pytest
101130
iniconfig==1.1.1
@@ -113,6 +142,10 @@ jsonschema==3.2.0
113142
# docker-compose
114143
# openapi-schema-validator
115144
# openapi-spec-validator
145+
kombu==5.0.2
146+
# via
147+
# -c requirements/_base.txt
148+
# celery
116149
lazy-object-proxy==1.4.3
117150
# via
118151
# -c requirements/_base.txt
@@ -148,6 +181,10 @@ pluggy==0.13.1
148181
# via pytest
149182
pprintpp==0.4.0
150183
# via pytest-icdiff
184+
prompt-toolkit==3.0.18
185+
# via
186+
# -c requirements/_base.txt
187+
# click-repl
151188
psycopg2-binary==2.8.6
152189
# via
153190
# -c requirements/_base.txt
@@ -172,6 +209,8 @@ pyrsistent==0.17.3
172209
# jsonschema
173210
pytest-aiohttp==0.3.0
174211
# via -r requirements/_test.in
212+
pytest-celery==0.0.0
213+
# via -r requirements/_test.in
175214
pytest-cov==2.12.0
176215
# via -r requirements/_test.in
177216
pytest-docker==0.10.1
@@ -206,6 +245,10 @@ python-dotenv==0.17.1
206245
# docker-compose
207246
python-editor==1.0.4
208247
# via alembic
248+
pytz==2021.1
249+
# via
250+
# -c requirements/_base.txt
251+
# celery
209252
pyyaml==5.4.1
210253
# via
211254
# -c requirements/../../../../requirements/constraints.txt
@@ -216,6 +259,7 @@ redis==3.5.3
216259
# via
217260
# -c requirements/_base.txt
218261
# -r requirements/_test.in
262+
# celery
219263
requests==2.25.1
220264
# via
221265
# codecov
@@ -226,6 +270,7 @@ six==1.16.0
226270
# via
227271
# -c requirements/_base.txt
228272
# bcrypt
273+
# click-repl
229274
# dockerpty
230275
# isodate
231276
# jsonschema
@@ -267,6 +312,15 @@ urllib3==1.26.4
267312
# via
268313
# -c requirements/../../../../requirements/constraints.txt
269314
# requests
315+
vine==5.0.0
316+
# via
317+
# -c requirements/_base.txt
318+
# amqp
319+
# celery
320+
wcwidth==0.2.5
321+
# via
322+
# -c requirements/_base.txt
323+
# prompt-toolkit
270324
websocket-client==0.59.0
271325
# via
272326
# docker
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import logging
2+
from typing import Optional
3+
4+
from aiohttp import web
5+
from celery import Celery
6+
from tenacity import AsyncRetrying, before_log, stop_after_attempt, wait_fixed
7+
8+
from ..computation_config import ComputationSettings, get_settings
9+
from .config import CONFIG_SECTION_NAME
10+
11+
__APP_CLIENT_CELERY_CLIENT_KEY = ".".join(
12+
[__name__, CONFIG_SECTION_NAME, "celery_client"]
13+
)
14+
15+
log = logging.getLogger(__name__)
16+
17+
retry_upon_init_policy = dict(
18+
stop=stop_after_attempt(4),
19+
wait=wait_fixed(1.5),
20+
before=before_log(log, logging.WARNING),
21+
reraise=True,
22+
)
23+
24+
25+
def _get_computation_settings(app: web.Application) -> ComputationSettings:
26+
return get_settings(app)
27+
28+
29+
async def _celery_app(app: web.Application):
30+
comp_settings: ComputationSettings = _get_computation_settings(app)
31+
32+
celery_app: Optional[Celery] = None
33+
async for attempt in AsyncRetrying(**retry_upon_init_policy):
34+
with attempt:
35+
celery_app = Celery(
36+
comp_settings.task_name,
37+
broker=comp_settings.broker_url,
38+
backend=comp_settings.result_backend,
39+
)
40+
if not celery_app:
41+
raise ValueError(
42+
"Expected celery client app instance, got {celery_app}"
43+
)
44+
45+
app[__APP_CLIENT_CELERY_CLIENT_KEY] = celery_app
46+
47+
yield
48+
49+
if celery_app is not app[__APP_CLIENT_CELERY_CLIENT_KEY]:
50+
log.critical("Invalid celery client in app")
51+
52+
celery_app.close()
53+
54+
55+
def setup(app: web.Application):
56+
app[__APP_CLIENT_CELERY_CLIENT_KEY] = None
57+
58+
app.cleanup_ctx.append(_celery_app)
59+
60+
61+
def get_celery_client(app: web.Application) -> Celery:
62+
return app[__APP_CLIENT_CELERY_CLIENT_KEY]

services/web/server/src/simcore_service_webserver/activity/config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22
- config-file schema
33
- prometheus endpoint information
44
"""
5-
from aiohttp.web import Application
5+
from typing import Dict
6+
67
import trafaret as T
8+
from aiohttp.web import Application
79
from models_library.basic_types import PortInt, VersionTag
810
from pydantic import BaseSettings
911
from servicelib.application_keys import APP_CONFIG_KEY
10-
from typing import Dict
1112

1213
CONFIG_SECTION_NAME = "activity"
1314

15+
1416
schema = T.Dict(
1517
{
1618
T.Key("enabled", default=True, optional=True): T.Bool(),

services/web/server/src/simcore_service_webserver/activity/handlers.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from servicelib.request_keys import RQT_USERID_KEY
88
from yarl import URL
99

10-
from ..computation_api import get_celery
1110
from ..login.decorators import login_required
11+
from .celery_client import get_celery_client
1212
from .config import CONFIG_SECTION_NAME
1313

1414

@@ -19,7 +19,7 @@ async def query_prometheus(session, url, query):
1919

2020

2121
def celery_reserved(app):
22-
return get_celery(app).control.inspect().reserved()
22+
return get_celery_client(app).control.inspect().reserved()
2323

2424

2525
async def get_cpu_usage(session, url, user_id):
@@ -33,7 +33,8 @@ async def get_memory_usage(session, url, user_id):
3333

3434

3535
async def get_celery_reserved(app):
36-
return celery_reserved(app)
36+
# this can take a bit of time, blocks for a second. so it runs in an executor
37+
return await asyncio.get_event_loop().run_in_executor(None, celery_reserved, app)
3738

3839

3940
async def get_container_metric_for_labels(session, url, user_id):

services/web/server/src/simcore_service_webserver/activity/module_setup.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from servicelib.rest_routing import iter_path_operations, map_handlers_with_operations
66

77
from ..rest_config import APP_OPENAPI_SPECS_KEY
8-
from . import handlers
8+
from . import celery_client, handlers
99
from .config import assert_valid_config
1010

1111
logger = logging.getLogger(__name__)
@@ -19,11 +19,11 @@
1919
)
2020
def setup_activity(app: web.Application):
2121

22-
#----------------------------------------------
22+
# ----------------------------------------------
2323
# TODO: temporary, just to check compatibility between
2424
# trafaret and pydantic schemas
2525
assert_valid_config(app)
26-
#---------------------------------------------
26+
# ---------------------------------------------
2727

2828
# setup routes ------------
2929
specs = app[APP_OPENAPI_SPECS_KEY]
@@ -38,3 +38,5 @@ def include_path(tup_object):
3838
handlers_dict, filter(include_path, iter_path_operations(specs)), strict=True
3939
)
4040
app.router.add_routes(routes)
41+
42+
celery_client.setup(app)

services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects
2222
from sqlalchemy.sql import select
2323

24-
from .computation_api import convert_state_from_db
24+
from .computation_utils import convert_state_from_db
2525
from .projects import projects_api, projects_exceptions
2626
from .projects.projects_utils import project_get_depending_nodes
2727

services/web/server/src/simcore_service_webserver/computation_api.py renamed to services/web/server/src/simcore_service_webserver/computation_utils.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,15 @@
11
""" API of computation subsystem within this application
22
33
"""
4-
# pylint: disable=too-many-arguments
5-
import logging
64

7-
from aiohttp import web
8-
from celery import Celery
95
from models_library.projects_state import RunningState
106
from simcore_postgres_database.models.comp_pipeline import StateType
117

12-
from .computation_config import ComputationSettings
13-
from .computation_config import get_settings as get_computation_settings
14-
15-
log = logging.getLogger(__file__)
16-
17-
188
#
199
# API ------------------------------------------
2010
#
2111

2212

23-
def get_celery(app: web.Application) -> Celery:
24-
comp_settings: ComputationSettings = get_computation_settings(app)
25-
celery_app = Celery(
26-
comp_settings.task_name,
27-
broker=comp_settings.broker_url,
28-
backend=comp_settings.result_backend,
29-
)
30-
return celery_app
31-
32-
3313
DB_TO_RUNNING_STATE = {
3414
StateType.FAILED: RunningState.FAILED,
3515
StateType.PENDING: RunningState.PENDING,

services/web/server/tests/unit/isolated/test_activity.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,25 @@
99
import yaml
1010
from aiohttp import web
1111
from aiohttp.client_exceptions import ClientConnectionError
12-
12+
from celery import Celery
1313
from pytest_simcore.helpers.utils_assert import assert_status
1414
from pytest_simcore.helpers.utils_mock import future_with_result
1515
from servicelib.application import create_safe_application
1616
from simcore_service_webserver.activity import handlers, setup_activity
17+
from simcore_service_webserver.computation_config import ComputationSettings
1718
from simcore_service_webserver.rest import setup_rest
1819
from simcore_service_webserver.security import setup_security
1920
from simcore_service_webserver.session import setup_session
2021

2122

23+
@pytest.fixture
24+
def mocked_celery_client(celery_app: Celery, mocker):
25+
mocker.patch(
26+
"simcore_service_webserver.activity.celery_client._get_computation_settings",
27+
return_value=ComputationSettings.create_from_env(),
28+
)
29+
30+
2231
@pytest.fixture
2332
def mocked_login_required(mocker):
2433
mock = mocker.patch(
@@ -82,7 +91,9 @@ def app_config(fake_data_dir: Path, osparc_simcore_root_dir: Path):
8291

8392

8493
@pytest.fixture
85-
def client(loop, aiohttp_client, app_config, mock_orphaned_services):
94+
def client(
95+
mocked_celery_client, loop, aiohttp_client, app_config, mock_orphaned_services
96+
):
8697
app = create_safe_application(app_config)
8798

8899
setup_session(app)

services/web/server/tests/unit/with_dbs/04/test_computation_api.py renamed to services/web/server/tests/unit/with_dbs/04/test_computation_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import pytest
77
from models_library.projects_state import RunningState
88
from simcore_postgres_database.models.comp_pipeline import StateType
9-
from simcore_service_webserver.computation_api import convert_state_from_db
9+
from simcore_service_webserver.computation_utils import convert_state_from_db
1010

1111

1212
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)