17
17
from redis .asyncio .cluster import RedisCluster
18
18
from taskiq import AsyncResultBackend
19
19
from taskiq .abc .result_backend import TaskiqResult
20
+ from taskiq .abc .serializer import TaskiqSerializer
20
21
21
22
from taskiq_redis .exceptions import (
22
23
DuplicateExpireTimeSelectedError ,
23
24
ExpireTimeMustBeMoreThanZeroError ,
24
25
ResultIsMissingError ,
25
26
)
27
+ from taskiq_redis .serializer import PickleSerializer
26
28
27
29
if sys .version_info >= (3 , 10 ):
28
30
from typing import TypeAlias
@@ -303,6 +305,7 @@ def __init__(
303
305
result_px_time : Optional [int ] = None ,
304
306
min_other_sentinels : int = 0 ,
305
307
sentinel_kwargs : Optional [Any ] = None ,
308
+ serializer : Optional [TaskiqSerializer ] = None ,
306
309
** connection_kwargs : Any ,
307
310
) -> None :
308
311
"""
@@ -328,6 +331,9 @@ def __init__(
328
331
** connection_kwargs ,
329
332
)
330
333
self .master_name = master_name
334
+ if serializer is None :
335
+ serializer = PickleSerializer ()
336
+ self .serializer = serializer
331
337
self .keep_results = keep_results
332
338
self .result_ex_time = result_ex_time
333
339
self .result_px_time = result_px_time
@@ -369,7 +375,7 @@ async def set_result(
369
375
"""
370
376
redis_set_params : Dict [str , Union [str , bytes , int ]] = {
371
377
"name" : task_id ,
372
- "value" : pickle . dumps (result ),
378
+ "value" : self . serializer . dumpb (result ),
373
379
}
374
380
if self .result_ex_time :
375
381
redis_set_params ["ex" ] = self .result_ex_time
0 commit comments