Skip to content

Commit 1f626a2

Browse files
fix: worker shutdown
1 parent 70bf868 commit 1f626a2

File tree

1 file changed

+16
-11
lines changed

1 file changed

+16
-11
lines changed

packages/celery-library/tests/conftest.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
# pylint: disable=unused-argument
33

44
import datetime
5+
import logging
56
import threading
7+
from asyncio import wait_for
68
from collections.abc import AsyncIterator, Callable
79
from functools import partial
810
from typing import Any
@@ -20,6 +22,7 @@
2022
from pytest_simcore.helpers.typing_env import EnvVarsDict
2123
from servicelib.celery.app_server import BaseAppServer
2224
from servicelib.celery.task_manager import TaskManager
25+
from servicelib.logging_utils import log_catch
2326
from servicelib.redis import RedisClientSDK
2427
from settings_library.celery import CelerySettings
2528
from settings_library.redis import RedisDatabase, RedisSettings
@@ -35,6 +38,9 @@
3538
]
3639

3740

41+
_logger = logging.getLogger(__name__)
42+
43+
3844
class FakeAppServer(BaseAppServer):
3945
def __init__(self, app: Celery, settings: CelerySettings):
4046
super().__init__(app)
@@ -64,7 +70,8 @@ async def start_and_hold(self, startup_completed_event: threading.Event) -> None
6470
startup_completed_event.set()
6571
await self.shutdown_event.wait() # wait for shutdown
6672

67-
await redis_client_sdk.shutdown()
73+
with log_catch(_logger, reraise=False):
74+
await wait_for(redis_client_sdk.shutdown(), timeout=5.0)
6875

6976

7077
@pytest.fixture
@@ -79,7 +86,6 @@ def _(celery_app: Celery) -> None: ...
7986
@pytest.fixture
8087
def app_environment(
8188
monkeypatch: pytest.MonkeyPatch,
82-
use_in_memory_redis: RedisSettings,
8389
env_devel_dict: EnvVarsDict,
8490
) -> EnvVarsDict:
8591
return setenvs_from_dict(
@@ -98,8 +104,10 @@ def celery_settings(
98104

99105

100106
@pytest.fixture
101-
def app_server(celery_app: Celery, celery_settings: CelerySettings) -> BaseAppServer:
102-
return FakeAppServer(app=celery_app, settings=celery_settings)
107+
def app_server(
108+
celery_session_app: Celery, celery_settings: CelerySettings
109+
) -> BaseAppServer:
110+
return FakeAppServer(app=celery_session_app, settings=celery_settings)
103111

104112

105113
@pytest.fixture(scope="session")
@@ -133,20 +141,16 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
133141

134142
with start_worker(
135143
celery_app,
136-
pool="threads",
137-
concurrency=1,
138144
loglevel="info",
139145
perform_ping_check=False,
140146
queues="default",
147+
shutdown_timeout=10.0,
141148
) as worker:
142-
# Ensure worker is fully up before test continues
143-
worker.ensure_started()
144-
145149
try:
146150
yield worker
147151
finally:
148-
worker_init.disconnect(_on_worker_init_wrapper)
149152
on_worker_shutdown(worker)
153+
worker_init.disconnect(_on_worker_init_wrapper)
150154

151155

152156
@pytest.fixture
@@ -171,4 +175,5 @@ async def celery_task_manager(
171175
RedisTaskInfoStore(redis_client_sdk),
172176
)
173177
finally:
174-
await redis_client_sdk.shutdown()
178+
with log_catch(_logger, reraise=False):
179+
await wait_for(redis_client_sdk.shutdown(), timeout=5.0)

0 commit comments

Comments
 (0)