@@ -80,6 +80,8 @@ def create_connection() -> Connection:
8080
8181def threaded_function (addr_queue ):
8282 connection = create_connection ()
83+ offset_specification = StreamFilterOptions ()
84+ offset_specification .offset (10 )
8385 consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
8486 try :
8587 consumer .run ()
@@ -101,10 +103,10 @@ def main() -> None:
101103 print ("declaring exchange and queue" )
102104 # management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
103105
104- management .declare_queue (
105- StreamSpecification (name = queue_name )
106+ # management.declare_queue(
107+ # StreamSpecification(name=queue_name)
106108 # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
107- )
109+ # )
108110
109111 print ("binding queue to exchange" )
110112 # bind_name = management.bind(
@@ -124,7 +126,7 @@ def main() -> None:
124126 ## press control + c to terminate the consumer
125127
126128 # print("create a publisher and publish a test message")
127- publisher = connection .publisher (addr_queue )
129+ # publisher = connection.publisher(addr_queue)
128130
129131 # print("purging the queue")
130132 # messages_purged = management.purge_queue(queue_name)
@@ -133,16 +135,16 @@ def main() -> None:
133135 # management.close()
134136
135137 # publish 10 messages
136- for i in range (messages_to_publish ):
137- status = publisher .publish (Message (body = "test" ))
138+ # for i in range(messages_to_publish):
139+ # status = publisher.publish(Message(body="test"))
138140 # # if status.ACCEPTED:
139141 # # print("message accepted")
140142 # # elif status.RELEASED:
141143 # # print("message not routed")
142144 # # elif status.REJECTED:
143145 # # print("message not rejected")
144146 #
145- publisher .close ()
147+ # publisher.close()
146148 #
147149 # print(
148150 # "create a consumer and consume the test message - press control + c to terminate to consume"
0 commit comments