File tree Expand file tree Collapse file tree 3 files changed +12
-7
lines changed
Expand file tree Collapse file tree 3 files changed +12
-7
lines changed Original file line number Diff line number Diff line change 2929from taskiq .acks import AckableMessage
3030from taskiq .decor import AsyncTaskiqDecoratedTask
3131from taskiq .events import TaskiqEvents
32- from taskiq .exceptions import TaskRejectedError
32+ from taskiq .exceptions import TaskBrokerMismatchError
3333from taskiq .formatters .proxy_formatter import ProxyFormatter
3434from taskiq .message import BrokerMessage
3535from taskiq .result_backends .dummy import DummyResultBackend
@@ -518,14 +518,12 @@ def _register_task(
518518 By default we register tasks in local task registry.
519519 But this behaviour can be changed in subclasses.
520520
521- This method may raise TaskRejectedError if task has already been
521+ This method may raise TaskBrokerMismatchError if task has already been
522522 registered to a different broker.
523523
524524 :param task_name: Name of a task.
525525 :param task: Decorated task.
526526 """
527527 if task .broker != self :
528- raise TaskRejectedError (
529- f"Task already has a different broker ({ task .broker } )" ,
530- )
528+ raise TaskBrokerMismatchError (broker = task .broker )
531529 self .local_task_registry [task_name ] = task
Original file line number Diff line number Diff line change @@ -101,3 +101,9 @@ class ScheduledTaskCancelledError(TaskiqError):
101101 """Scheduled task was cancelled and not sent to the queue."""
102102
103103 __template__ = "Cannot send scheduled task to the queue."
104+
105+
106+ class TaskBrokerMismatchError (TaskRejectedError ):
107+ """Task has a different broker than the one it was registered to."""
108+
109+ __template__ = "Task already has a different broker ({broker})"
Original file line number Diff line number Diff line change @@ -32,7 +32,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
3232 # if task broker doesn't match self, something is probably wrong
3333 logger .warning (
3434 f"Broker for { task_name } ({ task .broker } ) doesn't "
35- f"match scheduler's broker ({ self .broker } )" ,
35+ f"match scheduler's broker ({ self .broker } )"
3636 )
3737 continue
3838 for schedule in task .labels .get ("schedule" , []):
@@ -70,8 +70,9 @@ def post_send(self, scheduled_task: ScheduledTask) -> None:
7070 # if task broker doesn't match self, something is probably wrong
7171 logger .warning (
7272 f"Broker for { task_name } ({ task .broker } ) doesn't "
73- f"match scheduler's broker ({ self .broker } )" ,
73+ f"match scheduler's broker ({ self .broker } )"
7474 )
75+ continue
7576 if scheduled_task .task_name != task_name :
7677 continue
7778
You can’t perform that action at this time.
0 commit comments