Skip to content

Commit 17c2332

Browse files
committed
Added broker for shared tasks.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 25dd38a commit 17c2332

File tree

5 files changed

+101
-16
lines changed

5 files changed

+101
-16
lines changed

taskiq/abc/broker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
result_backend = DummyResultBackend()
3232
self.result_backend = result_backend
3333
self.is_worker_process = False
34+
self.decorator_class = AsyncTaskiqDecoratedTask
3435

3536
async def startup(self) -> None:
3637
"""Do something when starting broker."""
@@ -133,7 +134,7 @@ def inner(
133134
wrapper = wraps(func)
134135

135136
decorated_task = wrapper(
136-
AsyncTaskiqDecoratedTask(
137+
self.decorator_class(
137138
broker=self,
138139
original_func=func,
139140
labels=inner_labels,

taskiq/brokers/shared_broker.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from typing import AsyncGenerator, Optional, TypeVar
2+
3+
from taskiq.abc.broker import AsyncBroker
4+
from taskiq.decor import AsyncTaskiqDecoratedTask
5+
from taskiq.exceptions import TaskiqError
6+
from taskiq.kicker import AsyncKicker
7+
from taskiq.message import TaskiqMessage
8+
from taskiq.types_helpers import ReturnType_
9+
10+
Params_ = TypeVar("Params_") # noqa: WPS120
11+
12+
13+
class SharedDecoratedTask(AsyncTaskiqDecoratedTask[Params_, ReturnType_]):
14+
"""Decorator that is used with shared broker."""
15+
16+
def kicker(self) -> AsyncKicker[Params_, ReturnType_]:
17+
"""
18+
This method updates getting default kicker.
19+
20+
In this method we want to get default broker from
21+
our shared broker and send task to it, instead
22+
of shared_broker.
23+
24+
:raises TaskiqError: if _default_broker is not set.
25+
:return: new kicker.
26+
"""
27+
broker = getattr(self.broker, "_default_broker", None)
28+
if broker is None:
29+
raise TaskiqError(
30+
"You cannot use kiq directly on shared task "
31+
"without setting the default_broker.",
32+
)
33+
return AsyncKicker(
34+
task_name=self.task_name,
35+
broker=broker,
36+
labels=self.labels,
37+
)
38+
39+
40+
class AsyncSharedBroker(AsyncBroker):
41+
"""Broker for creating shared tasks."""
42+
43+
def __init__(self) -> None:
44+
super().__init__(None)
45+
self._default_broker: Optional[AsyncBroker] = None
46+
self.decorator_class = SharedDecoratedTask
47+
48+
async def kick(self, message: TaskiqMessage) -> None:
49+
"""
50+
Shared broker cannot kick tasks.
51+
52+
:param message: message to send.
53+
:raises TaskiqError: if called.
54+
"""
55+
raise TaskiqError("Shared broker cannot kick tasks.")
56+
57+
def default_broker(self, new_broker: AsyncBroker) -> None:
58+
"""
59+
Updates default broker.
60+
61+
:param new_broker: new async broker to kick tasks with.
62+
"""
63+
self._default_broker = new_broker
64+
65+
async def listen(self) -> AsyncGenerator[TaskiqMessage, None]: # type: ignore
66+
"""
67+
Shared broker cannot listen to tasks.
68+
69+
This method will throw an exception.
70+
71+
:raises TaskiqError: if called.
72+
"""
73+
raise TaskiqError("Shared broker cannot listen")
74+
75+
76+
async_shared_broker = AsyncSharedBroker()

taskiq/cli/async_task_runner.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def parse_params( # noqa: C901
6464
argnum += 1
6565
annot = params_type.annotation
6666
value = None
67-
logger.debug("Trying to parse %s as %s" % (param_name, params_type.annotation))
67+
logger.debug("Trying to parse %s as %s", param_name, params_type.annotation)
6868
if argnum >= len(message.args):
6969
value = message.kwargs.get(param_name)
7070
if value is None:
@@ -146,7 +146,8 @@ async def run_task( # noqa: WPS210
146146
except Exception as exc:
147147
is_err = True
148148
logger.error(
149-
"Exception found while executing function: %s" % exc,
149+
"Exception found while executing function: %s",
150+
exc,
150151
exc_info=True,
151152
)
152153
execution_time = time() - start_time
@@ -180,7 +181,7 @@ def exit_process(task: asyncio.Task[Any]) -> NoReturn:
180181
try:
181182
result = task.result()
182183
if result is not None:
183-
logger.info("Broker returned value on shutdown: '%s'" % str(result))
184+
logger.info("Broker returned value on shutdown: '%s'", str(result))
184185
except Exception as exc:
185186
logger.warning("Exception was found while shutting down!")
186187
logger.warning(exc, exc_info=True)

taskiq/cli/worker.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,15 @@ def watch_workers_restarts(args: TaskiqArgs) -> None:
133133
if worker.is_alive():
134134
continue
135135
if worker.exitcode is not None and worker.exitcode > 0 and restart_workers:
136-
logger.info("Trying to restart the worker-%s" % worker_id)
136+
logger.info("Trying to restart the worker-%s", worker_id)
137137
worker_processes[worker_id] = Process(
138138
target=start_listen,
139139
kwargs={"args": args},
140140
name=f"worker-{worker_id}",
141141
)
142142
worker_processes[worker_id].start()
143143
else:
144-
logger.info("Worker-%s has finished." % worker_id)
144+
logger.info("Worker-%s has finished.", worker_id)
145145
worker.join()
146146
process_to_remove.append(worker)
147147

@@ -162,7 +162,7 @@ def run_worker(args: TaskiqArgs) -> None:
162162
level=getLevelName(args.log_level),
163163
format=("[%(asctime)s][%(levelname)-7s][%(processName)s] %(message)s"),
164164
)
165-
logger.info("Starting %s worker processes." % args.workers)
165+
logger.info("Starting %s worker processes.", args.workers)
166166

167167
global worker_processes # noqa: WPS420
168168

@@ -174,11 +174,9 @@ def run_worker(args: TaskiqArgs) -> None:
174174
)
175175
work_proc.start()
176176
logger.debug(
177-
"Started process worker-%d with pid %s "
178-
% (
179-
process,
180-
work_proc.pid,
181-
),
177+
"Started process worker-%d with pid %s ",
178+
process,
179+
work_proc.pid,
182180
)
183181
worker_processes.append(work_proc)
184182

taskiq/result_backends/dummy.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Any, TypeVar
22

33
from taskiq.abc.result_backend import AsyncResultBackend
4+
from taskiq.result import TaskiqResult
45

56
_ReturnType = TypeVar("_ReturnType")
67

@@ -20,23 +21,31 @@ async def set_result(self, task_id: str, result: Any) -> None:
2021
"""
2122
return await super().set_result(task_id, result)
2223

23-
async def is_result_ready(self, _id: str) -> bool:
24+
async def is_result_ready(self, task_id: str) -> bool:
2425
"""
2526
Checks whether task is completed.
2627
2728
Result is always ready,
2829
since it doesn't care about tasks.
2930
30-
:param _id: task's id.
31+
:param task_id: task's id.
3132
:return: true.
3233
"""
3334
return True
3435

35-
def get_result(self, _id: str, _with_logs: bool = False) -> Any:
36+
async def get_result(self, task_id: str, with_logs: bool = False) -> Any:
3637
"""
3738
Returns None.
3839
3940
This result doesn't care about passed parameters.
4041
41-
:param _id: task's id.
42+
:param task_id: task's id.
43+
:param with_logs: wether to fetch logs.
44+
:returns: TaskiqResult.
4245
"""
46+
return TaskiqResult(
47+
is_err=False,
48+
log=None,
49+
return_value=None,
50+
execution_time=0,
51+
)

0 commit comments

Comments
 (0)