@@ -1,7 +1,7 @@
 import pickle
 from typing import Any, Dict, TypeVar
 
-from redis_client import RedisClient
+from redis.asyncio import ConnectionPool, Redis
 from taskiq.abc.result_backend import TaskiqResult
 
 from taskiq import AsyncResultBackend
@@ -17,11 +17,11 @@ def __init__(self, redis_url: str):
 
     async def startup(self) -> None:
         """Makes redis connection on startup."""
-        self.redis_client = RedisClient(redis_url=self.redis_url)
+        self.redis_pool = ConnectionPool.from_url(self.redis_url)
 
     async def shutdown(self) -> None:
         """Closes redis connection."""
-        await self.redis_client.close()
+        await self.redis_pool.disconnect()
 
     async def set_result(
         self,
@@ -37,16 +37,16 @@ async def set_result(
         :param task_id: ID of the task.
         :param result: TaskiqResult instance.
         """
-        to_insert_result_data = {}
+        result_dict = result.dict()
 
-        for result_key, result_value in result.__dict__.items():
-            result_value = pickle.dumps(result_value)
-            to_insert_result_data[result_key] = result_value
+        for result_key, result_value in result_dict.items():
+            result_dict[result_key] = pickle.dumps(result_value)
 
-        await self.redis_client.hset(
-            task_id,
-            mapping=to_insert_result_data,
-        )
+        async with Redis(connection_pool=self.redis_pool) as redis:
+            await redis.hset(
+                task_id,
+                mapping=result_dict,
+            )
 
     async def is_result_ready(self, task_id: str) -> bool:
         """
@@ -56,7 +56,8 @@ async def is_result_ready(self, task_id: str) -> bool:
 
         :returns: True if the result is ready else False.
         """
-        return await self.redis_client.exists(task_id)
+        async with Redis(connection_pool=self.redis_pool) as redis:
+            return await redis.exists(task_id)
 
     async def get_result(
         self,
@@ -70,24 +71,32 @@ async def get_result(
         :param with_logs: if True it will download task's logs.
         :return: task's return value.
         """
-        task_data: Dict[str, Any] = {"log": None}
-
-        redis_key_result_param = {
-            "is_err": "is_err",
-            "_return_value": "return_value",
-            "execution_time": "execution_time",
+        result: Dict[str, Any] = {
+            result_key: None for result_key in TaskiqResult.__fields__
         }
 
-        if with_logs:
-            redis_key_result_param["log"] = "log"
+        if not with_logs:
+            result.pop("log")
 
-        for redis_key, result_param in redis_key_result_param.items():
-            key_data = pickle.loads(
-                await self.redis_client.hget(
-                    task_id,
-                    redis_key,
-                ),
+        async with Redis(connection_pool=self.redis_pool) as redis:
+            result_values = await redis.hmget(
+                name=task_id,
+                keys=result,
             )
-            task_data[result_param] = key_data
 
-        return TaskiqResult(**task_data)
+        for result_value, result_key in zip(result_values, result):
+            result[result_key] = pickle.loads(result_value)
+
+        return TaskiqResult(**result)
+
+
+async def main():
+    t = TaskiqResult(
+        is_err=False,
+        log="ASD",
+        return_value=123,
+        execution_time=1.0,
+    )
+
+    r = RedisAsyncResultBackend("redis://localhost:6379")
+    await r.set_result()
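Note that the debug main() added at the bottom of this diff is not runnable as written: set_result() is called without its required task_id and result arguments, and startup() is never awaited, so self.redis_pool would not exist yet. Below is a minimal round-trip sketch using only the methods shown in this diff; the import path taskiq_redis.redis_backend, the task id "test_task", and the local Redis URL are assumptions for illustration, not part of the change itself.

import asyncio

from taskiq.abc.result_backend import TaskiqResult

# Assumed import path for the class edited in this diff.
from taskiq_redis.redis_backend import RedisAsyncResultBackend


async def roundtrip() -> None:
    backend = RedisAsyncResultBackend("redis://localhost:6379")
    await backend.startup()  # creates the ConnectionPool

    result = TaskiqResult(
        is_err=False,
        log="ASD",
        return_value=123,
        execution_time=1.0,
    )

    # "test_task" is an arbitrary placeholder id for this sketch.
    await backend.set_result(task_id="test_task", result=result)

    assert await backend.is_result_ready("test_task")
    fetched = await backend.get_result("test_task", with_logs=True)
    print(fetched.return_value)  # 123

    await backend.shutdown()  # disconnects the pool


if __name__ == "__main__":
    asyncio.run(roundtrip())

Since set_result() pickles every field of result.dict() into a Redis hash keyed by task_id, get_result() can rebuild the full TaskiqResult by unpickling each field it reads back with hmget.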