Skip to content

Commit b585dbf

Browse files
GitHKAndrei Neagu
andauthored
♻️ TasksManager uses Redis for task data (ITISFoundation#8131)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent b952aff commit b585dbf

File tree

115 files changed

+1575
-1138
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

115 files changed

+1575
-1138
lines changed

api/specs/web-server/_long_running_tasks_legacy.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
name="list_tasks",
2727
description="Lists all long running tasks",
2828
)
29-
def list_tasks(): ...
29+
async def list_tasks(): ...
3030

3131

3232
@router.get(
@@ -35,7 +35,7 @@ def list_tasks(): ...
3535
name="get_task_status",
3636
description="Retrieves the status of a task",
3737
)
38-
def get_task_status(
38+
async def get_task_status(
3939
_path_params: Annotated[_PathParam, Depends()],
4040
): ...
4141

@@ -46,7 +46,7 @@ def get_task_status(
4646
description="Cancels and deletes a task",
4747
status_code=status.HTTP_204_NO_CONTENT,
4848
)
49-
def cancel_and_delete_task(
49+
async def cancel_and_delete_task(
5050
_path_params: Annotated[_PathParam, Depends()],
5151
): ...
5252

@@ -57,6 +57,6 @@ def cancel_and_delete_task(
5757
response_model=Any,
5858
description="Retrieves the result of a task",
5959
)
60-
def get_task_result(
60+
async def get_task_result(
6161
_path_params: Annotated[_PathParam, Depends()],
6262
): ...

packages/celery-library/requirements/_test.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# testing
1212
coverage
1313
faker
14+
fakeredis[lua]
1415
httpx
1516
pint
1617
pytest

packages/celery-library/requirements/_test.txt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ docker==7.1.0
5656
# pytest-docker-tools
5757
faker==37.3.0
5858
# via -r requirements/_test.in
59+
fakeredis==2.30.3
60+
# via -r requirements/_test.in
5961
flexcache==0.3
6062
# via pint
6163
flexparser==0.4
@@ -83,6 +85,8 @@ kombu==5.5.3
8385
# -c requirements/_base.txt
8486
# celery
8587
# pytest-celery
88+
lupa==2.5
89+
# via fakeredis
8690
packaging==25.0
8791
# via
8892
# -c requirements/_base.txt
@@ -112,6 +116,10 @@ pygments==2.19.1
112116
# via
113117
# -c requirements/_base.txt
114118
# pytest
119+
pyjwt==2.9.0
120+
# via
121+
# -c requirements/_base.txt
122+
# redis
115123
pytest==8.4.1
116124
# via
117125
# -r requirements/_test.in
@@ -156,6 +164,11 @@ pyyaml==6.0.2
156164
# -c requirements/../../../requirements/constraints.txt
157165
# -c requirements/_base.txt
158166
# -r requirements/_test.in
167+
redis==5.3.0
168+
# via
169+
# -c requirements/../../../requirements/constraints.txt
170+
# -c requirements/_base.txt
171+
# fakeredis
159172
requests==2.32.4
160173
# via
161174
# -c requirements/_base.txt
@@ -170,6 +183,8 @@ sniffio==1.3.1
170183
# via
171184
# -c requirements/_base.txt
172185
# anyio
186+
sortedcontainers==2.4.0
187+
# via fakeredis
173188
tenacity==9.1.2
174189
# via
175190
# -c requirements/_base.txt

packages/celery-library/src/celery_library/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ async def create_task_manager(
4747
),
4848
client_name="celery_tasks",
4949
)
50+
await redis_client_sdk.setup()
51+
# GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159
5052

5153
return CeleryTaskManager(
5254
app,

packages/models-library/src/models_library/api_schemas_long_running_tasks/base.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from collections.abc import Awaitable, Callable
23
from typing import Annotated, TypeAlias
34

45
from pydantic import BaseModel, Field, field_validator, validate_call
@@ -22,8 +23,16 @@ class TaskProgress(BaseModel):
2223
message: ProgressMessage = ""
2324
percent: ProgressPercent = 0.0
2425

26+
# used to propagate progress updates internally
27+
_update_callback: Callable[["TaskProgress"], Awaitable[None]] | None = None
28+
29+
def set_update_callback(
30+
self, callback: Callable[["TaskProgress"], Awaitable[None]]
31+
) -> None:
32+
self._update_callback = callback
33+
2534
@validate_call
26-
def update(
35+
async def update(
2736
self,
2837
*,
2938
message: ProgressMessage | None = None,
@@ -40,6 +49,16 @@ def update(
4049

4150
_logger.debug("Progress update: %s", f"{self}")
4251

52+
if self._update_callback is not None:
53+
try:
54+
await self._update_callback(self)
55+
except Exception as exc: # pylint: disable=broad-exception-caught
56+
_logger.warning(
57+
"Error while calling progress update callback: %s",
58+
exc,
59+
stack_info=True,
60+
)
61+
4362
@classmethod
4463
def create(cls, task_id: TaskId | None = None) -> "TaskProgress":
4564
return cls(task_id=task_id)

packages/pytest-simcore/src/pytest_simcore/redis_service.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import pytest
1010
import tenacity
11+
from fakeredis import FakeAsyncRedis
1112
from pytest_mock import MockerFixture
1213
from redis.asyncio import Redis, from_url
1314
from settings_library.basic_types import PortInt
@@ -121,3 +122,9 @@ def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
121122
mocker.patch(
122123
"servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=0.25)
123124
)
125+
126+
127+
@pytest.fixture
128+
async def use_in_memory_redis(mocker: MockerFixture) -> RedisSettings:
129+
mocker.patch("redis.asyncio.from_url", FakeAsyncRedis)
130+
return RedisSettings()

packages/service-library/requirements/_test.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ botocore
1717
coverage
1818
docker
1919
faker
20+
fakeredis[lua]
2021
flaky
2122
numpy
2223
openapi-spec-validator

packages/service-library/requirements/_test.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ execnet==2.1.1
5353
# via pytest-xdist
5454
faker==36.1.1
5555
# via -r requirements/_test.in
56+
fakeredis==2.30.3
57+
# via -r requirements/_test.in
5658
flaky==3.8.1
5759
# via -r requirements/_test.in
5860
frozenlist==1.5.0
@@ -109,6 +111,8 @@ jsonschema-specifications==2024.10.1
109111
# openapi-schema-validator
110112
lazy-object-proxy==1.10.0
111113
# via openapi-spec-validator
114+
lupa==2.5
115+
# via fakeredis
112116
multidict==6.1.0
113117
# via
114118
# -c requirements/_aiohttp.txt
@@ -211,6 +215,11 @@ pyyaml==6.0.2
211215
# -c requirements/_base.txt
212216
# -c requirements/_fastapi.txt
213217
# jsonschema-path
218+
redis==5.2.1
219+
# via
220+
# -c requirements/../../../requirements/constraints.txt
221+
# -c requirements/_base.txt
222+
# fakeredis
214223
referencing==0.35.1
215224
# via
216225
# -c requirements/../../../requirements/constraints.txt
@@ -245,6 +254,8 @@ sniffio==1.3.1
245254
# -c requirements/_fastapi.txt
246255
# anyio
247256
# asgi-lifespan
257+
sortedcontainers==2.4.0
258+
# via fakeredis
248259
sqlalchemy==1.4.54
249260
# via
250261
# -c requirements/../../../requirements/constraints.txt

packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from typing import Final
99

1010
from aiohttp import web
11-
from servicelib.logging_utils import log_context
1211
from settings_library.postgres import PostgresSettings
1312
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
1413
get_pg_engine_stateinfo,

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_manager.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import datetime
22

33
from aiohttp import web
4+
from settings_library.redis import RedisSettings
45

56
from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
6-
from ...long_running_tasks.task import TaskContext, TasksManager
7+
from ...long_running_tasks.models import TaskContext
8+
from ...long_running_tasks.task import RedisNamespace, TasksManager
79
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
810
from ._request import get_task_context
911

@@ -14,11 +16,15 @@ def __init__(
1416
app: web.Application,
1517
stale_task_check_interval: datetime.timedelta,
1618
stale_task_detect_timeout: datetime.timedelta,
19+
redis_settings: RedisSettings,
20+
redis_namespace: RedisNamespace,
1721
):
1822
self._app = app
1923
self._tasks_manager = TasksManager(
2024
stale_task_check_interval=stale_task_check_interval,
2125
stale_task_detect_timeout=stale_task_detect_timeout,
26+
redis_settings=redis_settings,
27+
redis_namespace=redis_namespace,
2228
)
2329

2430
@property

0 commit comments

Comments
 (0)