1
1
import pickle
2
- from typing import Any , Dict , TypeVar
2
+ from typing import TypeVar
3
3
4
4
from redis .asyncio import ConnectionPool , Redis
5
5
from taskiq .abc .result_backend import TaskiqResult
@@ -13,11 +13,7 @@ class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
13
13
"""Async result based on redis."""
14
14
15
15
def __init__ (self , redis_url : str ):
16
- self .redis_url = redis_url
17
-
18
- async def startup (self ) -> None :
19
- """Makes redis connection on startup."""
20
- self .redis_pool = ConnectionPool .from_url (self .redis_url )
16
+ self .redis_pool = ConnectionPool .from_url (redis_url )
21
17
22
18
async def shutdown (self ) -> None :
23
19
"""Closes redis connection."""
@@ -57,9 +53,9 @@ async def is_result_ready(self, task_id: str) -> bool:
57
53
:returns: True if the result is ready else False.
58
54
"""
59
55
async with Redis (connection_pool = self .redis_pool ) as redis :
60
- return await redis .exists (task_id )
56
+ return bool ( await redis .exists (task_id ) )
61
57
62
- async def get_result (
58
+ async def get_result ( # noqa: WPS210
63
59
self ,
64
60
task_id : str ,
65
61
with_logs : bool = False ,
@@ -71,20 +67,20 @@ async def get_result(
71
67
:param with_logs: if True it will download task's logs.
72
68
:return: task's return value.
73
69
"""
74
- result : Dict [str , Any ] = {
75
- result_key : None for result_key in TaskiqResult .__fields__
76
- }
70
+ fields = list (TaskiqResult .__fields__ .keys ())
77
71
78
72
if not with_logs :
79
- result . pop ("log" )
73
+ fields . remove ("log" )
80
74
81
75
async with Redis (connection_pool = self .redis_pool ) as redis :
82
76
result_values = await redis .hmget (
83
77
name = task_id ,
84
- keys = result ,
78
+ keys = fields ,
85
79
)
86
80
87
- for result_value , result_key in zip (result_values , result ):
88
- result [result_key ] = pickle .loads (result_value )
81
+ result = {
82
+ result_key : pickle .loads (result_value )
83
+ for result_value , result_key in zip (result_values , fields )
84
+ }
89
85
90
86
return TaskiqResult (** result )
0 commit comments