1616_logger = logging .getLogger (__name__ )
1717
1818
19+ def _build_key (task_id : TaskID ) -> str :
20+ return _CELERY_TASK_META_PREFIX + task_id
21+
22+
1923class RedisTaskMetadataStore :
2024 def __init__ (self , redis_client_sdk : RedisClientSDK ) -> None :
2125 self ._redis_client_sdk = redis_client_sdk
2226
2327 async def exists (self , task_id : TaskID ) -> bool :
24- n = await self ._redis_client_sdk .redis .exists (
25- _CELERY_TASK_METADATA_PREFIX + task_id
26- )
28+ n = await self ._redis_client_sdk .redis .exists (_build_key (task_id ))
2729 assert isinstance (n , int ) # nosec
2830 return n > 0
2931
3032 async def get (self , task_id : TaskID ) -> TaskMetadata | None :
31- result = await self ._redis_client_sdk .redis .get (
32- _CELERY_TASK_METADATA_PREFIX + task_id
33- )
33+ result = await self ._redis_client_sdk .redis .get (_build_key (task_id ))
3434 return TaskMetadata .model_validate_json (result ) if result else None
3535
3636 async def get_uuids (self , task_context : TaskContext ) -> set [TaskUUID ]:
@@ -53,16 +53,14 @@ async def get_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
5353 return keys
5454
5555 async def remove (self , task_id : TaskID ) -> None :
56- await self ._redis_client_sdk .redis .delete (
57- _CELERY_TASK_METADATA_PREFIX + task_id
58- )
56+ await self ._redis_client_sdk .redis .delete (_build_key (task_id ))
5957 AsyncResult (_CELERY_TASK_META_PREFIX + task_id ).forget ()
6058
6159 async def set (
6260 self , task_id : TaskID , task_metadata : TaskMetadata , expiry : timedelta
6361 ) -> None :
6462 await self ._redis_client_sdk .redis .set (
65- _CELERY_TASK_METADATA_PREFIX + task_id ,
63+ _build_key ( task_id ) ,
6664 task_metadata .model_dump_json (),
6765 ex = expiry ,
6866 )
0 commit comments