Skip to content

Commit 59a41cd

Browse files
use Redis hash
1 parent fe327f4 commit 59a41cd

File tree

4 files changed

+24
-19
lines changed

4 files changed

+24
-19
lines changed

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from ...core.settings import get_application_settings
1010
from ._celery_types import register_celery_types
1111
from ._common import create_app
12-
from .backends._redis import RedisTaskMetadataStore
12+
from .backends._redis import RedisTaskInfoStore
1313
from .client import CeleryTaskQueueClient
1414

1515
_logger = logging.getLogger(__name__)
@@ -31,7 +31,7 @@ async def on_startup() -> None:
3131
app.state.celery_client = CeleryTaskQueueClient(
3232
celery_app,
3333
celery_settings,
34-
RedisTaskMetadataStore(redis_client_sdk),
34+
RedisTaskInfoStore(redis_client_sdk),
3535
)
3636

3737
register_celery_types()

services/storage/src/simcore_service_storage/modules/celery/backends/_redis.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,20 @@
77

88
from ..models import TaskContext, TaskID, TaskMetadata, TaskUUID, build_task_id_prefix
99

10-
_CELERY_TASK_METADATA_PREFIX: Final[str] = "celery-task-metadata-"
10+
_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
1111
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
1212
_CELERY_TASK_ID_KEY_SEPARATOR: Final[str] = ":"
1313
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 10000
14+
_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
1415

1516
_logger = logging.getLogger(__name__)
1617

1718

1819
def _build_key(task_id: TaskID) -> str:
19-
return _CELERY_TASK_METADATA_PREFIX + task_id
20+
return _CELERY_TASK_INFO_PREFIX + task_id
2021

2122

22-
class RedisTaskMetadataStore:
23+
class RedisTaskInfoStore:
2324
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
2425
self._redis_client_sdk = redis_client_sdk
2526

@@ -28,13 +29,13 @@ async def exists(self, task_id: TaskID) -> bool:
2829
assert isinstance(n, int) # nosec
2930
return n > 0
3031

31-
async def get(self, task_id: TaskID) -> TaskMetadata | None:
32-
result = await self._redis_client_sdk.redis.get(_build_key(task_id))
32+
async def get_metadata(self, task_id: TaskID) -> TaskMetadata | None:
33+
result = await self._redis_client_sdk.redis.hget(_build_key(task_id), _CELERY_TASK_METADATA_KEY) # type: ignore
3334
return TaskMetadata.model_validate_json(result) if result else None
3435

3536
async def get_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
3637
search_key = (
37-
_CELERY_TASK_METADATA_PREFIX
38+
_CELERY_TASK_INFO_PREFIX
3839
+ build_task_id_prefix(task_context)
3940
+ _CELERY_TASK_ID_KEY_SEPARATOR
4041
)
@@ -55,11 +56,15 @@ async def remove(self, task_id: TaskID) -> None:
5556
await self._redis_client_sdk.redis.delete(_build_key(task_id))
5657
AsyncResult(task_id).forget()
5758

58-
async def set(
59+
async def set_metadata(
5960
self, task_id: TaskID, task_metadata: TaskMetadata, expiry: timedelta
6061
) -> None:
61-
await self._redis_client_sdk.redis.set(
62+
await self._redis_client_sdk.redis.hset(
63+
name=_build_key(task_id),
64+
key=_CELERY_TASK_METADATA_KEY,
65+
value=task_metadata.model_dump_json(),
66+
) # type: ignore
67+
await self._redis_client_sdk.redis.expire(
6268
_build_key(task_id),
63-
task_metadata.model_dump_json(),
64-
ex=expiry,
69+
expiry,
6570
)

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from .models import (
1818
TaskContext,
1919
TaskID,
20+
TaskInfoStore,
2021
TaskMetadata,
21-
TaskMetadataStore,
2222
TaskState,
2323
TaskStatus,
2424
TaskUUID,
@@ -46,7 +46,7 @@
4646
class CeleryTaskQueueClient:
4747
_celery_app: Celery
4848
_celery_settings: CelerySettings
49-
_task_store: TaskMetadataStore
49+
_task_store: TaskInfoStore
5050

5151
async def send_task(
5252
self,
@@ -76,7 +76,7 @@ async def send_task(
7676
if task_metadata.ephemeral
7777
else self._celery_settings.CELERY_RESULT_EXPIRES
7878
)
79-
await self._task_store.set(task_id, task_metadata, expiry=expiry)
79+
await self._task_store.set_metadata(task_id, task_metadata, expiry=expiry)
8080
return task_uuid
8181

8282
@make_async()
@@ -109,7 +109,7 @@ async def get_task_result(
109109
async_result = self._celery_app.AsyncResult(task_id)
110110
result = async_result.result
111111
if async_result.ready():
112-
task_metadata = await self._task_store.get(task_id)
112+
task_metadata = await self._task_store.get_metadata(task_id)
113113
if task_metadata is not None and task_metadata.ephemeral:
114114
await self._task_store.remove(task_id)
115115
return result

services/storage/src/simcore_service_storage/modules/celery/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,16 @@ class TaskMetadata(BaseModel):
4646
_TASK_DONE = {TaskState.SUCCESS, TaskState.ERROR, TaskState.ABORTED}
4747

4848

49-
class TaskMetadataStore(Protocol):
49+
class TaskInfoStore(Protocol):
5050
async def exists(self, task_id: TaskID) -> bool: ...
5151

52-
async def get(self, task_id: TaskID) -> TaskMetadata | None: ...
52+
async def get_metadata(self, task_id: TaskID) -> TaskMetadata | None: ...
5353

5454
async def get_uuids(self, task_context: TaskContext) -> set[TaskUUID]: ...
5555

5656
async def remove(self, task_id: TaskID) -> None: ...
5757

58-
async def set(
58+
async def set_metadata(
5959
self, task_id: TaskID, task_data: TaskMetadata, expiry: timedelta
6060
) -> None: ...
6161

0 commit comments

Comments
 (0)