File tree Expand file tree Collapse file tree 3 files changed +71
-4
lines changed Expand file tree Collapse file tree 3 files changed +71
-4
lines changed Original file line number Diff line number Diff line change @@ -24,7 +24,7 @@ from taskiq_redis.redis_backend import RedisAsyncResultBackend
24
24
25
25
26
26
redis_async_result = RedisAsyncResultBackend(
27
- url = " redis://localhost:6379" ,
27
+ redis_url = " redis://localhost:6379" ,
28
28
)
29
29
30
30
broker = RedisBroker(
@@ -41,7 +41,7 @@ async def best_task_ever() -> None:
41
41
42
42
43
43
async def main ():
44
- task = await my_async_task .kiq()
44
+ task = await best_task_ever .kiq()
45
45
print (await task.get_result())
46
46
47
47
@@ -60,4 +60,5 @@ RedisBroker parameters:
60
60
## RedisAsyncResultBackend configuration
61
61
62
62
RedisAsyncResultBackend parameters:
63
- * ` url ` - url to redis.
63
+ * ` redis_url ` - url to redis.
64
+ * ` keep_results ` - flag to not remove results from Redis after reading.
Original file line number Diff line number Diff line change 11
11
class RedisAsyncResultBackend (AsyncResultBackend [_ReturnType ]):
12
12
"""Async result based on redis."""
13
13
14
- def __init__ (self , redis_url : str ):
14
+ def __init__ (self , redis_url : str , keep_results : bool = True ):
15
+ """
16
+ Constructs a new result backend.
17
+
18
+ :param redis_url: url to redis.
19
+ :param keep_results: flag to not remove results from Redis after reading.
20
+ """
15
21
self .redis_pool = ConnectionPool .from_url (redis_url )
22
+ self .keep_results = keep_results
16
23
17
24
async def shutdown (self ) -> None :
18
25
"""Closes redis connection."""
@@ -80,6 +87,9 @@ async def get_result( # noqa: WPS210
80
87
keys = fields ,
81
88
)
82
89
90
+ if not self .keep_results :
91
+ await redis .delete (task_id )
92
+
83
93
result = {
84
94
result_key : pickle .loads (result_value )
85
95
for result_value , result_key in zip (result_values , fields )
Original file line number Diff line number Diff line change @@ -68,3 +68,59 @@ async def test_fetch_without_logs(redis_url: str) -> None:
68
68
assert fetched_result .return_value == 11
69
69
assert fetched_result .execution_time == 112.2 # noqa: WPS459
70
70
assert fetched_result .is_err
71
+
72
+
73
+ @pytest .mark .anyio
74
+ async def test_remove_results_after_reading (redis_url : str ) -> None :
75
+ """
76
+ Check if removing results after reading works fine.
77
+
78
+ :param redis_url: redis URL.
79
+ """
80
+ result_backend = RedisAsyncResultBackend ( # type: ignore
81
+ redis_url = redis_url ,
82
+ keep_results = False ,
83
+ )
84
+ task_id = uuid .uuid4 ().hex
85
+ result : "TaskiqResult[int]" = TaskiqResult (
86
+ is_err = True ,
87
+ log = "My Log" ,
88
+ return_value = 11 ,
89
+ execution_time = 112.2 ,
90
+ )
91
+ await result_backend .set_result (
92
+ task_id = task_id ,
93
+ result = result ,
94
+ )
95
+
96
+ await result_backend .get_result (task_id = task_id )
97
+ with pytest .raises (Exception ):
98
+ await result_backend .get_result (task_id = task_id )
99
+
100
+
101
+ @pytest .mark .anyio
102
+ async def test_keep_results_after_reading (redis_url : str ) -> None :
103
+ """
104
+ Check if keeping results after reading works fine.
105
+
106
+ :param redis_url: redis URL.
107
+ """
108
+ result_backend = RedisAsyncResultBackend ( # type: ignore
109
+ redis_url = redis_url ,
110
+ keep_results = True ,
111
+ )
112
+ task_id = uuid .uuid4 ().hex
113
+ result : "TaskiqResult[int]" = TaskiqResult (
114
+ is_err = True ,
115
+ log = "My Log" ,
116
+ return_value = 11 ,
117
+ execution_time = 112.2 ,
118
+ )
119
+ await result_backend .set_result (
120
+ task_id = task_id ,
121
+ result = result ,
122
+ )
123
+
124
+ res1 = await result_backend .get_result (task_id = task_id )
125
+ res2 = await result_backend .get_result (task_id = task_id )
126
+ assert res1 == res2
You can’t perform that action at this time.
0 commit comments