Skip to content

Commit 9a49e85

Browse files
Merge branch 'is8110/update-api-key-uniqueness-constraint' of github.com:giancarloromeo/osparc-simcore into is8110/update-api-key-uniqueness-constraint
2 parents c30dc15 + 126d607 commit 9a49e85

File tree

50 files changed

+819
-402
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+819
-402
lines changed

packages/celery-library/src/celery_library/backends/_redis.py renamed to packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,22 @@
11
import contextlib
22
import logging
33
from datetime import timedelta
4-
from typing import Final
4+
from typing import TYPE_CHECKING, Final
55

66
from models_library.progress_bar import ProgressReport
77
from pydantic import ValidationError
88
from servicelib.celery.models import (
99
Task,
1010
TaskFilter,
1111
TaskID,
12+
TaskInfoStore,
1213
TaskMetadata,
13-
TaskUUID,
14+
Wildcard,
1415
)
15-
from servicelib.redis import RedisClientSDK
16-
17-
from ..utils import build_task_id_prefix
16+
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types
1817

1918
_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
2019
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
21-
_CELERY_TASK_ID_KEY_SEPARATOR: Final[str] = ":"
2220
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
2321
_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
2422
_CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"
@@ -41,18 +39,24 @@ async def create_task(
4139
expiry: timedelta,
4240
) -> None:
4341
task_key = _build_key(task_id)
44-
await self._redis_client_sdk.redis.hset(
45-
name=task_key,
46-
key=_CELERY_TASK_METADATA_KEY,
47-
value=task_metadata.model_dump_json(),
48-
) # type: ignore
42+
await handle_redis_returns_union_types(
43+
self._redis_client_sdk.redis.hset(
44+
name=task_key,
45+
key=_CELERY_TASK_METADATA_KEY,
46+
value=task_metadata.model_dump_json(),
47+
)
48+
)
4949
await self._redis_client_sdk.redis.expire(
5050
task_key,
5151
expiry,
5252
)
5353

5454
async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
55-
raw_result = await self._redis_client_sdk.redis.hget(_build_key(task_id), _CELERY_TASK_METADATA_KEY) # type: ignore
55+
raw_result = await handle_redis_returns_union_types(
56+
self._redis_client_sdk.redis.hget(
57+
_build_key(task_id), _CELERY_TASK_METADATA_KEY
58+
)
59+
)
5660
if not raw_result:
5761
return None
5862

@@ -65,7 +69,11 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
6569
return None
6670

6771
async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
68-
raw_result = await self._redis_client_sdk.redis.hget(_build_key(task_id), _CELERY_TASK_PROGRESS_KEY) # type: ignore
72+
raw_result = await handle_redis_returns_union_types(
73+
self._redis_client_sdk.redis.hget(
74+
_build_key(task_id), _CELERY_TASK_PROGRESS_KEY
75+
)
76+
)
6977
if not raw_result:
7078
return None
7179

@@ -78,17 +86,14 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
7886
return None
7987

8088
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
81-
search_key = (
82-
_CELERY_TASK_INFO_PREFIX
83-
+ build_task_id_prefix(task_filter)
84-
+ _CELERY_TASK_ID_KEY_SEPARATOR
89+
search_key = _CELERY_TASK_INFO_PREFIX + task_filter.create_task_id(
90+
task_uuid=Wildcard()
8591
)
86-
search_key_len = len(search_key)
8792

8893
keys: list[str] = []
8994
pipeline = self._redis_client_sdk.redis.pipeline()
9095
async for key in self._redis_client_sdk.redis.scan_iter(
91-
match=search_key + "*", count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
96+
match=search_key, count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
9297
):
9398
# fake redis (tests) returns bytes, real redis returns str
9499
_key = (
@@ -110,7 +115,7 @@ async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
110115
task_metadata = TaskMetadata.model_validate_json(raw_metadata)
111116
tasks.append(
112117
Task(
113-
uuid=TaskUUID(key[search_key_len:]),
118+
uuid=TaskFilter.get_task_uuid(key),
114119
metadata=task_metadata,
115120
)
116121
)
@@ -121,13 +126,19 @@ async def remove_task(self, task_id: TaskID) -> None:
121126
await self._redis_client_sdk.redis.delete(_build_key(task_id))
122127

123128
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
124-
await self._redis_client_sdk.redis.hset(
125-
name=_build_key(task_id),
126-
key=_CELERY_TASK_PROGRESS_KEY,
127-
value=report.model_dump_json(),
128-
) # type: ignore
129+
await handle_redis_returns_union_types(
130+
self._redis_client_sdk.redis.hset(
131+
name=_build_key(task_id),
132+
key=_CELERY_TASK_PROGRESS_KEY,
133+
value=report.model_dump_json(),
134+
)
135+
)
129136

130137
async def task_exists(self, task_id: TaskID) -> bool:
131138
n = await self._redis_client_sdk.redis.exists(_build_key(task_id))
132139
assert isinstance(n, int) # nosec
133140
return n > 0
141+
142+
143+
if TYPE_CHECKING:
144+
_: type[TaskInfoStore] = RedisTaskInfoStore

packages/celery-library/src/celery_library/common.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@
22
from typing import Any
33

44
from celery import Celery # type: ignore[import-untyped]
5-
from servicelib.redis import RedisClientSDK
65
from settings_library.celery import CelerySettings
76
from settings_library.redis import RedisDatabase
87

9-
from .backends._redis import RedisTaskInfoStore
10-
from .task_manager import CeleryTaskManager
11-
128

139
def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
1410
base_config = {
@@ -36,22 +32,3 @@ def create_app(settings: CelerySettings) -> Celery:
3632
),
3733
**_celery_configure(settings),
3834
)
39-
40-
41-
async def create_task_manager(
42-
app: Celery, settings: CelerySettings
43-
) -> CeleryTaskManager:
44-
redis_client_sdk = RedisClientSDK(
45-
settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
46-
RedisDatabase.CELERY_TASKS
47-
),
48-
client_name="celery_tasks",
49-
)
50-
await redis_client_sdk.setup()
51-
# GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159
52-
53-
return CeleryTaskManager(
54-
app,
55-
settings,
56-
RedisTaskInfoStore(redis_client_sdk),
57-
)

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,8 @@ async def result(
134134

135135
@router.expose(reraise_if_error_type=(JobSchedulerError,))
136136
async def list_jobs(
137-
task_manager: TaskManager, filter_: str, job_filter: AsyncJobFilter
137+
task_manager: TaskManager, job_filter: AsyncJobFilter
138138
) -> list[AsyncJobGet]:
139-
_ = filter_
140139
assert task_manager # nosec
141140
task_filter = TaskFilter.model_validate(job_filter.model_dump())
142141
try:

packages/celery-library/src/celery_library/signals.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,15 @@
66
from celery.worker.worker import WorkController # type: ignore[import-untyped]
77
from servicelib.celery.app_server import BaseAppServer
88
from servicelib.logging_utils import log_context
9-
from settings_library.celery import CelerySettings
109

11-
from .common import create_task_manager
1210
from .utils import get_app_server, set_app_server
1311

1412
_logger = logging.getLogger(__name__)
1513

1614

1715
def on_worker_init(
18-
app_server: BaseAppServer,
19-
celery_settings: CelerySettings,
2016
sender: WorkController,
17+
app_server: BaseAppServer,
2118
**_kwargs,
2219
) -> None:
2320
startup_complete_event = threading.Event()
@@ -26,21 +23,14 @@ def _init(startup_complete_event: threading.Event) -> None:
2623
loop = asyncio.new_event_loop()
2724
asyncio.set_event_loop(loop)
2825

29-
async def _setup_task_manager():
30-
assert sender.app # nosec
31-
assert isinstance(sender.app, Celery) # nosec
32-
33-
app_server.task_manager = await create_task_manager(
34-
sender.app,
35-
celery_settings,
36-
)
26+
assert sender.app # nosec
27+
assert isinstance(sender.app, Celery) # nosec
3728

38-
set_app_server(sender.app, app_server)
29+
set_app_server(sender.app, app_server)
3930

4031
app_server.event_loop = loop
4132

42-
loop.run_until_complete(_setup_task_manager())
43-
loop.run_until_complete(app_server.lifespan(startup_complete_event))
33+
loop.run_until_complete(app_server.run_until_shutdown(startup_complete_event))
4434

4535
thread = threading.Thread(
4636
group=None,

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from settings_library.celery import CelerySettings
2323

2424
from .errors import TaskNotFoundError
25-
from .utils import build_task_id
2625

2726
_logger = logging.getLogger(__name__)
2827

@@ -50,7 +49,7 @@ async def submit_task(
5049
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
5150
):
5251
task_uuid = uuid4()
53-
task_id = build_task_id(task_filter, task_uuid)
52+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
5453
self._celery_app.send_task(
5554
task_metadata.name,
5655
task_id=task_id,
@@ -74,7 +73,7 @@ async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> Non
7473
logging.DEBUG,
7574
msg=f"task cancellation: {task_filter=} {task_uuid=}",
7675
):
77-
task_id = build_task_id(task_filter, task_uuid)
76+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
7877
if not await self.task_exists(task_id):
7978
raise TaskNotFoundError(task_id=task_id)
8079

@@ -96,7 +95,7 @@ async def get_task_result(
9695
logging.DEBUG,
9796
msg=f"Get task result: {task_filter=} {task_uuid=}",
9897
):
99-
task_id = build_task_id(task_filter, task_uuid)
98+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
10099
if not await self.task_exists(task_id):
101100
raise TaskNotFoundError(task_id=task_id)
102101

@@ -139,7 +138,7 @@ async def get_task_status(
139138
logging.DEBUG,
140139
msg=f"Getting task status: {task_filter=} {task_uuid=}",
141140
):
142-
task_id = build_task_id(task_filter, task_uuid)
141+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
143142
if not await self.task_exists(task_id):
144143
raise TaskNotFoundError(task_id=task_id)
145144

packages/celery-library/src/celery_library/utils.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,8 @@
1-
from typing import Final
2-
31
from celery import Celery # type: ignore[import-untyped]
42
from servicelib.celery.app_server import BaseAppServer
5-
from servicelib.celery.models import TaskFilter, TaskID, TaskUUID
63

74
_APP_SERVER_KEY = "app_server"
85

9-
_TASK_ID_KEY_DELIMITATOR: Final[str] = ":"
10-
11-
12-
def build_task_id_prefix(task_filter: TaskFilter) -> str:
13-
filter_dict = task_filter.model_dump()
14-
return _TASK_ID_KEY_DELIMITATOR.join(
15-
[f"{filter_dict[key]}" for key in sorted(filter_dict)]
16-
)
17-
18-
19-
def build_task_id(task_filter: TaskFilter, task_uuid: TaskUUID) -> TaskID:
20-
return _TASK_ID_KEY_DELIMITATOR.join(
21-
[build_task_id_prefix(task_filter), f"{task_uuid}"]
22-
)
23-
246

257
def get_app_server(app: Celery) -> BaseAppServer:
268
app_server = app.conf[_APP_SERVER_KEY]

0 commit comments

Comments
 (0)