1313 ExchangeSpecification ,
1414 Message ,
1515 QuorumQueueSpecification ,
16+ Management ,
17+ Publisher ,
18+ Consumer ,
1619)
1720
18- connection = None
19- management = None
20- publisher = None
21- consumer = None
21+ from typing import Optional
22+ from dataclasses import dataclass
23+
24+
25+ # here we keep track of the objects we need to reconnect
26+ @dataclass
27+ class ConnectionConfiguration :
28+ connection : Optional [Connection ] = None
29+ management : Optional [Management ] = None
30+ publisher : Optional [Publisher ] = None
31+ consumer : Optional [Consumer ] = None
32+
33+
34+ connection_configuration = ConnectionConfiguration ()
35+ messages_to_publish = 50000
2236
2337
2438# disconnection callback
@@ -30,22 +44,27 @@ def on_disconnection():
3044 queue_name = "example-queue"
3145 routing_key = "routing-key"
3246
33- global connection
34- global management
35- global publisher
36- global consumer
47+ global connection_configuration
3748
3849 addr = AddressHelper .exchange_address (exchange_name , routing_key )
3950 addr_queue = AddressHelper .queue_address (queue_name )
4051
41- if connection is not None :
42- connection = create_connection ()
43- if management is not None :
44- management = connection .management ()
45- if publisher is not None :
46- publisher = connection .publisher (addr )
47- if consumer is not None :
48- consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
52+ if connection_configuration .connection is not None :
53+ connection_configuration .connection = create_connection ()
54+ if connection_configuration .management is not None :
55+ connection_configuration .management = (
56+ connection_configuration .connection .management ()
57+ )
58+ if connection_configuration .publisher is not None :
59+ connection_configuration .publisher = (
60+ connection_configuration .connection .publisher (addr )
61+ )
62+ if connection_configuration .consumer is not None :
63+ connection_configuration .consumer = (
64+ connection_configuration .connection .consumer (
65+ addr_queue , handler = MyMessageHandler ()
66+ )
67+ )
4968
5069
5170class MyMessageHandler (AMQPMessagingHandler ):
@@ -55,7 +74,8 @@ def __init__(self):
5574 self ._count = 0
5675
5776 def on_message (self , event : Event ):
58- print ("received message: " + str (event .message .annotations ))
77+ if self ._count % 1000 == 0 :
78+ print ("received 100 message: " + str (event .message .body ))
5979
6080 # accepting
6181 self .delivery_context .accept (event )
@@ -74,11 +94,9 @@ def on_message(self, event: Event):
7494 # in case of rejection with annotations added
7595 # self.delivery_context.discard_with_annotations(event)
7696
77- print ("count " + str (self ._count ))
78-
7997 self ._count = self ._count + 1
8098
81- if self ._count == 100 :
99+ if self ._count == messages_to_publish :
82100 print ("closing receiver" )
83101 # if you want you can add cleanup operations here
84102 # event.receiver.close()
@@ -115,30 +133,30 @@ def main() -> None:
115133 exchange_name = "test-exchange"
116134 queue_name = "example-queue"
117135 routing_key = "routing-key"
118- messages_to_publish = 50000
119136
120- global connection
121- global management
122- global publisher
123- global consumer
137+ global connection_configuration
124138
125139 print ("connection to amqp server" )
126- if connection is None :
127- connection = create_connection ()
140+ if connection_configuration . connection is None :
141+ connection_configuration . connection = create_connection ()
128142
129- if management is None :
130- management = connection .management ()
143+ if connection_configuration .management is None :
144+ connection_configuration .management = (
145+ connection_configuration .connection .management ()
146+ )
131147
132148 print ("declaring exchange and queue" )
133- management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
149+ connection_configuration .management .declare_exchange (
150+ ExchangeSpecification (name = exchange_name , arguments = {})
151+ )
134152
135- management .declare_queue (
153+ connection_configuration . management .declare_queue (
136154 QuorumQueueSpecification (name = queue_name )
137155 # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
138156 )
139157
140158 print ("binding queue to exchange" )
141- bind_name = management .bind (
159+ bind_name = connection_configuration . management .bind (
142160 BindingSpecification (
143161 source_exchange = exchange_name ,
144162 destination_queue = queue_name ,
@@ -151,30 +169,34 @@ def main() -> None:
151169 addr_queue = AddressHelper .queue_address (queue_name )
152170
153171 print ("create a publisher and publish a test message" )
154- if publisher is None :
155- publisher = connection .publisher (addr )
172+ if connection_configuration .publisher is None :
173+ connection_configuration .publisher = (
174+ connection_configuration .connection .publisher (addr )
175+ )
156176
157177 print ("purging the queue" )
158- messages_purged = management .purge_queue (queue_name )
178+ messages_purged = connection_configuration . management .purge_queue (queue_name )
159179
160180 print ("messages purged: " + str (messages_purged ))
161181 # management.close()
162182
163- # publish 10 messages
183+ # publishing messages
164184 while True :
165185 for i in range (messages_to_publish ):
166186
167187 if i % 1000 == 0 :
168- print ("publishing" )
188+ print ("publishing 1000 messages... " )
169189 try :
170- publisher .publish (Message (body = "test" ))
190+ if connection_configuration .publisher is not None :
191+ connection_configuration .publisher .publish (Message (body = "test" ))
171192 except ConnectionClosed :
172193 print ("publisher closing exception, resubmitting" )
173194 continue
174195
175- print ("closing" )
196+ print ("closing publisher " )
176197 try :
177- publisher .close ()
198+ if connection_configuration .publisher is not None :
199+ connection_configuration .publisher .close ()
178200 except ConnectionClosed :
179201 print ("publisher closing exception, resubmitting" )
180202 continue
@@ -183,12 +205,16 @@ def main() -> None:
183205 print (
184206 "create a consumer and consume the test message - press control + c to terminate to consume"
185207 )
186- if consumer is None :
187- consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
208+ if connection_configuration .consumer is None :
209+ connection_configuration .consumer = (
210+ connection_configuration .connection .consumer (
211+ addr_queue , handler = MyMessageHandler ()
212+ )
213+ )
188214
189215 while True :
190216 try :
191- consumer .run ()
217+ connection_configuration . consumer .run ()
192218 except KeyboardInterrupt :
193219 pass
194220 except ConnectionClosed :
@@ -200,24 +226,24 @@ def main() -> None:
200226 break
201227
202228 print ("cleanup" )
203- consumer .close ()
229+ connection_configuration . consumer .close ()
204230 # once we finish consuming if we close the connection we need to create a new one
205231 # connection = create_connection()
206232 # management = connection.management()
207233
208234 print ("unbind" )
209- management .unbind (bind_name )
235+ connection_configuration . management .unbind (bind_name )
210236
211237 print ("delete queue" )
212- management .delete_queue (queue_name )
238+ connection_configuration . management .delete_queue (queue_name )
213239
214240 print ("delete exchange" )
215- management .delete_exchange (exchange_name )
241+ connection_configuration . management .delete_exchange (exchange_name )
216242
217243 print ("closing connections" )
218- management .close ()
244+ connection_configuration . management .close ()
219245 print ("after management closing" )
220- connection .close ()
246+ connection_configuration . connection .close ()
221247 print ("after connection closing" )
222248
223249
0 commit comments