22
33
44import time
5+ from dataclasses import dataclass
6+ from typing import Optional
57
68from rabbitmq_amqp_python_client import (
79 AddressHelper ,
810 AMQPMessagingHandler ,
911 BindingSpecification ,
1012 Connection ,
1113 ConnectionClosed ,
14+ Consumer ,
1215 Event ,
1316 ExchangeSpecification ,
17+ Management ,
1418 Message ,
19+ Publisher ,
1520 QuorumQueueSpecification ,
1621)
1722
18- connection = None
19- management = None
20- publisher = None
21- consumer = None
23+
24+ # here we keep track of the objects we need to reconnect
25+ @dataclass
26+ class ConnectionConfiguration :
27+ connection : Optional [Connection ] = None
28+ management : Optional [Management ] = None
29+ publisher : Optional [Publisher ] = None
30+ consumer : Optional [Consumer ] = None
31+
32+
33+ connection_configuration = ConnectionConfiguration ()
34+ messages_to_publish = 50000
2235
2336
2437# disconnection callback
@@ -30,22 +43,27 @@ def on_disconnection():
3043 queue_name = "example-queue"
3144 routing_key = "routing-key"
3245
33- global connection
34- global management
35- global publisher
36- global consumer
46+ global connection_configuration
3747
3848 addr = AddressHelper .exchange_address (exchange_name , routing_key )
3949 addr_queue = AddressHelper .queue_address (queue_name )
4050
41- if connection is not None :
42- connection = create_connection ()
43- if management is not None :
44- management = connection .management ()
45- if publisher is not None :
46- publisher = connection .publisher (addr )
47- if consumer is not None :
48- consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
51+ if connection_configuration .connection is not None :
52+ connection_configuration .connection = create_connection ()
53+ if connection_configuration .management is not None :
54+ connection_configuration .management = (
55+ connection_configuration .connection .management ()
56+ )
57+ if connection_configuration .publisher is not None :
58+ connection_configuration .publisher = (
59+ connection_configuration .connection .publisher (addr )
60+ )
61+ if connection_configuration .consumer is not None :
62+ connection_configuration .consumer = (
63+ connection_configuration .connection .consumer (
64+ addr_queue , handler = MyMessageHandler ()
65+ )
66+ )
4967
5068
5169class MyMessageHandler (AMQPMessagingHandler ):
@@ -55,7 +73,8 @@ def __init__(self):
5573 self ._count = 0
5674
5775 def on_message (self , event : Event ):
58- print ("received message: " + str (event .message .annotations ))
76+ if self ._count % 1000 == 0 :
77+ print ("received 100 message: " + str (event .message .body ))
5978
6079 # accepting
6180 self .delivery_context .accept (event )
@@ -74,11 +93,9 @@ def on_message(self, event: Event):
7493 # in case of rejection with annotations added
7594 # self.delivery_context.discard_with_annotations(event)
7695
77- print ("count " + str (self ._count ))
78-
7996 self ._count = self ._count + 1
8097
81- if self ._count == 100 :
98+ if self ._count == messages_to_publish :
8299 print ("closing receiver" )
83100 # if you want you can add cleanup operations here
84101 # event.receiver.close()
@@ -115,30 +132,30 @@ def main() -> None:
115132 exchange_name = "test-exchange"
116133 queue_name = "example-queue"
117134 routing_key = "routing-key"
118- messages_to_publish = 50000
119135
120- global connection
121- global management
122- global publisher
123- global consumer
136+ global connection_configuration
124137
125138 print ("connection to amqp server" )
126- if connection is None :
127- connection = create_connection ()
139+ if connection_configuration . connection is None :
140+ connection_configuration . connection = create_connection ()
128141
129- if management is None :
130- management = connection .management ()
142+ if connection_configuration .management is None :
143+ connection_configuration .management = (
144+ connection_configuration .connection .management ()
145+ )
131146
132147 print ("declaring exchange and queue" )
133- management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
148+ connection_configuration .management .declare_exchange (
149+ ExchangeSpecification (name = exchange_name , arguments = {})
150+ )
134151
135- management .declare_queue (
152+ connection_configuration . management .declare_queue (
136153 QuorumQueueSpecification (name = queue_name )
137154 # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
138155 )
139156
140157 print ("binding queue to exchange" )
141- bind_name = management .bind (
158+ bind_name = connection_configuration . management .bind (
142159 BindingSpecification (
143160 source_exchange = exchange_name ,
144161 destination_queue = queue_name ,
@@ -151,30 +168,34 @@ def main() -> None:
151168 addr_queue = AddressHelper .queue_address (queue_name )
152169
153170 print ("create a publisher and publish a test message" )
154- if publisher is None :
155- publisher = connection .publisher (addr )
171+ if connection_configuration .publisher is None :
172+ connection_configuration .publisher = (
173+ connection_configuration .connection .publisher (addr )
174+ )
156175
157176 print ("purging the queue" )
158- messages_purged = management .purge_queue (queue_name )
177+ messages_purged = connection_configuration . management .purge_queue (queue_name )
159178
160179 print ("messages purged: " + str (messages_purged ))
161180 # management.close()
162181
163- # publish 10 messages
182+ # publishing messages
164183 while True :
165184 for i in range (messages_to_publish ):
166185
167186 if i % 1000 == 0 :
168- print ("publishing " )
187+ print ("published 1000 messages... " )
169188 try :
170- publisher .publish (Message (body = "test" ))
189+ if connection_configuration .publisher is not None :
190+ connection_configuration .publisher .publish (Message (body = "test" ))
171191 except ConnectionClosed :
172192 print ("publisher closing exception, resubmitting" )
173193 continue
174194
175- print ("closing" )
195+ print ("closing publisher " )
176196 try :
177- publisher .close ()
197+ if connection_configuration .publisher is not None :
198+ connection_configuration .publisher .close ()
178199 except ConnectionClosed :
179200 print ("publisher closing exception, resubmitting" )
180201 continue
@@ -183,12 +204,16 @@ def main() -> None:
183204 print (
184205 "create a consumer and consume the test message - press control + c to terminate to consume"
185206 )
186- if consumer is None :
187- consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
207+ if connection_configuration .consumer is None :
208+ connection_configuration .consumer = (
209+ connection_configuration .connection .consumer (
210+ addr_queue , handler = MyMessageHandler ()
211+ )
212+ )
188213
189214 while True :
190215 try :
191- consumer .run ()
216+ connection_configuration . consumer .run ()
192217 except KeyboardInterrupt :
193218 pass
194219 except ConnectionClosed :
@@ -200,24 +225,24 @@ def main() -> None:
200225 break
201226
202227 print ("cleanup" )
203- consumer .close ()
228+ connection_configuration . consumer .close ()
204229 # once we finish consuming if we close the connection we need to create a new one
205230 # connection = create_connection()
206231 # management = connection.management()
207232
208233 print ("unbind" )
209- management .unbind (bind_name )
234+ connection_configuration . management .unbind (bind_name )
210235
211236 print ("delete queue" )
212- management .delete_queue (queue_name )
237+ connection_configuration . management .delete_queue (queue_name )
213238
214239 print ("delete exchange" )
215- management .delete_exchange (exchange_name )
240+ connection_configuration . management .delete_exchange (exchange_name )
216241
217242 print ("closing connections" )
218- management .close ()
243+ connection_configuration . management .close ()
219244 print ("after management closing" )
220- connection .close ()
245+ connection_configuration . connection .close ()
221246 print ("after connection closing" )
222247
223248
0 commit comments