11# type: ignore
2-
2+ import threading
33
44from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
55 AddressHelper ,
@@ -78,8 +78,16 @@ def create_connection() -> Connection:
7878 return connection
7979
8080
81- def main () -> None :
81+ def threaded_function (addr_queue ):
82+ connection = create_connection ()
83+ consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
84+ try :
85+ consumer .run ()
86+ except KeyboardInterrupt :
87+ pass
88+
8289
90+ def main () -> None :
8391 exchange_name = "test-exchange"
8492 queue_name = "example-queue"
8593 routing_key = "routing-key"
@@ -91,72 +99,67 @@ def main() -> None:
9199 management = connection .management ()
92100
93101 print ("declaring exchange and queue" )
94- #management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
102+ # management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
95103
96104 management .declare_queue (
97105 StreamSpecification (name = queue_name )
98106 # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
99107 )
100108
101109 print ("binding queue to exchange" )
102- #bind_name = management.bind(
110+ # bind_name = management.bind(
103111 # BindingSpecification(
104112 # source_exchange=exchange_name,
105113 # destination_queue=queue_name,
106114 # binding_key=routing_key,
107115 # )
108- #)
116+ # )
109117
110- #addr = AddressHelper.exchange_address(exchange_name, routing_key)
118+ # addr = AddressHelper.exchange_address(exchange_name, routing_key)
111119
112120 addr_queue = AddressHelper .queue_address (queue_name )
113121
114- print ("create a publisher and publish a test message" )
122+ thread = threading .Thread (target = threaded_function , args = (addr_queue ,))
123+ thread .start ()
124+ ## press control + c to terminate the consumer
125+
126+ # print("create a publisher and publish a test message")
115127 publisher = connection .publisher (addr_queue )
116128
117- print ("purging the queue" )
118- #messages_purged = management.purge_queue(queue_name)
129+ # print("purging the queue")
130+ # messages_purged = management.purge_queue(queue_name)
119131
120- #print("messages purged: " + str(messages_purged))
132+ # print("messages purged: " + str(messages_purged))
121133 # management.close()
122134
123135 # publish 10 messages
124136 for i in range (messages_to_publish ):
125137 status = publisher .publish (Message (body = "test" ))
126- # if status.ACCEPTED:
127- # print("message accepted")
128- # elif status.RELEASED:
129- # print("message not routed")
130- # elif status.REJECTED:
131- # print("message not rejected")
132-
138+ # # if status.ACCEPTED:
139+ # # print("message accepted")
140+ # # elif status.RELEASED:
141+ # # print("message not routed")
142+ # # elif status.REJECTED:
143+ # # print("message not rejected")
144+ #
133145 publisher .close ()
146+ #
147+ # print(
148+ # "create a consumer and consume the test message - press control + c to terminate to consume"
149+ # )
134150
135- print (
136- "create a consumer and consume the test message - press control + c to terminate to consume"
137- )
138-
139- stream_filter_options = StreamFilterOptions ()
140- stream_filter_options .offset (0 )
141-
142- consumer = connection .consumer (addr_queue , handler = MyMessageHandler (), stream_filter_options = stream_filter_options )
143-
144- try :
145- consumer .run ()
146- except KeyboardInterrupt :
147- pass
148-
151+ input ("Press Enter to continue..." )
149152 print ("cleanup" )
150- consumer .close ()
153+ # consumer.close()
151154 # once we finish consuming if we close the connection we need to create a new one
152155 # connection = create_connection()
153156 # management = connection.management()
154157
155158 print ("unbind" )
156- #management.unbind(bind_name)
159+ # management.unbind(bind_name)
157160
158161 print ("delete queue" )
159- #management.delete_queue(queue_name)
162+ # management.delete_queue(queue_name)
160163
161164 print ("delete exchange" )
162165 management .delete_exchange (exchange_name )
0 commit comments