|
24 | 24 | from typing_extensions import ParamSpec, Self, TypeAlias |
25 | 25 |
|
26 | 26 | from taskiq.abc.middleware import TaskiqMiddleware |
| 27 | +from taskiq.acks import AckableMessage |
27 | 28 | from taskiq.decor import AsyncTaskiqDecoratedTask |
28 | 29 | from taskiq.events import TaskiqEvents |
29 | 30 | from taskiq.formatters.json_formatter import JSONFormatter |
@@ -68,7 +69,10 @@ class AsyncBroker(ABC): |
68 | 69 | """ |
69 | 70 |
|
70 | 71 | available_tasks: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {} |
| 72 | + # True only if broker runs in worker process. |
71 | 73 | is_worker_process: bool = False |
| 74 | + # True only if broker runs in scheduler process. |
| 75 | + is_scheduler_process: bool = False |
72 | 76 |
|
73 | 77 | def __init__( |
74 | 78 | self, |
@@ -182,13 +186,19 @@ async def kick( |
182 | 186 | """ |
183 | 187 |
|
184 | 188 | @abstractmethod |
185 | | - def listen(self) -> AsyncGenerator[bytes, None]: |
| 189 | + def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]: |
186 | 190 | """ |
187 | 191 | This function listens to new messages and yields them. |
188 | 192 |
|
189 | 193 | This it the main point for workers. |
190 | 194 | This function is used to get new tasks from the network. |
191 | 195 |
|
| 196 | + If your broker support acknowledgement, then you |
| 197 | + should wrap your message in AckableMessage dataclass. |
| 198 | +
|
| 199 | + If your messages was wrapped in AckableMessage dataclass, |
| 200 | + taskiq will call ack when finish processing message. |
| 201 | +
|
192 | 202 | :yield: incoming messages. |
193 | 203 | :return: nothing. |
194 | 204 | """ |
|
0 commit comments