22
33from loguru import logger
44
5+ from .scheduler import Scheduler
6+
57from typing import Optional , TYPE_CHECKING
68
7- import asyncio , aio_pika , ujson , uuid , copy , signal
9+ import asyncio , aio_pika , ujson , uuid , copy , signal , aiormq
810
911if TYPE_CHECKING :
1012 from aiormq .abc import ConfirmationFrameType
@@ -23,6 +25,7 @@ class Carrot:
2325 _connection : Optional ['aio_pika.abc.AbstractConnection' ] = None
2426 _channel : Optional ['aio_pika.abc.AbstractChannel' ] = None
2527 _queue : Optional ['aio_pika.abc.AbstractQueue' ] = None
28+ _scheduler : Optional ['Scheduler' ] = None
2629
2730 def __init__ (self , url : str , queue_name : str ) -> None :
2831 """
@@ -35,6 +38,7 @@ def __init__(self, url: str, queue_name: str) -> None:
3538 self ._url = url
3639 self ._tasks = []
3740 self ._queue_name = queue_name
41+ self ._scheduler = Scheduler (carrot = self )
3842
3943 async def send (self , _cnm : str , ** kwargs ) -> 'ConfirmationFrameType' :
4044 """
@@ -76,6 +80,19 @@ def setup_consumer(self, consumer: 'Consumer') -> None:
7680
7781 self ._consumer = consumer
7882
83+ self ._scheduler .clear ()
84+ self ._scheduler .stop ()
85+
86+ for _ , message in self ._consumer ._messages .items ():
87+ if not message .schedule :
88+ continue
89+
90+ self ._scheduler .add_task (message )
91+
92+ if self ._is_consumer_alive and self ._scheduler .has_tasks :
93+ scheduler_task = asyncio .create_task (self ._scheduler .reload ())
94+ self ._tasks .append (scheduler_task )
95+
7996 async def run (self ) -> None :
8097 """
8198 Starts the main loop of the Carrot new message listener
@@ -99,6 +116,10 @@ async def run(self) -> None:
99116 logger .info (f' * { message_name } ' )
100117
101118 logger .info ('' )
119+
120+ if self ._scheduler .has_tasks :
121+ asyncio .create_task (self ._scheduler .start ())
122+
102123 logger .info ('Starting listener loop...' )
103124
104125 signal .signal (signal .SIGINT , self ._exit_signal_handler )
@@ -131,6 +152,7 @@ async def shutdown(self, silent: bool = False) -> None:
131152 :return:
132153 """
133154
155+ self ._scheduler .stop ()
134156 pending_tasks = [x for x in self ._tasks if not x .done ()]
135157
136158 if len (pending_tasks ) > 0 :
@@ -163,51 +185,62 @@ async def _consumer_loop(self) -> None:
163185 logger .info ('Consumer is successfully connected to queue' )
164186
165187 async with queue .iterator () as queue_iterator :
166- async for message in queue_iterator :
167- for task in copy .copy (self ._tasks ):
168- if task .done ():
169- self ._tasks .remove (task )
188+ if not self ._is_consumer_alive :
189+ return
190+
191+ try :
192+ await self ._iterate_queue (queue_iterator )
193+ except aiormq .ChannelClosed :
194+ return
195+
196+ async def _iterate_queue (self , queue_iterator : 'aio_pika.abc.AbstractQueueIterator' ) -> None :
197+ """ Iterates over the queue iterator and passes the message on to the handler """
170198
171- async with message .process ():
172- decoded_message : str = message .body .decode ()
199+ async for message in queue_iterator :
200+ for task in copy .copy (self ._tasks ):
201+ if task .done ():
202+ self ._tasks .remove (task )
173203
174- try :
175- message_payload = ujson . loads ( decoded_message )
204+ async with message . process () :
205+ decoded_message : str = message . body . decode ( )
176206
177- assert isinstance (message_payload , dict )
178- except ujson .JSONDecodeError :
179- logger .error (f'Error receiving the message (failed to receive JSON): { decoded_message } ' )
180- continue
207+ try :
208+ message_payload = ujson .loads (decoded_message )
181209
182- message_id = message_payload .get ('_cid' )
183- message_name = message_payload .get ('_cnm' )
210+ assert isinstance (message_payload , dict )
211+ except ujson .JSONDecodeError :
212+ logger .error (f'Error receiving the message (failed to receive JSON): { decoded_message } ' )
213+ continue
184214
185- if not message_id :
186- logger .error (
187- 'The message format could not be determined (identifier is missing): '
188- f'{ message_payload } '
189- )
215+ message_id = message_payload .get ('_cid' )
216+ message_name = message_payload .get ('_cnm' )
190217
191- continue
218+ if not message_id :
219+ logger .error (
220+ 'The message format could not be determined (identifier is missing): '
221+ f'{ message_payload } '
222+ )
223+
224+ continue
192225
193- if not message_name :
194- logger .error (
195- 'The message format could not be determined (message name is missing): '
196- f'{ message_payload } '
197- )
226+ if not message_name :
227+ logger .error (
228+ 'The message format could not be determined (message name is missing): '
229+ f'{ message_payload } '
230+ )
198231
199- continue
232+ continue
200233
201- del message_payload ['_cid' ]
202- del message_payload ['_cnm' ]
234+ del message_payload ['_cid' ]
235+ del message_payload ['_cnm' ]
203236
204- task = asyncio .create_task (self ._consumer .on_message (
205- message_id ,
206- message_name ,
207- ** message_payload ,
208- ))
237+ task = asyncio .create_task (self ._consumer .on_message (
238+ message_id ,
239+ message_name ,
240+ ** message_payload ,
241+ ))
209242
210- self ._tasks .append (task )
243+ self ._tasks .append (task )
211244
212245 async def _get_queue (self ) -> 'aio_pika.abc.AbstractQueue' :
213246 """
0 commit comments