Skip to content

Commit 71b3a21

Browse files
change task key prefix
1 parent 19b1ed8 commit 71b3a21

File tree

5 files changed

+21
-21
lines changed

5 files changed

+21
-21
lines changed

packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
)
2222
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types
2323

24-
_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
24+
_CELERY_TASK_PREFIX: Final[str] = "celery-task-"
2525
_CELERY_TASK_STREAM_PREFIX: Final[str] = "celery-task-stream-"
2626
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
2727
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
@@ -38,16 +38,16 @@
3838
_logger = logging.getLogger(__name__)
3939

4040

41-
def _build_info_key(task_id: TaskID) -> str:
42-
return _CELERY_TASK_INFO_PREFIX + task_id
41+
def _build_key(task_id: TaskID) -> str:
42+
return _CELERY_TASK_PREFIX + task_id
4343

4444

4545
def _build_stream_key(task_id: TaskID) -> str:
4646
return _CELERY_TASK_STREAM_PREFIX + task_id
4747

4848

4949
@dataclass
50-
class RedisTaskInfoStore:
50+
class RedisTaskStore:
5151
_redis_client_sdk: RedisClientSDK
5252

5353
async def create_task(
@@ -56,7 +56,7 @@ async def create_task(
5656
execution_metadata: ExecutionMetadata,
5757
expiry: timedelta,
5858
) -> None:
59-
task_key = _build_info_key(task_id)
59+
task_key = _build_key(task_id)
6060
await handle_redis_returns_union_types(
6161
self._redis_client_sdk.redis.hset(
6262
name=task_key,
@@ -88,7 +88,7 @@ async def create_task(
8888
async def get_task_metadata(self, task_id: TaskID) -> ExecutionMetadata | None:
8989
raw_result = await handle_redis_returns_union_types(
9090
self._redis_client_sdk.redis.hget(
91-
_build_info_key(task_id), _CELERY_TASK_METADATA_KEY
91+
_build_key(task_id), _CELERY_TASK_METADATA_KEY
9292
)
9393
)
9494
if not raw_result:
@@ -105,7 +105,7 @@ async def get_task_metadata(self, task_id: TaskID) -> ExecutionMetadata | None:
105105
async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
106106
raw_result = await handle_redis_returns_union_types(
107107
self._redis_client_sdk.redis.hget(
108-
_build_info_key(task_id), _CELERY_TASK_PROGRESS_KEY
108+
_build_key(task_id), _CELERY_TASK_PROGRESS_KEY
109109
)
110110
)
111111
if not raw_result:
@@ -120,7 +120,7 @@ async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None:
120120
return None
121121

122122
async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
123-
search_key = _CELERY_TASK_INFO_PREFIX + owner_metadata.model_dump_task_id(
123+
search_key = _CELERY_TASK_PREFIX + owner_metadata.model_dump_task_id(
124124
task_uuid=WILDCARD
125125
)
126126

@@ -157,20 +157,20 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
157157
return tasks
158158

159159
async def remove_task(self, task_id: TaskID) -> None:
160-
await self._redis_client_sdk.redis.delete(_build_info_key(task_id))
160+
await self._redis_client_sdk.redis.delete(_build_key(task_id))
161161
await self._redis_client_sdk.redis.delete(_build_stream_key(task_id))
162162

163163
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
164164
await handle_redis_returns_union_types(
165165
self._redis_client_sdk.redis.hset(
166-
name=_build_info_key(task_id),
166+
name=_build_key(task_id),
167167
key=_CELERY_TASK_PROGRESS_KEY,
168168
value=report.model_dump_json(),
169169
)
170170
)
171171

172172
async def task_exists(self, task_id: TaskID) -> bool:
173-
n = await self._redis_client_sdk.redis.exists(_build_info_key(task_id))
173+
n = await self._redis_client_sdk.redis.exists(_build_key(task_id))
174174
assert isinstance(n, int) # nosec
175175
return n > 0
176176

@@ -209,4 +209,4 @@ async def consume_task_events(
209209

210210

211211
if TYPE_CHECKING:
212-
_: type[TaskInfoStore] = RedisTaskInfoStore
212+
_: type[TaskInfoStore] = RedisTaskStore

packages/celery-library/tests/conftest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
)
1616
from celery.signals import worker_init, worker_shutdown
1717
from celery.worker.worker import WorkController
18-
from celery_library.backends.redis import RedisTaskInfoStore
18+
from celery_library.backends.redis import RedisTaskStore
1919
from celery_library.signals import on_worker_init, on_worker_shutdown
2020
from celery_library.task_manager import CeleryTaskManager
2121
from celery_library.types import register_celery_types
@@ -66,7 +66,7 @@ async def run_until_shutdown(
6666
self._task_manager = CeleryTaskManager(
6767
self._app,
6868
self._settings,
69-
RedisTaskInfoStore(redis_client_sdk),
69+
RedisTaskStore(redis_client_sdk),
7070
)
7171

7272
startup_completed_event.set()
@@ -173,7 +173,7 @@ async def celery_task_manager(
173173
yield CeleryTaskManager(
174174
mock_celery_app,
175175
celery_settings,
176-
RedisTaskInfoStore(redis_client_sdk),
176+
RedisTaskStore(redis_client_sdk),
177177
)
178178
finally:
179179
await redis_client_sdk.shutdown()

services/api-server/src/simcore_service_api_server/clients/celery_task_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22

3-
from celery_library.backends.redis import RedisTaskInfoStore
3+
from celery_library.backends.redis import RedisTaskStore
44
from celery_library.common import create_app
55
from celery_library.task_manager import CeleryTaskManager
66
from celery_library.types import register_celery_types, register_pydantic_types
@@ -30,7 +30,7 @@ async def on_startup() -> None:
3030
app.state.task_manager = CeleryTaskManager(
3131
create_app(settings),
3232
settings,
33-
RedisTaskInfoStore(redis_client_sdk),
33+
RedisTaskStore(redis_client_sdk),
3434
)
3535

3636
register_celery_types()

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22

3-
from celery_library.backends.redis import RedisTaskInfoStore
3+
from celery_library.backends.redis import RedisTaskStore
44
from celery_library.common import create_app
55
from celery_library.task_manager import CeleryTaskManager
66
from celery_library.types import register_celery_types, register_pydantic_types
@@ -34,7 +34,7 @@ async def on_startup() -> None:
3434
app.state.task_manager = CeleryTaskManager(
3535
create_app(settings),
3636
settings,
37-
RedisTaskInfoStore(redis_client_sdk),
37+
RedisTaskStore(redis_client_sdk),
3838
)
3939

4040
register_celery_types()

services/web/server/src/simcore_service_webserver/celery/_task_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Final
33

44
from aiohttp import web
5-
from celery_library.backends.redis import RedisTaskInfoStore
5+
from celery_library.backends.redis import RedisTaskStore
66
from celery_library.common import create_app
77
from celery_library.task_manager import CeleryTaskManager
88
from celery_library.types import register_celery_types
@@ -30,7 +30,7 @@ async def setup_task_manager(app: web.Application):
3030
app[_APP_CELERY_TASK_MANAGER_KEY] = CeleryTaskManager(
3131
celery_app,
3232
celery_settings,
33-
RedisTaskInfoStore(redis_client_sdk),
33+
RedisTaskStore(redis_client_sdk),
3434
)
3535
register_celery_types()
3636

0 commit comments

Comments
 (0)