1515_LIST_CONCURRENCY : Final [int ] = 2
1616
1717
18- def _to_redis (data : dict [str , Any ]) -> dict [str , str ]:
19- """convert values to redis compatible data"""
18+ def _to_redis_hash_mapping (data : dict [str , Any ]) -> dict [str , str ]:
2019 return {k : json_dumps (v ) for k , v in data .items ()}
2120
2221
23- def _from_redis (data : dict [str , str ]) -> dict [str , Any ]:
24- """converrt back from redis compatible data to python types"""
22+ def _load_from_redis_hash (data : dict [str , str ]) -> dict [str , Any ]:
2523 return {k : json_loads (v ) for k , v in data .items ()}
2624
2725
@@ -51,7 +49,7 @@ def _redis(self) -> aioredis.Redis:
5149 def _get_redis_key_task_data_match (self ) -> str :
5250 return f"{ self .namespace } :{ _STORE_TYPE_TASK_DATA } *"
5351
54- def _get_redis_key_task_data_hash (self , task_id : TaskId ) -> str :
52+ def _get_redis_task_data_key (self , task_id : TaskId ) -> str :
5553 return f"{ self .namespace } :{ _STORE_TYPE_TASK_DATA } :{ task_id } "
5654
5755 def _get_key_to_remove (self ) -> str :
@@ -62,20 +60,20 @@ def _get_key_to_remove(self) -> str:
6260 async def get_task_data (self , task_id : TaskId ) -> TaskData | None :
6361 result : dict [str , Any ] = await handle_redis_returns_union_types (
6462 self ._redis .hgetall (
65- self ._get_redis_key_task_data_hash (task_id ),
63+ self ._get_redis_task_data_key (task_id ),
6664 )
6765 )
6866 return (
69- TypeAdapter (TaskData ).validate_python (_from_redis (result ))
67+ TypeAdapter (TaskData ).validate_python (_load_from_redis_hash (result ))
7068 if result and len (result )
7169 else None
7270 )
7371
7472 async def add_task_data (self , task_id : TaskId , value : TaskData ) -> None :
7573 await handle_redis_returns_union_types (
7674 self ._redis .hset (
77- self ._get_redis_key_task_data_hash (task_id ),
78- mapping = _to_redis (value .model_dump ()),
75+ self ._get_redis_task_data_key (task_id ),
76+ mapping = _to_redis_hash_mapping (value .model_dump ()),
7977 )
8078 )
8179
@@ -87,8 +85,8 @@ async def update_task_data(
8785 ) -> None :
8886 await handle_redis_returns_union_types (
8987 self ._redis .hset (
90- self ._get_redis_key_task_data_hash (task_id ),
91- mapping = _to_redis (updates ),
88+ self ._get_redis_task_data_key (task_id ),
89+ mapping = _to_redis_hash_mapping (updates ),
9290 )
9391 )
9492
@@ -107,14 +105,14 @@ async def list_tasks_data(self) -> list[TaskData]:
107105 )
108106
109107 return [
110- TypeAdapter (TaskData ).validate_python (_from_redis (item ))
108+ TypeAdapter (TaskData ).validate_python (_load_from_redis_hash (item ))
111109 for item in result
112110 if item
113111 ]
114112
115113 async def delete_task_data (self , task_id : TaskId ) -> None :
116114 await handle_redis_returns_union_types (
117- self ._redis .delete (self ._get_redis_key_task_data_hash (task_id ))
115+ self ._redis .delete (self ._get_redis_task_data_key (task_id ))
118116 )
119117
120118 # to cancel
0 commit comments