11import asyncio
2+ from abc import ABC , abstractmethod
23from time import time
3- from typing import TYPE_CHECKING , Generic , TypeVar
4+ from typing import TYPE_CHECKING , Any , Coroutine , Generic , TypeVar , Union
45
56from taskiq .exceptions import (
67 ResultGetError ,
78 ResultIsReadyError ,
89 TaskiqResultTimeoutError ,
910)
11+ from taskiq .utils import run_sync
1012
1113if TYPE_CHECKING :
1214 from taskiq .abc .result_backend import AsyncResultBackend
1517_ReturnType = TypeVar ("_ReturnType" )
1618
1719
18- class AsyncTaskiqTask (Generic [_ReturnType ]):
20+ class _Task (ABC , Generic [_ReturnType ]):
21+ """TaskiqTask interface."""
22+
23+ @abstractmethod
24+ def is_ready (self ) -> Union [bool , Coroutine [Any , Any , bool ]]:
25+ """
26+ Method to check wether result is ready.
27+
28+ :return: True if result is ready.
29+ """
30+
31+ @abstractmethod
32+ def get_result ( # noqa: WPS234
33+ self ,
34+ with_logs : bool = False ,
35+ ) -> Union [
36+ "TaskiqResult[_ReturnType]" ,
37+ Coroutine [Any , Any , "TaskiqResult[_ReturnType]" ],
38+ ]:
39+ """
40+ Get actual execution result.
41+
42+ :param with_logs: wether you want to fetch logs.
43+ :return: TaskiqResult.
44+ """
45+
46+ @abstractmethod
47+ def wait_result ( # noqa: WPS234
48+ self ,
49+ check_interval : float = 0.2 ,
50+ timeout : float = - 1.0 ,
51+ with_logs : bool = False ,
52+ ) -> Union [
53+ "TaskiqResult[_ReturnType]" ,
54+ Coroutine [Any , Any , "TaskiqResult[_ReturnType]" ],
55+ ]:
56+ """
57+ Wait for result to become ready and get it.
58+
59+ This function constantly checks wheter result is ready
60+ and fetches it when it becomes available.
61+
62+ :param check_interval: how ofen availability is checked.
63+ :param timeout: maximum amount of time it will wait
64+ before raising TaskiqResultTimeoutError.
65+ :param with_logs: whether you need to download logs.
66+ :return: TaskiqResult.
67+ """
68+
69+
70+ class SyncTaskiqTask (_Task [_ReturnType ]):
71+ """Sync wrapper over AsyncTaskiqTask."""
72+
73+ def __init__ (self , async_task : "AsyncTaskiqTask[_ReturnType]" ) -> None :
74+ self .async_task = async_task
75+
76+ def is_ready (self ) -> bool :
77+ """
78+ Checks if task is completed.
79+
80+ :return: True if task is completed.
81+ """
82+ return run_sync (self .async_task .is_ready ())
83+
84+ def get_result (self , with_logs : bool = False ) -> "TaskiqResult[_ReturnType]" :
85+ """
86+ Get result of a task from result backend.
87+
88+ :param with_logs: whether you want to fetch logs from worker.
89+
90+ :return: task's return value.
91+ """
92+ return run_sync (self .async_task .get_result (with_logs = with_logs ))
93+
94+ def wait_result (
95+ self ,
96+ check_interval : float = 0.2 ,
97+ timeout : float = - 1 ,
98+ with_logs : bool = False ,
99+ ) -> "TaskiqResult[_ReturnType]" :
100+ """
101+ Waits until result is ready.
102+
103+ This method just checks whether the task is
104+ ready. And if it is it returns the result.
105+
106+ It may throw TaskiqResultTimeoutError if
107+ task didn't became ready in provided
108+ period of time.
109+
110+ :param check_interval: How often checks are performed.
111+ :param timeout: timeout for the result.
112+ :param with_logs: whether you want to fetch logs from worker.
113+ :return: task's return value.
114+ """
115+ return run_sync (
116+ self .async_task .wait_result (
117+ check_interval = check_interval ,
118+ timeout = timeout ,
119+ with_logs = with_logs ,
120+ ),
121+ )
122+
123+
124+ class AsyncTaskiqTask (_Task [_ReturnType ]):
19125 """AsyncTask for AsyncResultBackend."""
20126
21127 def __init__ (
@@ -59,8 +165,8 @@ async def get_result(self, with_logs: bool = False) -> "TaskiqResult[_ReturnType
59165
60166 async def wait_result (
61167 self ,
62- check_interval : float = 1.0 ,
63- timeout : float = 5 .0 ,
168+ check_interval : float = 0.2 ,
169+ timeout : float = - 1 .0 ,
64170 with_logs : bool = False ,
65171 ) -> "TaskiqResult[_ReturnType]" :
66172 """
@@ -83,6 +189,6 @@ async def wait_result(
83189 start_time = time ()
84190 while not await self .is_ready ():
85191 await asyncio .sleep (check_interval )
86- if time () - start_time > timeout :
192+ if 0 < timeout < time () - start_time :
87193 raise TaskiqResultTimeoutError ()
88194 return await self .get_result (with_logs = with_logs )
0 commit comments