|
1 |
| -# taskiq_faststream |
| 1 | +# Taskiq - FastStream |
| 2 | + |
| 3 | +<p align="center"> |
| 4 | + <a href="https://github.com/taskiq-python/taskiq-faststream/actions/workflows/test.yml" target="_blank"> |
| 5 | + <img src="https://github.com/taskiq-python/taskiq-faststream/actions/workflows/test.yml/badge.svg" alt="Tests status"/> |
| 6 | + </a> |
| 7 | + <a href="https://pypi.org/project/taskiq-faststream/" target="_blank"> |
| 8 | + <img src="https://img.shields.io/pypi/v/taskiq-faststream?label=pypi%20package" alt="Package version"> |
| 9 | + </a> |
| 10 | + <a href="https://pepy.tech/project/taskiq-faststream" target="_blank"> |
| 11 | + <img src="https://static.pepy.tech/personalized-badge/taskiq-faststream?period=month&units=international_system&left_color=grey&right_color=blue" alt="downloads"/> |
| 12 | + </a> |
| 13 | + <br/> |
| 14 | + <a href="https://pypi.org/project/taskiq-faststream" target="_blank"> |
| 15 | + <img src="https://img.shields.io/pypi/pyversions/taskiq-faststream.svg" alt="Supported Python versions"> |
| 16 | + </a> |
| 17 | + <a href="https://github.com/taskiq-python/taskiq-faststream/blob/master/LICENSE" target="_blank"> |
| 18 | + <img alt="GitHub" src="https://img.shields.io/github/license/taskiq-python/taskiq-faststream?color=%23007ec6"> |
| 19 | + </a> |
| 20 | +</p> |
| 21 | + |
| 22 | +--- |
| 23 | + |
| 24 | +The current package is just a wrapper for [**FastStream**](https://faststream.airt.ai/0.2/?utm_source=github&utm_medium=acquisition&utm_campaign=measure) objects to make them compatible with [**Taskiq**](https://taskiq-python.github.io/) library. |
| 25 | + |
| 26 | +The main goal of it - provide **FastStream** with a great **Taskiq** tasks [scheduling](https://taskiq-python.github.io/guide/scheduling-tasks.html) feature. |
| 27 | + |
| 28 | +## Installation |
| 29 | + |
| 30 | +If you already have **FastStream** project to interact with your Message Broker, you can add scheduling to it by installing just a **taskiq-faststream** |
| 31 | + |
| 32 | +```bash |
| 33 | +pip install taskiq-faststream |
| 34 | +``` |
| 35 | + |
| 36 | +If you starting with a clear project, you can specify **taskiq-faststream** broker by the following distributions: |
| 37 | + |
| 38 | +```bash |
| 39 | +pip install taskiq-faststream[rabbit] |
| 40 | +# or |
| 41 | +pip install taskiq-faststream[kafka] |
| 42 | +# or |
| 43 | +pip install taskiq-faststream[nats] |
| 44 | +``` |
| 45 | + |
| 46 | +## Usage |
| 47 | + |
| 48 | +The package gives you two classes: `AppWrapper` and `BrokerWrapper` |
| 49 | + |
| 50 | +These are just containers for the related **FastStream** objects to make them **taskiq**-compatible |
| 51 | + |
| 52 | +To create scheduling tasks for your broker, just wrap it to `BrokerWrapper` and use it like a regular **taskiq** Broker. |
| 53 | + |
| 54 | +```python |
| 55 | +# regular FastStream code |
| 56 | +from faststream.nats import NatsBroker |
| 57 | + |
| 58 | +broker = NatsBroker() |
| 59 | + |
| 60 | +@broker.subscriber("test-subject") |
| 61 | +async def handler(msg: str): |
| 62 | + print(msg) |
| 63 | + |
| 64 | +# taskiq-faststream scheduling |
| 65 | +from taskiq import TaskiqScheduler |
| 66 | +from taskiq.schedule_sources import LabelScheduleSource |
| 67 | +from taskiq_faststream import BrokerWrapper |
| 68 | + |
| 69 | +# wrap FastStream object |
| 70 | +taskiq_broker = BrokerWrapper(broker) |
| 71 | + |
| 72 | +# create periodic task |
| 73 | +taskiq_broker.task( |
| 74 | + message="Hi!", |
| 75 | + subject="test-subject" |
| 76 | + schedule=[{ |
| 77 | + "cron": "* * * * *", |
| 78 | + }], |
| 79 | +) |
| 80 | + |
| 81 | +# create scheduler object |
| 82 | +scheduler=TaskiqScheduler( |
| 83 | + broker=taskiq_broker, |
| 84 | + sources=[LabelScheduleSource(taskiq_broker)], |
| 85 | +) |
| 86 | +``` |
| 87 | + |
| 88 | +Also, you can wrap your **FastStream** application the same way (allows to use lifespan events and AsyncAPI documentation): |
| 89 | + |
| 90 | +```python |
| 91 | +# regular FastStream code |
| 92 | +from faststream import FastStream |
| 93 | +from faststream.nats import NatsBroker |
| 94 | + |
| 95 | +broker = NatsBroker() |
| 96 | +app = FastStream(broker) |
| 97 | + |
| 98 | +@broker.subscriber("test-subject") |
| 99 | +async def handler(msg: str): |
| 100 | + print(msg) |
| 101 | + |
| 102 | +# wrap FastStream object |
| 103 | +from taskiq_faststream import AppWrapper |
| 104 | +taskiq_broker = AppWrapper(app) |
| 105 | + |
| 106 | +# Code below omitted 👇 |
| 107 | +``` |
| 108 | + |
| 109 | +Also, instead of using a final `message` argument, you can set a message callback to collect information right before sending: |
| 110 | + |
| 111 | +```python |
| 112 | +async def collect_information_to_send(): |
| 113 | + return "Message to send" |
| 114 | + |
| 115 | +taskiq_broker.task( |
| 116 | + message=collect_information_to_send, |
| 117 | + ..., |
| 118 | +) |
| 119 | +``` |
0 commit comments