1+ import asyncio
2+
3+ from rabbitmq_amqp_python_client import (
4+ AddressHelper ,
5+ AMQPMessagingHandler ,
6+ AsyncEnvironment ,
7+ Converter ,
8+ Event ,
9+ ExchangeSpecification ,
10+ ExchangeToQueueBindingSpecification ,
11+ Message ,
12+ OutcomeState ,
13+ QuorumQueueSpecification ,
14+ )
15+
16+ MESSAGES_TO_PUBLISH = 100
17+
18+
19+ class StopConsumerException (Exception ):
20+ """Exception to signal consumer should stop"""
21+ pass
22+
23+
24+ class MyMessageHandler (AMQPMessagingHandler ):
25+
26+ def __init__ (self ):
27+ super ().__init__ ()
28+ self ._count = 0
29+
30+ def on_amqp_message (self , event : Event ):
31+ print (
32+ "received message: {} " .format (
33+ Converter .bytes_to_string (event .message .body )
34+ )
35+ )
36+
37+ self .delivery_context .accept (event )
38+ self ._count = self ._count + 1
39+ print ("count " + str (self ._count ))
40+
41+ if self ._count == MESSAGES_TO_PUBLISH :
42+ print ("received all messages" )
43+ # Stop the consumer by raising an exception
44+ raise StopConsumerException ("All messages consumed" )
45+
46+ def on_connection_closed (self , event : Event ):
47+ print ("connection closed" )
48+
49+ def on_link_closed (self , event : Event ) -> None :
50+ print ("link closed" )
51+
52+
53+ async def main ():
54+ exchange_name = "test-exchange"
55+ queue_name = "example-queue"
56+ routing_key = "routing-key"
57+
58+ print ("connection to amqp server" )
59+ async with AsyncEnvironment (
60+ uri = "amqp://guest:guest@localhost:5672/"
61+ ) as environment :
62+ async with await environment .connection () as connection :
63+ async with await connection .management () as management :
64+ print ("declaring exchange and queue" )
65+ await management .declare_exchange (ExchangeSpecification (name = exchange_name ))
66+ await management .declare_queue (
67+ QuorumQueueSpecification (name = queue_name )
68+ )
69+
70+ print ("binding queue to exchange" )
71+ bind_name = await management .bind (
72+ ExchangeToQueueBindingSpecification (
73+ source_exchange = exchange_name ,
74+ destination_queue = queue_name ,
75+ binding_key = routing_key ,
76+ )
77+ )
78+
79+ addr = AddressHelper .exchange_address (exchange_name , routing_key )
80+ addr_queue = AddressHelper .queue_address (queue_name )
81+
82+ print ("create a publisher and publish a test message" )
83+ async with await connection .publisher (addr ) as publisher :
84+ print ("purging the queue" )
85+ messages_purged = await management .purge_queue (queue_name )
86+ print ("messages purged: " + str (messages_purged ))
87+
88+ # publish messages
89+ for i in range (MESSAGES_TO_PUBLISH ):
90+ status = await publisher .publish (
91+ Message (body = Converter .string_to_bytes ("test message {} " .format (i )))
92+ )
93+ if status .remote_state == OutcomeState .ACCEPTED :
94+ print ("message accepted" )
95+
96+ print ("create a consumer and consume the test message - press control + c to terminate to consume" )
97+ handler = MyMessageHandler ()
98+ async with await connection .consumer (addr_queue , message_handler = handler ) as consumer :
99+ # Run the consumer in a background task
100+ consumer_task = asyncio .create_task (consumer .run ())
101+
102+ try :
103+ # Wait for the consumer to finish (e.g., by raising the exception)
104+ await consumer_task
105+ except StopConsumerException as e :
106+ print (f"Consumer stopped: { e } " )
107+ except KeyboardInterrupt :
108+ print ("consumption interrupted by user, stopping consumer..." )
109+ await consumer .stop ()
110+
111+ print ("unbind" )
112+ await management .unbind (bind_name )
113+
114+ print ("delete queue" )
115+ await management .delete_queue (queue_name )
116+
117+ print ("delete exchange" )
118+ await management .delete_exchange (exchange_name )
119+
120+ if __name__ == "__main__" :
121+ asyncio .run (main ())
0 commit comments