1010 ExchangeSpecification ,
1111 Message ,
1212 QuorumQueueSpecification ,
13+ StreamSpecification ,
14+ StreamFilterOptions
1315)
1416
1517
@@ -81,59 +83,63 @@ def main() -> None:
8183 exchange_name = "test-exchange"
8284 queue_name = "example-queue"
8385 routing_key = "routing-key"
84- messages_to_publish = 100000
86+ messages_to_publish = 1000
8587
8688 print ("connection to amqp server" )
8789 connection = create_connection ()
8890
8991 management = connection .management ()
9092
9193 print ("declaring exchange and queue" )
92- management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
94+ # management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
9395
9496 management .declare_queue (
95- QuorumQueueSpecification (name = queue_name )
97+ StreamSpecification (name = queue_name )
9698 # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
9799 )
98100
99101 print ("binding queue to exchange" )
100- bind_name = management .bind (
101- BindingSpecification (
102- source_exchange = exchange_name ,
103- destination_queue = queue_name ,
104- binding_key = routing_key ,
105- )
106- )
102+ # bind_name = management.bind(
103+ # BindingSpecification(
104+ # source_exchange=exchange_name,
105+ # destination_queue=queue_name,
106+ # binding_key=routing_key,
107+ # )
108+ # )
107109
108- addr = AddressHelper .exchange_address (exchange_name , routing_key )
110+ # addr = AddressHelper.exchange_address(exchange_name, routing_key)
109111
110112 addr_queue = AddressHelper .queue_address (queue_name )
111113
112114 print ("create a publisher and publish a test message" )
113- publisher = connection .publisher (addr )
115+ publisher = connection .publisher (addr_queue )
114116
115117 print ("purging the queue" )
116- messages_purged = management .purge_queue (queue_name )
118+ # messages_purged = management.purge_queue(queue_name)
117119
118- print ("messages purged: " + str (messages_purged ))
120+ # print("messages purged: " + str(messages_purged))
119121 # management.close()
120122
121123 # publish 10 messages
122124 for i in range (messages_to_publish ):
123125 status = publisher .publish (Message (body = "test" ))
124- if status .ACCEPTED :
125- print ("message accepted" )
126- elif status .RELEASED :
127- print ("message not routed" )
128- elif status .REJECTED :
129- print ("message not rejected" )
126+ # if status.ACCEPTED:
127+ # print("message accepted")
128+ # elif status.RELEASED:
129+ # print("message not routed")
130+ # elif status.REJECTED:
131+ # print("message not rejected")
130132
131133 publisher .close ()
132134
133135 print (
134136 "create a consumer and consume the test message - press control + c to terminate to consume"
135137 )
136- consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
138+
139+ stream_filter_options = StreamFilterOptions ()
140+ stream_filter_options .offset (0 )
141+
142+ consumer = connection .consumer (addr_queue , handler = MyMessageHandler (), stream_filter_options = stream_filter_options )
137143
138144 try :
139145 consumer .run ()
@@ -147,10 +153,10 @@ def main() -> None:
147153 # management = connection.management()
148154
149155 print ("unbind" )
150- management .unbind (bind_name )
156+ # management.unbind(bind_name)
151157
152158 print ("delete queue" )
153- management .delete_queue (queue_name )
159+ # management.delete_queue(queue_name)
154160
155161 print ("delete exchange" )
156162 management .delete_exchange (exchange_name )
0 commit comments