Skip to content

Commit 0f5d1db

Browse files
committed
initial commit
0 parents  commit 0f5d1db

File tree

10 files changed

+567
-0
lines changed

10 files changed

+567
-0
lines changed

.gitignore

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
.idea/
2+
.vscode/
3+
4+
__pycache__/
5+
*.py[cod]
6+
7+
*.mo
8+
9+
env/
10+
build/
11+
_build/
12+
dist/
13+
site/
14+
*.egg-info/
15+
*.egg
16+
.ruff_cache
17+
18+
.mypy_cache
19+
.pytest_cache
20+
.coverage
21+
reports
22+
23+
dev/
24+
.venv/

README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# aiocarrot
2+
3+
**aiocarrot** is fully asynchronous framework for working with <a href="https://www.rabbitmq.com/">RabbitMQ</a>
4+
5+
Accelerate development with a message broker at times with **aiocarrot**
6+
7+
## Installation
8+
9+
Create and activate virtual environment and then install **aiocarrot**:
10+
11+
```commandline
12+
pip install aiocarrot
13+
```
14+
15+
## Example
16+
17+
Create a file `main.py` with:
18+
19+
```python
20+
from aiocarrot import Carrot, Consumer
21+
22+
import asyncio
23+
24+
25+
consumer = Consumer()
26+
27+
28+
@consumer.message(name='multiply')
29+
async def multiply(first_number: int, second_number: int) -> None:
30+
print('Result is:', first_number * second_number)
31+
32+
33+
async def main() -> None:
34+
carrot = Carrot(url='amqp://guest:guest@127.0.0.1/', queue_name='sample')
35+
carrot.setup_consumer(consumer)
36+
await carrot.run()
37+
38+
39+
if __name__ == '__main__':
40+
asyncio.run(main())
41+
```
42+
43+
Then run it with:
44+
45+
```commandline
46+
python main.py
47+
```
48+
49+
Now you have created a consumer with the ability to receive a **"multiply"** task
50+
51+
### Produce message
52+
53+
If you want to send a message, use this:
54+
55+
```python
56+
from aiocarrot import Carrot
57+
58+
import asyncio
59+
60+
61+
async def main() -> None:
62+
carrot = Carrot(url='amqp://guest:guest@127.0.0.1:5672/', queue_name='sample')
63+
64+
await carrot.send('multiply', first_number=10, second_number=20)
65+
66+
67+
if __name__ == '__main__':
68+
asyncio.run(main())
69+
```
70+
71+
It's very simple to use. Enjoy!

aiocarrot/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .carrot import Carrot
2+
from .consumer import Consumer

aiocarrot/__meta__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__version__ = '1.0.0'

aiocarrot/carrot.py

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
from loguru import logger
2+
3+
from typing import Optional, TYPE_CHECKING
4+
5+
import asyncio, aio_pika, ujson, uuid
6+
7+
if TYPE_CHECKING:
8+
from aiormq.abc import ConfirmationFrameType
9+
10+
from .consumer import Consumer
11+
12+
13+
class Carrot:
14+
""" Carrot framework entrypoint class """
15+
16+
_url: str
17+
_queue_name: str
18+
_is_consumer_alive: bool = False
19+
_consumer: Optional['Consumer'] = None
20+
_connection: Optional['aio_pika.abc.AbstractConnection'] = None
21+
_channel: Optional['aio_pika.abc.AbstractChannel'] = None
22+
_queue: Optional['aio_pika.abc.AbstractQueue'] = None
23+
24+
def __init__(self, url: str, queue_name: str) -> None:
25+
"""
26+
aiocarrot is an asynchronous framework for working with the RabbitMQ message broker
27+
28+
:param url: RabbitMQ connection url
29+
:param queue_name: The name of the queue for further work
30+
"""
31+
32+
self._url = url
33+
self._queue_name = queue_name
34+
35+
async def send(self, _cnm: str, **kwargs) -> 'ConfirmationFrameType':
36+
"""
37+
Send message with specified name
38+
39+
:param _cnm: Message name
40+
:param kwargs: Message payload
41+
:return:
42+
"""
43+
44+
channel = await self._get_channel()
45+
46+
message_id = str(uuid.uuid4())
47+
message_body = {
48+
'_cid': message_id,
49+
'_cnm': _cnm,
50+
**kwargs,
51+
}
52+
53+
payload = ujson.dumps(message_body).encode()
54+
55+
return await channel.default_exchange.publish(
56+
message=aio_pika.Message(body=payload),
57+
routing_key=self._queue_name,
58+
)
59+
60+
def setup_consumer(self, consumer: 'Consumer') -> None:
61+
"""
62+
Sets the consumer as the primary one for this Carrot instance
63+
64+
:param consumer: Consumer object
65+
:return:
66+
"""
67+
68+
self._consumer = consumer
69+
70+
async def run(self) -> None:
71+
"""
72+
Start carrot listening
73+
74+
:return:
75+
"""
76+
77+
if not self._consumer:
78+
raise RuntimeError('Consumer is not registered. Please, specify using following method: '
79+
'.setup_consumer(consumer)')
80+
81+
logger.info('Starting aiocarrot with following configuration:')
82+
logger.info('')
83+
logger.info(f'> Queue: {self._queue_name}')
84+
logger.info(f'> Registered messages:')
85+
86+
for message_name in self._consumer._messages.keys():
87+
logger.info(f' * {message_name}')
88+
89+
logger.info('')
90+
logger.info('Starting listener loop...')
91+
92+
try:
93+
await self._consumer_loop()
94+
except KeyboardInterrupt:
95+
pass
96+
except BaseException:
97+
logger.trace('An unhandled error occurred while the consumer was working')
98+
finally:
99+
logger.info('Shutting down...')
100+
101+
async def _consumer_loop(self) -> None:
102+
"""
103+
Consumer primary loop
104+
105+
:return:
106+
"""
107+
108+
if self._is_consumer_alive:
109+
raise RuntimeError('Consumer loop is already running')
110+
111+
if not self._consumer:
112+
raise RuntimeError('Consumer is not registered. Please, specify using following method: '
113+
'.setup_consumer(consumer)')
114+
115+
queue = await self._get_queue()
116+
117+
logger.info('Consumer is successfully connected to queue')
118+
119+
async with queue.iterator() as queue_iterator:
120+
self._is_consumer_alive = True
121+
122+
async for message in queue_iterator:
123+
async with message.process():
124+
decoded_message: str = message.body.decode()
125+
126+
try:
127+
message_payload = ujson.loads(decoded_message)
128+
129+
assert isinstance(message_payload, dict)
130+
except ujson.JSONDecodeError:
131+
logger.trace(f'Error receiving the message (failed to receive JSON): {decoded_message}')
132+
continue
133+
134+
message_id = message_payload.get('_cid')
135+
message_name = message_payload.get('_cnm')
136+
137+
if not message_id:
138+
logger.error(
139+
'The message format could not be determined (identifier is missing): '
140+
f'{message_payload}'
141+
)
142+
143+
continue
144+
145+
if not message_name:
146+
logger.error(
147+
'The message format could not be determined (message name is missing): '
148+
f'{message_payload}'
149+
)
150+
151+
continue
152+
153+
del message_payload['_cid']
154+
del message_payload['_cnm']
155+
156+
asyncio.create_task(self._consumer.on_message(
157+
message_id,
158+
message_name,
159+
**message_payload,
160+
))
161+
162+
async def _get_queue(self) -> 'aio_pika.abc.AbstractQueue':
163+
"""
164+
Get active broker queue
165+
166+
:return: aiopika queue
167+
"""
168+
169+
if not self._queue:
170+
channel = await self._get_channel()
171+
self._queue = await channel.declare_queue(self._queue_name, durable=True, auto_delete=True)
172+
173+
return self._queue
174+
175+
async def _get_channel(self) -> 'aio_pika.abc.AbstractChannel':
176+
"""
177+
Get active broker channel
178+
179+
:return: aiopika channel
180+
"""
181+
182+
if not self._channel:
183+
connection = await self._get_connection()
184+
self._channel = await connection.channel()
185+
186+
return self._channel
187+
188+
async def _get_connection(self) -> 'aio_pika.abc.AbstractConnection':
189+
"""
190+
Get active connection to the broker
191+
192+
:return: aiopika broker connection
193+
"""
194+
195+
if not self._connection:
196+
self._connection = await aio_pika.connect_robust(url=self._url)
197+
198+
return self._connection
199+
200+
201+
__all__ = (
202+
'Carrot',
203+
)

aiocarrot/consumer/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .consumer import Consumer

0 commit comments

Comments
 (0)