11# type: ignore
2- import time
3-
42from rabbitmq_amqp_python_client import (
53 BindingSpecification ,
64 Connection ,
5+ Delivery ,
76 Event ,
87 ExchangeSpecification ,
98 Message ,
109 MessagingHandler ,
1110 QuorumQueueSpecification ,
1211 exchange_address ,
1312 queue_address ,
14- Delivery ,
1513)
1614
1715
18-
1916class MyMessageHandler (MessagingHandler ):
2017
2118 def __init__ (self ):
2219 super ().__init__ (auto_accept = False , auto_settle = False )
20+ self ._count = 0
2321
2422 def on_message (self , event : Event ):
2523 print ("received message: " + event .message .body )
2624
2725 dlv = event .delivery
28- #dlv.update(Delivery.REJECTED)
26+
2927 dlv .update (Delivery .ACCEPTED )
30- # dlv.settle()
28+ dlv .settle ()
3129
30+ print ("count " + str (self ._count ))
3231
33- #self.reject(event.delivery)
34- #self.settle(event.delivery, Delivery.REJECTED)
32+ self ._count = self ._count + 1
3533
34+ if self ._count == 100000 :
35+ print ("closing receiver" )
36+ event .receiver .close ()
37+ event .connection .close ()
3638
3739 def on_connection_closed (self , event : Event ):
3840 print ("connection closed" )
3941
40- def on_connection_cloing (self , event : Event ):
41- print ("connection closed" )
42-
4342 def on_link_closed (self , event : Event ) -> None :
4443 print ("link closed" )
4544
4645 def on_rejected (self , event : Event ) -> None :
4746 print ("rejected" )
4847
4948
50- def main () -> None :
49+ def create_connection () -> Connection :
50+ connection = Connection ("amqp://guest:guest@localhost:5672/" )
51+ connection .dial ()
52+
53+ return connection
54+
5155
56+ def main () -> None :
5257
5358 exchange_name = "test-exchange"
5459 queue_name = "example-queue"
5560 routing_key = "routing-key"
56- connection = Connection ( "amqp://guest:guest@localhost:5672/" )
61+ messages_to_publish = 100000
5762
5863 print ("connection to amqp server" )
59- connection . dial ()
64+ connection = create_connection ()
6065
6166 management = connection .management ()
6267
6368 print ("declaring exchange and queue" )
6469 management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
6570
66- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
71+ management .declare_queue (
72+ QuorumQueueSpecification (
73+ name = queue_name , dead_letter_exchange = "dead-letter-test"
74+ )
75+ )
6776
6877 print ("binding queue to exchange" )
6978 bind_name = management .bind (
@@ -81,34 +90,48 @@ def main() -> None:
8190 print ("create a publisher and publish a test message" )
8291 publisher = connection .publisher (addr )
8392
84-
8593 print ("purging the queue" )
86- # messages_purged = management.purge_queue(queue_name)
94+ messages_purged = management .purge_queue (queue_name )
8795
88- #print("messages purged: " + str(messages_purged))
96+ print ("messages purged: " + str (messages_purged ))
97+ management .close ()
8998
90- # for i in range(1):
91- publisher .publish (Message (body = "test" ))
99+ # publish 10 messages
100+ for i in range (messages_to_publish ):
101+ publisher .publish (Message (body = "test" ))
92102
93103 publisher .close ()
94104
95- print ("create a consumer and consume the test message" )
96-
105+ print (
106+ "create a consumer and consume the test message - press control + c to terminate to consume"
107+ )
97108 consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
98109
110+ try :
111+ consumer .run ()
112+ except KeyboardInterrupt :
113+ pass
114+
115+ print ("cleanup" )
116+ # once we finish consuming we close the connection so we need to create a new one
117+ connection = create_connection ()
118+
119+ management = connection .management ()
99120 print ("unbind" )
100121 management .unbind (bind_name )
101122
102- #consumer.close()
103123 print ("delete queue" )
104- # management.delete_queue(queue_name)
124+ management .delete_queue (queue_name )
105125
106126 print ("delete exchange" )
107127 management .delete_exchange (exchange_name )
108128
109129 print ("closing connections" )
110130 management .close ()
131+ consumer .close ()
132+ print ("after management closing" )
111133 connection .close ()
134+ print ("after connection closing" )
112135
113136
114137if __name__ == "__main__" :
0 commit comments