Skip to content

Commit 8d855c3

Browse files
add expiration
1 parent 054a553 commit 8d855c3

File tree

5 files changed

+35
-5
lines changed

5 files changed

+35
-5
lines changed

packages/settings-library/src/settings_library/celery.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ class CelerySettings(BaseCustomSettings):
2222
description="Time after which task results will be deleted (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)."
2323
),
2424
] = timedelta(days=7)
25+
CELERY_EPHEMERAL_RESULT_EXPIRES: Annotated[
26+
timedelta,
27+
Field(
28+
description="Time after which ephemeral task results will be deleted (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)."
29+
),
30+
] = timedelta(hours=1)
2531
CELERY_RESULT_PERSISTENT: Annotated[
2632
bool,
2733
Field(

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ async def on_startup() -> None:
2929
)
3030

3131
app.state.celery_client = CeleryTaskQueueClient(
32-
celery_app, RedisTaskMetadataStore(redis_client_sdk)
32+
celery_app,
33+
celery_settings,
34+
RedisTaskMetadataStore(redis_client_sdk),
3335
)
3436

3537
register_celery_types()

services/storage/src/simcore_service_storage/modules/celery/backends/_redis.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import logging
2+
from datetime import timedelta
13
from typing import Final
24

35
from celery.result import AsyncResult
@@ -11,6 +13,8 @@
1113
_CELERY_TASK_ID_KEY_SEPARATOR: Final[str] = ":"
1214
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 10000
1315

16+
_logger = logging.getLogger(__name__)
17+
1418

1519
class RedisTaskMetadataStore:
1620
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
@@ -30,7 +34,11 @@ async def get(self, task_id: TaskID) -> TaskMetadata | None:
3034
return TaskMetadata.model_validate_json(result) if result else None
3135

3236
async def get_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
33-
search_key = build_task_id_prefix(task_context) + _CELERY_TASK_ID_KEY_SEPARATOR
37+
search_key = (
38+
_CELERY_TASK_METADATA_PREFIX
39+
+ build_task_id_prefix(task_context)
40+
+ _CELERY_TASK_ID_KEY_SEPARATOR
41+
)
3442
keys = set()
3543
async for key in self._redis_client_sdk.redis.scan_iter(
3644
match=search_key + "*", count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
@@ -50,8 +58,11 @@ async def remove(self, task_id: TaskID) -> None:
5058
)
5159
AsyncResult(_CELERY_TASK_META_PREFIX + task_id).forget()
5260

53-
async def set(self, task_id: TaskID, task_data: TaskMetadata) -> None:
61+
async def set(
62+
self, task_id: TaskID, task_data: TaskMetadata, expiry: timedelta
63+
) -> None:
5464
await self._redis_client_sdk.redis.set(
5565
_CELERY_TASK_METADATA_PREFIX + task_id,
5666
task_data.model_dump_json(),
67+
ex=expiry,
5768
)

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from models_library.progress_bar import ProgressReport
1313
from pydantic import ValidationError
1414
from servicelib.logging_utils import log_context
15+
from settings_library.celery import CelerySettings
1516

1617
from .models import (
1718
TaskContext,
@@ -43,6 +44,7 @@
4344
@dataclass
4445
class CeleryTaskQueueClient:
4546
_celery_app: Celery
47+
_celery_settings: CelerySettings
4648
_task_store: TaskMetadataStore
4749

4850
async def send_task(
@@ -67,7 +69,13 @@ async def send_task(
6769
kwargs=task_params,
6870
queue=task_metadata.queue,
6971
)
70-
await self._task_store.set(task_id, task_metadata)
72+
73+
expiry = (
74+
self._celery_settings.CELERY_EPHEMERAL_RESULT_EXPIRES
75+
if task_metadata.ephemeral
76+
else self._celery_settings.CELERY_RESULT_EXPIRES
77+
)
78+
await self._task_store.set(task_id, task_metadata, expiry=expiry)
7179
return task_uuid
7280

7381
@staticmethod

services/storage/src/simcore_service_storage/modules/celery/models.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import timedelta
12
from enum import StrEnum, auto
23
from typing import Any, Final, Protocol, TypeAlias
34
from uuid import UUID
@@ -49,7 +50,9 @@ async def get_uuids(self, task_context: TaskContext) -> set[TaskUUID]: ...
4950

5051
async def remove(self, task_id: TaskID) -> None: ...
5152

52-
async def set(self, task_id: TaskID, task_data: TaskMetadata) -> None: ...
53+
async def set(
54+
self, task_id: TaskID, task_data: TaskMetadata, expiry: timedelta
55+
) -> None: ...
5356

5457

5558
class TaskStatus(BaseModel):

0 commit comments

Comments
 (0)