Skip to content

Commit c4c8171

Browse files
Fix Typos, Warn on Unmatching Brokers (#423)
* fix typos * warn if task broker doesn't match self * oopsie * Update taskiq/schedule_sources/label_based.py Co-authored-by: Pavel Kirilin <[email protected]> * Update taskiq/schedule_sources/label_based.py Co-authored-by: Pavel Kirilin <[email protected]> * raise error on broker mismatch * ruffed * create new error class --------- Co-authored-by: Pavel Kirilin <[email protected]>
1 parent b54e2e0 commit c4c8171

File tree

22 files changed

+55
-31
lines changed

22 files changed

+55
-31
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pip install git+https://github.com/taskiq-python/taskiq
3535

3636
At first you need to create a broker. Broker is an object that can communicate to workers using distributed queues.
3737

38-
We have differet brokers for different queue backends. For example, we have a broker for [NATS](https://github.com/taskiq-python/taskiq-nats), [Redis](https://github.com/taskiq-python/taskiq-redis), [RabbitMQ](https://github.com/taskiq-python/taskiq-aio-pika), [Kafka](https://github.com/taskiq-python/taskiq-aio-kafka) and even more. Choose the one that fits you and create an instance.
38+
We have different brokers for different queue backends. For example, we have a broker for [NATS](https://github.com/taskiq-python/taskiq-nats), [Redis](https://github.com/taskiq-python/taskiq-redis), [RabbitMQ](https://github.com/taskiq-python/taskiq-aio-pika), [Kafka](https://github.com/taskiq-python/taskiq-aio-kafka) and even more. Choose the one that fits you and create an instance.
3939

4040
```python
4141
from taskiq_nats import JetStreamBroker

docs/available-components/schedule-sources.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ The format of the schedule label is the following:
3434
@broker.task(
3535
schedule=[
3636
{
37-
"cron": "* * * * *", # type: str, either cron or time shoule be specified.
38-
"cron_offset": None # type: str | timedelta | None, can be ommited.
39-
"time": None # type: datetime | None, either cron or time shoule be specified.
37+
"cron": "* * * * *", # type: str, either cron or time should be specified.
38+
"cron_offset": None # type: str | timedelta | None, can be omitted.
39+
"time": None # type: datetime | None, either cron or time should be specified.
4040
"args": [], # type List[Any] | None, can be omitted.
4141
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
4242
"labels": {}, # type: Dict[str, Any] | None, can be omitted.

docs/framework_integrations/faststream.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ order: 3
44

55
# Taskiq + FastStream
66

7-
[FastStream](https://faststream.airt.ai/latest/) is a library that allows you to write consumers and producers for different message brokers almost like taskiq. But the differense is that taskiq is more focused on tasks for a specific project and more like celery but async, while FastStream is more focused on events and defining how different systems communicate with each other using distributed brokers.
7+
[FastStream](https://faststream.airt.ai/latest/) is a library that allows you to write consumers and producers for different message brokers almost like taskiq. But the difference is that taskiq is more focused on tasks for a specific project and more like celery but async, while FastStream is more focused on events and defining how different systems communicate with each other using distributed brokers.
88

99
If you want to declare communication between different projects you can use taskiq, but it might be a bit more complex than using FastStream.
1010

docs/framework_integrations/taskiq-with-aiogram.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ bot = Bot(token="TOKEN")
6666
# Taskiq calls this function when starting the worker.
6767
@dp.startup()
6868
async def setup_taskiq(bot: Bot, *_args, **_kwargs):
69-
# Here we check if it's a clien-side,
69+
# Here we check if it's a client-side,
7070
# Because otherwise you're going to
7171
# create infinite loop of startup events.
7272
if not broker.is_worker_process:

docs/framework_integrations/taskiq-with-fastapi.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ async def app_shutdown():
101101

102102
```
103103

104-
And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependenices. You can find bigger examples in the [examples repo](https://github.com/taskiq-python/examples/).
104+
And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependencies. You can find bigger examples in the [examples repo](https://github.com/taskiq-python/examples/).
105105

106106

107107
## Testing

docs/guide/cli.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ The number of signals before a hard kill can be configured with the `--hardkill-
112112
- `--log-level` is used to set a log level (default `INFO`).
113113
* `--max-async-tasks` - maximum number of simultaneously running async tasks.
114114
* `--max-prefetch` - number of tasks to be prefetched before execution. (Useful for systems with high message rates, but brokers should support acknowledgements).
115-
* `--max-threadpool-threads` - number of threads for sync function exection.
115+
* `--max-threadpool-threads` - number of threads for sync function execution.
116116
* `--no-propagate-errors` - if this parameter is enabled, exceptions won't be thrown in generator dependencies.
117117
* `--receiver` - python path to custom receiver class.
118118
* `--receiver_arg` - custom args for receiver.

docs/guide/scheduling-tasks.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ Every time we update schedule it gets task from the source and executes this fun
7474
Sometimes, you want to be specific in terms of time zones. We have you covered.
7575
Our `ScheduledTask` model has fields for that. Use these fields or not, it's up to the specific schedule source.
7676

77-
Taskiq scheduler assumes that if time has no specific timezone, it's in [UTC](https://www.wikiwand.com/en/Coordinated_Universal_Time). Sometimes, this behavior might not be convinient for developers.
77+
Taskiq scheduler assumes that if time has no specific timezone, it's in [UTC](https://www.wikiwand.com/en/Coordinated_Universal_Time). Sometimes, this behavior might not be convenient for developers.
7878

7979
For the `time` field of `ScheduledTask` we use timezone information from datetime to check if a task should run.
8080

@@ -85,7 +85,7 @@ an offset of the cron task. An offset can be a string like `Europe/Berlin` or an
8585

8686
By default, when you start the scheduler it will get all tasks from the schedule source and check whether they should have been executed in this minute. If tasks should have been executed, they will be executed.
8787

88-
This behaviour might be not convinient for some developers. For example, if you have a task that should be executed on every minute, it will be executed once you start the scheduler, even if it was executed a few seconds ago.
88+
This behaviour might be not convenient for some developers. For example, if you have a task that should be executed on every minute, it will be executed once you start the scheduler, even if it was executed a few seconds ago.
8989

9090
To avoid this behaviour, you can pass the `--skip-first-run` flag to the `taskiq scheduler` command. In this case, the scheduler will wait until the start of the next minute and then start executing tasks.
9191

@@ -163,7 +163,7 @@ Each of these methods return you an instance of the `CreatedSchedule` class. Thi
163163
await schedule.unschedule()
164164
```
165165

166-
Or it can be done manually, by calling `delete_schedule` on schedule source providing it whith `schedule_id`.
166+
Or it can be done manually, by calling `delete_schedule` on schedule source providing it with `schedule_id`.
167167

168168
```python
169169
await redis_source.delete_schedule(schedule.schedule_id)

docs/guide/testing-taskiq.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ async def modify_path(some_path: Path = TaskiqDepends()):
229229

230230
:::
231231

232-
To test the task itself, it's not different to the example without dependencies, but we jsut need to pass all
232+
To test the task itself, it's not different to the example without dependencies, but we just need to pass all
233233
expected dependencies manually as function's arguments or key-word arguments.
234234

235235
```python

taskiq/abc/broker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from taskiq.acks import AckableMessage
3030
from taskiq.decor import AsyncTaskiqDecoratedTask
3131
from taskiq.events import TaskiqEvents
32+
from taskiq.exceptions import TaskBrokerMismatchError
3233
from taskiq.formatters.proxy_formatter import ProxyFormatter
3334
from taskiq.message import BrokerMessage
3435
from taskiq.result_backends.dummy import DummyResultBackend
@@ -512,12 +513,17 @@ def _register_task(
512513
task: AsyncTaskiqDecoratedTask[Any, Any],
513514
) -> None:
514515
"""
515-
Mehtod is used to register tasks.
516+
Method is used to register tasks.
516517
517518
By default we register tasks in local task registry.
518519
But this behaviour can be changed in subclasses.
519520
521+
This method may raise TaskBrokerMismatchError if task has already been
522+
registered to a different broker.
523+
520524
:param task_name: Name of a task.
521525
:param task: Decorated task.
522526
"""
527+
if task.broker != self:
528+
raise TaskBrokerMismatchError(broker=task.broker)
523529
self.local_task_registry[task_name] = task

taskiq/brokers/inmemory_broker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> N
5050

5151
async def is_result_ready(self, task_id: str) -> bool:
5252
"""
53-
Checks wether result is ready.
53+
Checks whether result is ready.
5454
5555
Readiness means that result with this task_id is
5656
present in results dict.

0 commit comments

Comments
 (0)