diff --git a/README.md b/README.md index df9af5b7..3f9d57ea 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ pip install git+https://github.com/taskiq-python/taskiq At first you need to create a broker. Broker is an object that can communicate to workers using distributed queues. -We have differet brokers for different queue backends. For example, we have a broker for [NATS](https://github.com/taskiq-python/taskiq-nats), [Redis](https://github.com/taskiq-python/taskiq-redis), [RabbitMQ](https://github.com/taskiq-python/taskiq-aio-pika), [Kafka](https://github.com/taskiq-python/taskiq-aio-kafka) and even more. Choose the one that fits you and create an instance. +We have different brokers for different queue backends. For example, we have a broker for [NATS](https://github.com/taskiq-python/taskiq-nats), [Redis](https://github.com/taskiq-python/taskiq-redis), [RabbitMQ](https://github.com/taskiq-python/taskiq-aio-pika), [Kafka](https://github.com/taskiq-python/taskiq-aio-kafka) and even more. Choose the one that fits you and create an instance. ```python from taskiq_nats import JetStreamBroker diff --git a/docs/available-components/schedule-sources.md b/docs/available-components/schedule-sources.md index 97e985fe..4502ac0b 100644 --- a/docs/available-components/schedule-sources.md +++ b/docs/available-components/schedule-sources.md @@ -34,9 +34,9 @@ The format of the schedule label is the following: @broker.task( schedule=[ { - "cron": "* * * * *", # type: str, either cron or time shoule be specified. - "cron_offset": None # type: str | timedelta | None, can be ommited. - "time": None # type: datetime | None, either cron or time shoule be specified. + "cron": "* * * * *", # type: str, either cron or time should be specified. + "cron_offset": None, # type: str | timedelta | None, can be omitted. + "time": None, # type: datetime | None, either cron or time should be specified. "args": [], # type List[Any] | None, can be omitted. 
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted. "labels": {}, # type: Dict[str, Any] | None, can be omitted. diff --git a/docs/framework_integrations/faststream.md b/docs/framework_integrations/faststream.md index f774117a..13098265 100644 --- a/docs/framework_integrations/faststream.md +++ b/docs/framework_integrations/faststream.md @@ -4,7 +4,7 @@ order: 3 # Taskiq + FastStream -[FastStream](https://faststream.airt.ai/latest/) is a library that allows you to write consumers and producers for different message brokers almost like taskiq. But the differense is that taskiq is more focused on tasks for a specific project and more like celery but async, while FastStream is more focused on events and defining how different systems communicate with each other using distributed brokers. +[FastStream](https://faststream.airt.ai/latest/) is a library that allows you to write consumers and producers for different message brokers almost like taskiq. But the difference is that taskiq is more focused on tasks for a specific project and more like celery but async, while FastStream is more focused on events and defining how different systems communicate with each other using distributed brokers. If you want to declare communication between different projects you can use taskiq, but it might be a bit more complex than using FastStream. diff --git a/docs/framework_integrations/taskiq-with-aiogram.md b/docs/framework_integrations/taskiq-with-aiogram.md index d10682d1..cafffce6 100644 --- a/docs/framework_integrations/taskiq-with-aiogram.md +++ b/docs/framework_integrations/taskiq-with-aiogram.md @@ -66,7 +66,7 @@ bot = Bot(token="TOKEN") # Taskiq calls this function when starting the worker. @dp.startup() async def setup_taskiq(bot: Bot, *_args, **_kwargs): - # Here we check if it's a clien-side, + # Here we check if it's a client-side, # Because otherwise you're going to # create infinite loop of startup events. 
if not broker.is_worker_process: diff --git a/docs/framework_integrations/taskiq-with-fastapi.md b/docs/framework_integrations/taskiq-with-fastapi.md index 149eb4f0..5f7a9b56 100644 --- a/docs/framework_integrations/taskiq-with-fastapi.md +++ b/docs/framework_integrations/taskiq-with-fastapi.md @@ -101,7 +101,7 @@ async def app_shutdown(): ``` -And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependenices. You can find bigger examples in the [examples repo](https://github.com/taskiq-python/examples/). +And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependencies. You can find bigger examples in the [examples repo](https://github.com/taskiq-python/examples/). ## Testing diff --git a/docs/guide/cli.md b/docs/guide/cli.md index ee340c65..df111af0 100644 --- a/docs/guide/cli.md +++ b/docs/guide/cli.md @@ -112,7 +112,7 @@ The number of signals before a hard kill can be configured with the `--hardkill- - `--log-level` is used to set a log level (default `INFO`). * `--max-async-tasks` - maximum number of simultaneously running async tasks. * `--max-prefetch` - number of tasks to be prefetched before execution. (Useful for systems with high message rates, but brokers should support acknowledgements). -* `--max-threadpool-threads` - number of threads for sync function exection. +* `--max-threadpool-threads` - number of threads for sync function execution. * `--no-propagate-errors` - if this parameter is enabled, exceptions won't be thrown in generator dependencies. * `--receiver` - python path to custom receiver class. * `--receiver_arg` - custom args for receiver. 
diff --git a/docs/guide/scheduling-tasks.md b/docs/guide/scheduling-tasks.md index 0a7270cb..43793204 100644 --- a/docs/guide/scheduling-tasks.md +++ b/docs/guide/scheduling-tasks.md @@ -74,7 +74,7 @@ Every time we update schedule it gets task from the source and executes this fun Sometimes, you want to be specific in terms of time zones. We have you covered. Our `ScheduledTask` model has fields for that. Use these fields or not, it's up to the specific schedule source. -Taskiq scheduler assumes that if time has no specific timezone, it's in [UTC](https://www.wikiwand.com/en/Coordinated_Universal_Time). Sometimes, this behavior might not be convinient for developers. +Taskiq scheduler assumes that if time has no specific timezone, it's in [UTC](https://www.wikiwand.com/en/Coordinated_Universal_Time). Sometimes, this behavior might not be convenient for developers. For the `time` field of `ScheduledTask` we use timezone information from datetime to check if a task should run. @@ -85,7 +85,7 @@ an offset of the cron task. An offset can be a string like `Europe/Berlin` or an By default, when you start the scheduler it will get all tasks from the schedule source and check whether they should have been executed in this minute. If tasks should have been executed, they will be executed. -This behaviour might be not convinient for some developers. For example, if you have a task that should be executed on every minute, it will be executed once you start the scheduler, even if it was executed a few seconds ago. +This behaviour might not be convenient for some developers. For example, if you have a task that should be executed on every minute, it will be executed once you start the scheduler, even if it was executed a few seconds ago. To avoid this behaviour, you can pass the `--skip-first-run` flag to the `taskiq scheduler` command. In this case, the scheduler will wait until the start of the next minute and then start executing tasks. 
@@ -163,7 +163,7 @@ Each of these methods return you an instance of the `CreatedSchedule` class. Thi await schedule.unschedule() ``` -Or it can be done manually, by calling `delete_schedule` on schedule source providing it whith `schedule_id`. +Or it can be done manually, by calling `delete_schedule` on schedule source providing it with `schedule_id`. ```python await redis_source.delete_schedule(schedule.schedule_id) diff --git a/docs/guide/testing-taskiq.md b/docs/guide/testing-taskiq.md index b8f93fa7..61013f43 100644 --- a/docs/guide/testing-taskiq.md +++ b/docs/guide/testing-taskiq.md @@ -229,7 +229,7 @@ async def modify_path(some_path: Path = TaskiqDepends()): ::: -To test the task itself, it's not different to the example without dependencies, but we jsut need to pass all +To test the task itself, it's not different to the example without dependencies, but we just need to pass all expected dependencies manually as function's arguments or key-word arguments. ```python diff --git a/taskiq/abc/broker.py b/taskiq/abc/broker.py index 69c9e25c..438e9f86 100644 --- a/taskiq/abc/broker.py +++ b/taskiq/abc/broker.py @@ -29,6 +29,7 @@ from taskiq.acks import AckableMessage from taskiq.decor import AsyncTaskiqDecoratedTask from taskiq.events import TaskiqEvents +from taskiq.exceptions import TaskBrokerMismatchError from taskiq.formatters.proxy_formatter import ProxyFormatter from taskiq.message import BrokerMessage from taskiq.result_backends.dummy import DummyResultBackend @@ -512,12 +513,17 @@ def _register_task( task: AsyncTaskiqDecoratedTask[Any, Any], ) -> None: """ - Mehtod is used to register tasks. + Method is used to register tasks. By default we register tasks in local task registry. But this behaviour can be changed in subclasses. + This method may raise TaskBrokerMismatchError if task has already been + registered to a different broker. + :param task_name: Name of a task. :param task: Decorated task. 
""" + if task.broker != self: + raise TaskBrokerMismatchError(broker=task.broker) self.local_task_registry[task_name] = task diff --git a/taskiq/brokers/inmemory_broker.py b/taskiq/brokers/inmemory_broker.py index 3fd2975a..23d64ef8 100644 --- a/taskiq/brokers/inmemory_broker.py +++ b/taskiq/brokers/inmemory_broker.py @@ -50,7 +50,7 @@ async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> N async def is_result_ready(self, task_id: str) -> bool: """ - Checks wether result is ready. + Checks whether result is ready. Readiness means that result with this task_id is present in results dict. @@ -87,7 +87,7 @@ async def set_progress( progress: TaskProgress[Any], ) -> None: """ - Set progress of task exection. + Set progress of task execution. :param task_id: task id :param progress: task execution progress diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 6fa3c091..63143e83 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -120,7 +120,7 @@ async def delayed_send( The main idea is that scheduler gathers tasks every minute and some of them have - specfic time. To respect the time, we calculate + specific time. To respect the time, we calculate the delay and send the task after some delay. :param scheduler: current scheduler. diff --git a/taskiq/cli/watcher.py b/taskiq/cli/watcher.py index 138c112a..9ed541d3 100644 --- a/taskiq/cli/watcher.py +++ b/taskiq/cli/watcher.py @@ -28,7 +28,7 @@ def dispatch(self, event: FileSystemEvent) -> None: """ React to event. - This function checks wether we need to + This function checks whether we need to react to event and calls callback if we do. :param event: incoming fs event. diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index df0afba7..5ee035bb 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -74,7 +74,7 @@ def start_listen(args: WorkerArgs) -> None: This function starts actual listening process. 
It imports broker and all tasks. - Since tasks auto registeres themselves in a broker, + Since tasks auto-register themselves in a broker, we don't need to do anything else other than importing. diff --git a/taskiq/context.py b/taskiq/context.py index 098b9d6b..cd1fd6dd 100644 --- a/taskiq/context.py +++ b/taskiq/context.py @@ -21,7 +21,7 @@ async def requeue(self) -> None: """ Requeue task. - This fuction creates a task with + This function creates a task with the same message and sends it using current broker. diff --git a/taskiq/exceptions.py b/taskiq/exceptions.py index 671bd373..535edd26 100644 --- a/taskiq/exceptions.py +++ b/taskiq/exceptions.py @@ -101,3 +101,9 @@ class ScheduledTaskCancelledError(TaskiqError): """Scheduled task was cancelled and not sent to the queue.""" __template__ = "Cannot send scheduled task to the queue." + + +class TaskBrokerMismatchError(TaskRejectedError): + """Task has a different broker than the one it was registered to.""" + + __template__ = "Task already has a different broker ({broker})" diff --git a/taskiq/funcs.py b/taskiq/funcs.py index 4e49e811..4ce68434 100644 --- a/taskiq/funcs.py +++ b/taskiq/funcs.py @@ -24,7 +24,7 @@ async def gather( :param tasks: tasks to await. :param timeout: timeout of awaiting, defaults to -1 - :param with_logs: wether you want to fetch logs, defaults to False + :param with_logs: whether you want to fetch logs, defaults to False :param periodicity: how often to ask for results, defaults to 0.1 :raises TaskiqResultTimeoutError: if timeout is reached. :return: list of TaskiqResults. diff --git a/taskiq/receiver/receiver.py b/taskiq/receiver/receiver.py index 41e687a6..c15fb935 100644 --- a/taskiq/receiver/receiver.py +++ b/taskiq/receiver/receiver.py @@ -246,7 +246,7 @@ async def run_task( # noqa: C901, PLR0912, PLR0915 # that happen while resolving dependencies. if dep_ctx: kwargs = await dep_ctx.resolve_kwargs() - # We udpate kwargs with kwargs from network. 
+ # We update kwargs with kwargs from network. kwargs.update(message.kwargs) is_coroutine = True # If the function is a coroutine, we await it. @@ -379,7 +379,7 @@ async def prefetcher( self.sem_prefetch.release() continue # We're done, so now we need to check - # wether task has returned an error. + # whether task has returned an error. message = current_message.result() current_message = asyncio.create_task(iterator.__anext__()) # type: ignore fetched_tasks += 1 @@ -387,7 +387,7 @@ async def prefetcher( except (asyncio.CancelledError, StopAsyncIteration): break # We don't want to fetch new messages if we are shutting down. - logger.info("Stoping prefetching messages...") + logger.info("Stopping prefetching messages...") current_message.cancel() await queue.put(QUEUE_DONE) self.sem_prefetch.release() @@ -461,7 +461,7 @@ def _prepare_task(self, name: str, handler: Callable[..., Any]) -> None: It's useful for dynamic dependency resolution, because sometimes the receiver can get - funcion that is defined in runtime. We need + function that is defined in runtime. We need to be aware of that. :param name: task name. diff --git a/taskiq/result_backends/dummy.py b/taskiq/result_backends/dummy.py index 50550f80..3e002788 100644 --- a/taskiq/result_backends/dummy.py +++ b/taskiq/result_backends/dummy.py @@ -38,7 +38,7 @@ async def get_result(self, task_id: str, with_logs: bool = False) -> Any: This result doesn't care about passed parameters. :param task_id: task's id. - :param with_logs: wether to fetch logs. + :param with_logs: whether to fetch logs. :returns: TaskiqResult. 
""" return TaskiqResult( diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py index e9116fd9..47d6b28e 100644 --- a/taskiq/schedule_sources/label_based.py +++ b/taskiq/schedule_sources/label_based.py @@ -29,6 +29,11 @@ async def get_schedules(self) -> List["ScheduledTask"]: schedules = [] for task_name, task in self.broker.get_all_tasks().items(): if task.broker != self.broker: + # if task broker doesn't match self, something is probably wrong + logger.warning( + f"Broker for {task_name} ({task.broker}) doesn't " + f"match scheduler's broker ({self.broker})" + ) continue for schedule in task.labels.get("schedule", []): if "cron" not in schedule and "time" not in schedule: @@ -61,7 +66,14 @@ def post_send(self, scheduled_task: ScheduledTask) -> None: return # it's scheduled task with cron label, do not remove this trigger. for task_name, task in self.broker.get_all_tasks().items(): - if task.broker != self.broker or scheduled_task.task_name != task_name: + if task.broker != self.broker: + # if task broker doesn't match self, something is probably wrong + logger.warning( + f"Broker for {task_name} ({task.broker}) doesn't " + f"match scheduler's broker ({self.broker})" + ) + continue + if scheduled_task.task_name != task_name: continue schedule_list = task.labels.get("schedule", []).copy() diff --git a/tests/cli/test_utils.py b/tests/cli/test_utils.py index bf8651d1..0ee165dd 100644 --- a/tests/cli/test_utils.py +++ b/tests/cli/test_utils.py @@ -48,11 +48,11 @@ def test_import_tasks_no_discover() -> None: def test_import_tasks_non_py_list_pattern() -> None: modules = ["taskiq.tasks"] with patch("taskiq.cli.utils.import_from_modules", autospec=True) as mock: - pathes = ( + paths = ( Path("tests/test1.so"), Path("tests/cli/test2.cpython-313-darwin.so"), ) - for path in pathes: + for path in paths: path.touch() try: @@ -66,6 +66,6 @@ def test_import_tasks_non_py_list_pattern() -> None: } mock.assert_called_with(modules) finally: - 
for path in pathes: + for path in paths: with suppress(FileNotFoundError): path.unlink() diff --git a/tests/receiver/test_receiver.py b/tests/receiver/test_receiver.py index 5a65efff..57637e95 100644 --- a/tests/receiver/test_receiver.py +++ b/tests/receiver/test_receiver.py @@ -41,7 +41,7 @@ def get_receiver( @pytest.mark.anyio -async def test_run_task_successfull_async() -> None: +async def test_run_task_successful_async() -> None: """Tests that run_task can run async tasks.""" async def test_func(param: int) -> int: @@ -332,7 +332,7 @@ async def ack_callback() -> None: @pytest.mark.anyio async def test_callback_wrong_format() -> None: - """Test that wrong format of a message won't thow an error.""" + """Test that wrong format of a message won't throw an error.""" receiver = get_receiver() await receiver.callback( diff --git a/tests/scheduler/test_scheduled_task.py b/tests/scheduler/test_scheduled_task.py index 60cd8bbd..00d442e9 100644 --- a/tests/scheduler/test_scheduled_task.py +++ b/tests/scheduler/test_scheduled_task.py @@ -3,7 +3,7 @@ from taskiq.scheduler.scheduled_task import ScheduledTask -def test_scheduled_task_paramters() -> None: +def test_scheduled_task_parameters() -> None: with pytest.raises(ValueError): ScheduledTask( task_name="a",