Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pip install git+https://github.com/taskiq-python/taskiq

At first you need to create a broker. Broker is an object that can communicate to workers using distributed queues.

We have differet brokers for different queue backends. For example, we have a broker for [NATS](https://github.com/taskiq-python/taskiq-nats), [Redis](https://github.com/taskiq-python/taskiq-redis), [RabbitMQ](https://github.com/taskiq-python/taskiq-aio-pika), [Kafka](https://github.com/taskiq-python/taskiq-aio-kafka) and even more. Choose the one that fits you and create an instance.
We have different brokers for different queue backends. For example, we have a broker for [NATS](https://github.com/taskiq-python/taskiq-nats), [Redis](https://github.com/taskiq-python/taskiq-redis), [RabbitMQ](https://github.com/taskiq-python/taskiq-aio-pika), [Kafka](https://github.com/taskiq-python/taskiq-aio-kafka) and even more. Choose the one that fits you and create an instance.

```python
from taskiq_nats import JetStreamBroker
Expand Down
6 changes: 3 additions & 3 deletions docs/available-components/schedule-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ The format of the schedule label is the following:
@broker.task(
schedule=[
{
"cron": "* * * * *", # type: str, either cron or time shoule be specified.
"cron_offset": None # type: str | timedelta | None, can be ommited.
"time": None # type: datetime | None, either cron or time shoule be specified.
"cron": "* * * * *", # type: str, either cron or time should be specified.
"cron_offset": None # type: str | timedelta | None, can be omitted.
"time": None # type: datetime | None, either cron or time should be specified.
"args": [], # type List[Any] | None, can be omitted.
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
Expand Down
2 changes: 1 addition & 1 deletion docs/framework_integrations/faststream.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ order: 3

# Taskiq + FastStream

[FastStream](https://faststream.airt.ai/latest/) is a library that allows you to write consumers and producers for different message brokers almost like taskiq. But the differense is that taskiq is more focused on tasks for a specific project and more like celery but async, while FastStream is more focused on events and defining how different systems communicate with each other using distributed brokers.
[FastStream](https://faststream.airt.ai/latest/) is a library that allows you to write consumers and producers for different message brokers almost like taskiq. But the difference is that taskiq is more focused on tasks for a specific project and more like celery but async, while FastStream is more focused on events and defining how different systems communicate with each other using distributed brokers.

If you want to declare communication between different projects you can use taskiq, but it might be a bit more complex than using FastStream.

Expand Down
2 changes: 1 addition & 1 deletion docs/framework_integrations/taskiq-with-aiogram.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ bot = Bot(token="TOKEN")
# Taskiq calls this function when starting the worker.
@dp.startup()
async def setup_taskiq(bot: Bot, *_args, **_kwargs):
# Here we check if it's a clien-side,
# Here we check if it's a client-side,
# Because otherwise you're going to
# create infinite loop of startup events.
if not broker.is_worker_process:
Expand Down
2 changes: 1 addition & 1 deletion docs/framework_integrations/taskiq-with-fastapi.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def app_shutdown():

```

And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependenices. You can find bigger examples in the [examples repo](https://github.com/taskiq-python/examples/).
And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependencies. You can find bigger examples in the [examples repo](https://github.com/taskiq-python/examples/).


## Testing
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ The number of signals before a hard kill can be configured with the `--hardkill-
- `--log-level` is used to set a log level (default `INFO`).
* `--max-async-tasks` - maximum number of simultaneously running async tasks.
* `--max-prefetch` - number of tasks to be prefetched before execution. (Useful for systems with high message rates, but brokers should support acknowledgements).
* `--max-threadpool-threads` - number of threads for sync function exection.
* `--max-threadpool-threads` - number of threads for sync function execution.
* `--no-propagate-errors` - if this parameter is enabled, exceptions won't be thrown in generator dependencies.
* `--receiver` - python path to custom receiver class.
* `--receiver_arg` - custom args for receiver.
Expand Down
6 changes: 3 additions & 3 deletions docs/guide/scheduling-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Every time we update schedule it gets task from the source and executes this fun
Sometimes, you want to be specific in terms of time zones. We have you covered.
Our `ScheduledTask` model has fields for that. Use these fields or not, it's up to the specific schedule source.

Taskiq scheduler assumes that if time has no specific timezone, it's in [UTC](https://www.wikiwand.com/en/Coordinated_Universal_Time). Sometimes, this behavior might not be convinient for developers.
Taskiq scheduler assumes that if time has no specific timezone, it's in [UTC](https://www.wikiwand.com/en/Coordinated_Universal_Time). Sometimes, this behavior might not be convenient for developers.

For the `time` field of `ScheduledTask` we use timezone information from datetime to check if a task should run.

Expand All @@ -85,7 +85,7 @@ an offset of the cron task. An offset can be a string like `Europe/Berlin` or an

By default, when you start the scheduler it will get all tasks from the schedule source and check whether they should have been executed in this minute. If tasks should have been executed, they will be executed.

This behaviour might be not convinient for some developers. For example, if you have a task that should be executed on every minute, it will be executed once you start the scheduler, even if it was executed a few seconds ago.
This behaviour might be not convenient for some developers. For example, if you have a task that should be executed on every minute, it will be executed once you start the scheduler, even if it was executed a few seconds ago.

To avoid this behaviour, you can pass the `--skip-first-run` flag to the `taskiq scheduler` command. In this case, the scheduler will wait until the start of the next minute and then start executing tasks.

Expand Down Expand Up @@ -163,7 +163,7 @@ Each of these methods return you an instance of the `CreatedSchedule` class. Thi
await schedule.unschedule()
```

Or it can be done manually, by calling `delete_schedule` on schedule source providing it whith `schedule_id`.
Or it can be done manually, by calling `delete_schedule` on schedule source providing it with `schedule_id`.

```python
await redis_source.delete_schedule(schedule.schedule_id)
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/testing-taskiq.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async def modify_path(some_path: Path = TaskiqDepends()):

:::

To test the task itself, it's not different to the example without dependencies, but we jsut need to pass all
To test the task itself, it's not different to the example without dependencies, but we just need to pass all
expected dependencies manually as function's arguments or key-word arguments.

```python
Expand Down
10 changes: 9 additions & 1 deletion taskiq/abc/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from taskiq.acks import AckableMessage
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.events import TaskiqEvents
from taskiq.exceptions import TaskRejectedError
from taskiq.formatters.proxy_formatter import ProxyFormatter
from taskiq.message import BrokerMessage
from taskiq.result_backends.dummy import DummyResultBackend
Expand Down Expand Up @@ -512,12 +513,19 @@ def _register_task(
task: AsyncTaskiqDecoratedTask[Any, Any],
) -> None:
"""
Mehtod is used to register tasks.
Method is used to register tasks.

By default we register tasks in local task registry.
But this behaviour can be changed in subclasses.

This method may raise TaskRejectedError if task has already been
registered to a different broker.

:param task_name: Name of a task.
:param task: Decorated task.
"""
if task.broker != self:
raise TaskRejectedError(
Copy link
Member

@s3rius s3rius Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a syntax error. It's izulu. We use it for errors.

Please define separate error class for this error with

__template__ and broker argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rgr. This is the syntax error:

image

image

f"Task already has a different broker ({task.broker})",
)
self.local_task_registry[task_name] = task
4 changes: 2 additions & 2 deletions taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> N

async def is_result_ready(self, task_id: str) -> bool:
"""
Checks wether result is ready.
Checks whether result is ready.

Readiness means that result with this task_id is
present in results dict.
Expand Down Expand Up @@ -87,7 +87,7 @@ async def set_progress(
progress: TaskProgress[Any],
) -> None:
"""
Set progress of task exection.
Set progress of task execution.

:param task_id: task id
:param progress: task execution progress
Expand Down
2 changes: 1 addition & 1 deletion taskiq/cli/scheduler/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async def delayed_send(

The main idea is that scheduler gathers
tasks every minute and some of them have
specfic time. To respect the time, we calculate
specific time. To respect the time, we calculate
the delay and send the task after some delay.

:param scheduler: current scheduler.
Expand Down
2 changes: 1 addition & 1 deletion taskiq/cli/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def dispatch(self, event: FileSystemEvent) -> None:
"""
React to event.

This function checks wether we need to
This function checks whether we need to
react to event and calls callback if we do.

:param event: incoming fs event.
Expand Down
2 changes: 1 addition & 1 deletion taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def start_listen(args: WorkerArgs) -> None:
This function starts actual listening process.

It imports broker and all tasks.
Since tasks auto registeres themselves in a broker,
Since tasks auto registers themselves in a broker,
we don't need to do anything else other than importing.


Expand Down
2 changes: 1 addition & 1 deletion taskiq/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def requeue(self) -> None:
"""
Requeue task.

This fuction creates a task with
This function creates a task with
the same message and sends it using
current broker.

Expand Down
2 changes: 1 addition & 1 deletion taskiq/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def gather(
:param tasks: tasks to await.
:param timeout: timeout of awaiting, defaults to -1
:param with_logs: wether you want to fetch logs, defaults to False
:param with_logs: whether you want to fetch logs, defaults to False
:param periodicity: how often to ask for results, defaults to 0.1
:raises TaskiqResultTimeoutError: if timeout is reached.
:return: list of TaskiqResults.
Expand Down
8 changes: 4 additions & 4 deletions taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ async def run_task( # noqa: C901, PLR0912, PLR0915
# that happen while resolving dependencies.
if dep_ctx:
kwargs = await dep_ctx.resolve_kwargs()
# We udpate kwargs with kwargs from network.
# We update kwargs with kwargs from network.
kwargs.update(message.kwargs)
is_coroutine = True
# If the function is a coroutine, we await it.
Expand Down Expand Up @@ -379,15 +379,15 @@ async def prefetcher(
self.sem_prefetch.release()
continue
# We're done, so now we need to check
# wether task has returned an error.
# whether task has returned an error.
message = current_message.result()
current_message = asyncio.create_task(iterator.__anext__()) # type: ignore
fetched_tasks += 1
await queue.put(message)
except (asyncio.CancelledError, StopAsyncIteration):
break
# We don't want to fetch new messages if we are shutting down.
logger.info("Stoping prefetching messages...")
logger.info("Stopping prefetching messages...")
current_message.cancel()
await queue.put(QUEUE_DONE)
self.sem_prefetch.release()
Expand Down Expand Up @@ -461,7 +461,7 @@ def _prepare_task(self, name: str, handler: Callable[..., Any]) -> None:

It's useful for dynamic dependency resolution,
because sometimes the receiver can get
funcion that is defined in runtime. We need
function that is defined in runtime. We need
to be aware of that.

:param name: task name.
Expand Down
2 changes: 1 addition & 1 deletion taskiq/result_backends/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def get_result(self, task_id: str, with_logs: bool = False) -> Any:
This result doesn't care about passed parameters.

:param task_id: task's id.
:param with_logs: wether to fetch logs.
:param with_logs: whether to fetch logs.
:returns: TaskiqResult.
"""
return TaskiqResult(
Expand Down
13 changes: 12 additions & 1 deletion taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ async def get_schedules(self) -> List["ScheduledTask"]:
schedules = []
for task_name, task in self.broker.get_all_tasks().items():
if task.broker != self.broker:
# if task broker doesn't match self, something is probably wrong
logger.warning(
f"Broker for {task_name} ({task.broker}) doesn't "
f"match scheduler's broker ({self.broker})",
)
continue
for schedule in task.labels.get("schedule", []):
if "cron" not in schedule and "time" not in schedule:
Expand Down Expand Up @@ -61,7 +66,13 @@ def post_send(self, scheduled_task: ScheduledTask) -> None:
return # it's scheduled task with cron label, do not remove this trigger.

for task_name, task in self.broker.get_all_tasks().items():
if task.broker != self.broker or scheduled_task.task_name != task_name:
if task.broker != self.broker:
# if task broker doesn't match self, something is probably wrong
logger.warning(
f"Broker for {task_name} ({task.broker}) doesn't "
f"match scheduler's broker ({self.broker})",
)
if scheduled_task.task_name != task_name:
continue

schedule_list = task.labels.get("schedule", []).copy()
Expand Down
6 changes: 3 additions & 3 deletions tests/cli/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ def test_import_tasks_no_discover() -> None:
def test_import_tasks_non_py_list_pattern() -> None:
modules = ["taskiq.tasks"]
with patch("taskiq.cli.utils.import_from_modules", autospec=True) as mock:
pathes = (
paths = (
Path("tests/test1.so"),
Path("tests/cli/test2.cpython-313-darwin.so"),
)
for path in pathes:
for path in paths:
path.touch()

try:
Expand All @@ -66,6 +66,6 @@ def test_import_tasks_non_py_list_pattern() -> None:
}
mock.assert_called_with(modules)
finally:
for path in pathes:
for path in paths:
with suppress(FileNotFoundError):
path.unlink()
4 changes: 2 additions & 2 deletions tests/receiver/test_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_receiver(


@pytest.mark.anyio
async def test_run_task_successfull_async() -> None:
async def test_run_task_successful_async() -> None:
"""Tests that run_task can run async tasks."""

async def test_func(param: int) -> int:
Expand Down Expand Up @@ -332,7 +332,7 @@ async def ack_callback() -> None:

@pytest.mark.anyio
async def test_callback_wrong_format() -> None:
"""Test that wrong format of a message won't thow an error."""
"""Test that wrong format of a message won't throw an error."""
receiver = get_receiver()

await receiver.callback(
Expand Down
2 changes: 1 addition & 1 deletion tests/scheduler/test_scheduled_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from taskiq.scheduler.scheduled_task import ScheduledTask


def test_scheduled_task_paramters() -> None:
def test_scheduled_task_parameters() -> None:
with pytest.raises(ValueError):
ScheduledTask(
task_name="a",
Expand Down
Loading