Skip to content

Commit d364c74

Browse files
Merge remote-tracking branch 'upstream/master' into is8102/add-search-api-in-storage
2 parents 09cb3dd + 87820ae commit d364c74

File tree

66 files changed

+1191
-464
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

66 files changed

+1191
-464
lines changed

packages/aws-library/src/aws_library/s3/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
)
2323

2424
__all__: tuple[str, ...] = (
25-
"CopiedBytesTransferredCallback",
26-
"MultiPartUploadLinks",
2725
"PRESIGNED_LINK_MAX_SIZE",
2826
"S3_MAX_FILE_SIZE",
27+
"CopiedBytesTransferredCallback",
28+
"MultiPartUploadLinks",
2929
"S3AccessError",
3030
"S3BucketInvalidError",
3131
"S3DestinationNotEmptyError",
@@ -37,8 +37,8 @@
3737
"S3RuntimeError",
3838
"S3UploadNotFoundError",
3939
"SimcoreS3API",
40-
"UploadedBytesTransferredCallback",
4140
"UploadID",
41+
"UploadedBytesTransferredCallback",
4242
)
4343

4444
# nopycln: file

packages/celery-library/src/celery_library/backends/_redis.py renamed to packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,22 @@
33
import logging
44
from collections.abc import AsyncIterable
55
from datetime import timedelta
6-
from typing import Final
6+
from typing import TYPE_CHECKING, Final
77

88
from models_library.progress_bar import ProgressReport
99
from pydantic import ValidationError
1010
from servicelib.celery.models import (
1111
Task,
1212
TaskFilter,
1313
TaskID,
14+
TaskInfoStore,
1415
TaskMetadata,
15-
TaskUUID,
16+
Wildcard,
1617
)
17-
from servicelib.redis import RedisClientSDK
18-
19-
from ..utils import build_task_id_prefix
18+
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types
2019

2120
_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
2221
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
23-
_CELERY_TASK_ID_KEY_SEPARATOR: Final[str] = ":"
2422
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
2523
_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
2624
_CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"
@@ -43,18 +41,24 @@ async def create_task(
4341
expiry: timedelta,
4442
) -> None:
4543
task_key = _build_key(task_id)
46-
await self._redis_client_sdk.redis.hset(
47-
name=task_key,
48-
key=_CELERY_TASK_METADATA_KEY,
49-
value=task_metadata.model_dump_json(),
50-
) # type: ignore
44+
await handle_redis_returns_union_types(
45+
self._redis_client_sdk.redis.hset(
46+
name=task_key,
47+
key=_CELERY_TASK_METADATA_KEY,
48+
value=task_metadata.model_dump_json(),
49+
)
50+
)
5151
await self._redis_client_sdk.redis.expire(
5252
task_key,
5353
expiry,
5454
)
5555

5656
async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
57-
raw_result = await self._redis_client_sdk.redis.hget(_build_key(task_id), _CELERY_TASK_METADATA_KEY) # type: ignore
57+
raw_result = await handle_redis_returns_union_types(
58+
self._redis_client_sdk.redis.hget(
59+
_build_key(task_id), _CELERY_TASK_METADATA_KEY
60+
)
61+
)
5862
if not raw_result:
5963
return None
6064

@@ -67,7 +71,11 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
6771
return None
6872

6973
async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
70-
raw_result = await self._redis_client_sdk.redis.hget(_build_key(task_id), _CELERY_TASK_PROGRESS_KEY) # type: ignore
74+
raw_result = await handle_redis_returns_union_types(
75+
self._redis_client_sdk.redis.hget(
76+
_build_key(task_id), _CELERY_TASK_PROGRESS_KEY
77+
)
78+
)
7179
if not raw_result:
7280
return None
7381

@@ -80,17 +88,14 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
8088
return None
8189

8290
async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
83-
search_key = (
84-
_CELERY_TASK_INFO_PREFIX
85-
+ build_task_id_prefix(task_filter)
86-
+ _CELERY_TASK_ID_KEY_SEPARATOR
91+
search_key = _CELERY_TASK_INFO_PREFIX + task_filter.create_task_id(
92+
task_uuid=Wildcard()
8793
)
88-
search_key_len = len(search_key)
8994

9095
keys: list[str] = []
9196
pipeline = self._redis_client_sdk.redis.pipeline()
9297
async for key in self._redis_client_sdk.redis.scan_iter(
93-
match=search_key + "*", count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
98+
match=search_key, count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
9499
):
95100
# fake redis (tests) returns bytes, real redis returns str
96101
_key = (
@@ -112,7 +117,7 @@ async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
112117
task_metadata = TaskMetadata.model_validate_json(raw_metadata)
113118
tasks.append(
114119
Task(
115-
uuid=TaskUUID(key[search_key_len:]),
120+
uuid=TaskFilter.get_task_uuid(key),
116121
metadata=task_metadata,
117122
)
118123
)
@@ -153,13 +158,19 @@ async def stream_task_result(
153158
return
154159

155160
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
156-
await self._redis_client_sdk.redis.hset(
157-
name=_build_key(task_id),
158-
key=_CELERY_TASK_PROGRESS_KEY,
159-
value=report.model_dump_json(),
160-
) # type: ignore
161+
await handle_redis_returns_union_types(
162+
self._redis_client_sdk.redis.hset(
163+
name=_build_key(task_id),
164+
key=_CELERY_TASK_PROGRESS_KEY,
165+
value=report.model_dump_json(),
166+
)
167+
)
161168

162169
async def task_exists(self, task_id: TaskID) -> bool:
163170
n = await self._redis_client_sdk.redis.exists(_build_key(task_id))
164171
assert isinstance(n, int) # nosec
165172
return n > 0
173+
174+
175+
if TYPE_CHECKING:
176+
_: type[TaskInfoStore] = RedisTaskInfoStore

packages/celery-library/src/celery_library/common.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@
22
from typing import Any
33

44
from celery import Celery # type: ignore[import-untyped]
5-
from servicelib.redis import RedisClientSDK
65
from settings_library.celery import CelerySettings
76
from settings_library.redis import RedisDatabase
87

9-
from .backends._redis import RedisTaskInfoStore
10-
from .task_manager import CeleryTaskManager
11-
128

139
def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
1410
base_config = {
@@ -36,22 +32,3 @@ def create_app(settings: CelerySettings) -> Celery:
3632
),
3733
**_celery_configure(settings),
3834
)
39-
40-
41-
async def create_task_manager(
42-
app: Celery, settings: CelerySettings
43-
) -> CeleryTaskManager:
44-
redis_client_sdk = RedisClientSDK(
45-
settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
46-
RedisDatabase.CELERY_TASKS
47-
),
48-
client_name="celery_tasks",
49-
)
50-
await redis_client_sdk.setup()
51-
# GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159
52-
53-
return CeleryTaskManager(
54-
app,
55-
settings,
56-
RedisTaskInfoStore(redis_client_sdk),
57-
)

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,8 @@ async def result(
134134

135135
@router.expose(reraise_if_error_type=(JobSchedulerError,))
136136
async def list_jobs(
137-
task_manager: TaskManager, filter_: str, job_filter: AsyncJobFilter
137+
task_manager: TaskManager, job_filter: AsyncJobFilter
138138
) -> list[AsyncJobGet]:
139-
_ = filter_
140139
assert task_manager # nosec
141140
task_filter = TaskFilter.model_validate(job_filter.model_dump())
142141
try:

packages/celery-library/src/celery_library/signals.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,15 @@
66
from celery.worker.worker import WorkController # type: ignore[import-untyped]
77
from servicelib.celery.app_server import BaseAppServer
88
from servicelib.logging_utils import log_context
9-
from settings_library.celery import CelerySettings
109

11-
from .common import create_task_manager
1210
from .utils import get_app_server, set_app_server
1311

1412
_logger = logging.getLogger(__name__)
1513

1614

1715
def on_worker_init(
18-
app_server: BaseAppServer,
19-
celery_settings: CelerySettings,
2016
sender: WorkController,
17+
app_server: BaseAppServer,
2118
**_kwargs,
2219
) -> None:
2320
startup_complete_event = threading.Event()
@@ -26,21 +23,14 @@ def _init(startup_complete_event: threading.Event) -> None:
2623
loop = asyncio.new_event_loop()
2724
asyncio.set_event_loop(loop)
2825

29-
async def _setup_task_manager():
30-
assert sender.app # nosec
31-
assert isinstance(sender.app, Celery) # nosec
32-
33-
app_server.task_manager = await create_task_manager(
34-
sender.app,
35-
celery_settings,
36-
)
26+
assert sender.app # nosec
27+
assert isinstance(sender.app, Celery) # nosec
3728

38-
set_app_server(sender.app, app_server)
29+
set_app_server(sender.app, app_server)
3930

4031
app_server.event_loop = loop
4132

42-
loop.run_until_complete(_setup_task_manager())
43-
loop.run_until_complete(app_server.lifespan(startup_complete_event))
33+
loop.run_until_complete(app_server.run_until_shutdown(startup_complete_event))
4434

4535
thread = threading.Thread(
4636
group=None,

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from settings_library.celery import CelerySettings
2323

2424
from .errors import TaskNotFoundError
25-
from .utils import build_task_id
2625

2726
_logger = logging.getLogger(__name__)
2827

@@ -50,7 +49,7 @@ async def submit_task(
5049
msg=f"Submit {task_metadata.name=}: {task_filter=} {task_params=}",
5150
):
5251
task_uuid = uuid4()
53-
task_id = build_task_id(task_filter, task_uuid)
52+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
5453
self._celery_app.send_task(
5554
task_metadata.name,
5655
task_id=task_id,
@@ -74,7 +73,7 @@ async def cancel_task(self, task_filter: TaskFilter, task_uuid: TaskUUID) -> Non
7473
logging.DEBUG,
7574
msg=f"task cancellation: {task_filter=} {task_uuid=}",
7675
):
77-
task_id = build_task_id(task_filter, task_uuid)
76+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
7877
if not await self.task_exists(task_id):
7978
raise TaskNotFoundError(task_id=task_id)
8079

@@ -96,7 +95,7 @@ async def get_task_result(
9695
logging.DEBUG,
9796
msg=f"Get task result: {task_filter=} {task_uuid=}",
9897
):
99-
task_id = build_task_id(task_filter, task_uuid)
98+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
10099
if not await self.task_exists(task_id):
101100
raise TaskNotFoundError(task_id=task_id)
102101

@@ -139,7 +138,7 @@ async def get_task_status(
139138
logging.DEBUG,
140139
msg=f"Getting task status: {task_filter=} {task_uuid=}",
141140
):
142-
task_id = build_task_id(task_filter, task_uuid)
141+
task_id = task_filter.create_task_id(task_uuid=task_uuid)
143142
if not await self.task_exists(task_id):
144143
raise TaskNotFoundError(task_id=task_id)
145144

packages/celery-library/src/celery_library/utils.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,8 @@
1-
from typing import Final
2-
31
from celery import Celery # type: ignore[import-untyped]
42
from servicelib.celery.app_server import BaseAppServer
5-
from servicelib.celery.models import TaskFilter, TaskID, TaskUUID
63

74
_APP_SERVER_KEY = "app_server"
85

9-
_TASK_ID_KEY_DELIMITATOR: Final[str] = ":"
10-
11-
12-
def build_task_id_prefix(task_filter: TaskFilter) -> str:
13-
filter_dict = task_filter.model_dump()
14-
return _TASK_ID_KEY_DELIMITATOR.join(
15-
[f"{filter_dict[key]}" for key in sorted(filter_dict)]
16-
)
17-
18-
19-
def build_task_id(task_filter: TaskFilter, task_uuid: TaskUUID) -> TaskID:
20-
return _TASK_ID_KEY_DELIMITATOR.join(
21-
[build_task_id_prefix(task_filter), f"{task_uuid}"]
22-
)
23-
246

257
def get_app_server(app: Celery) -> BaseAppServer:
268
app_server = app.conf[_APP_SERVER_KEY]

0 commit comments

Comments
 (0)