diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 5187b75..9706446 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -1,5 +1,5 @@ # type: ignore - +import threading from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, AddressHelper, @@ -78,8 +78,16 @@ def create_connection() -> Connection: return connection -def main() -> None: +def threaded_function(addr_queue): + connection = create_connection() + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + try: + consumer.run() + except KeyboardInterrupt: + pass + +def main() -> None: exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" @@ -91,7 +99,7 @@ def main() -> None: management = connection.management() print("declaring exchange and queue") - #management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + # management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) management.declare_queue( StreamSpecification(name=queue_name) @@ -99,64 +107,59 @@ def main() -> None: ) print("binding queue to exchange") - #bind_name = management.bind( + # bind_name = management.bind( # BindingSpecification( # source_exchange=exchange_name, # destination_queue=queue_name, # binding_key=routing_key, # ) - #) + # ) - #addr = AddressHelper.exchange_address(exchange_name, routing_key) + # addr = AddressHelper.exchange_address(exchange_name, routing_key) addr_queue = AddressHelper.queue_address(queue_name) - print("create a publisher and publish a test message") + thread = threading.Thread(target=threaded_function, args=(addr_queue,)) + thread.start() + ## press control + c to terminate the consumer + + # print("create a publisher and publish a test message") publisher = connection.publisher(addr_queue) - print("purging the queue") - #messages_purged = management.purge_queue(queue_name) + # print("purging the queue") + # messages_purged = management.purge_queue(queue_name) - #print("messages purged: " + str(messages_purged)) + # print("messages purged: " + str(messages_purged)) # management.close() # publish 10 messages for i in range(messages_to_publish): status = publisher.publish(Message(body="test")) - #if status.ACCEPTED: - # print("message accepted") - #elif status.RELEASED: - # print("message not routed") - #elif status.REJECTED: - # print("message not rejected") - + # # if status.ACCEPTED: + # # print("message accepted") + # # elif status.RELEASED: + # # print("message not routed") + # # elif status.REJECTED: + # # print("message not rejected") + # publisher.close() + # + # print( + # "create a consumer and consume the test message - press control + c to terminate to consume" + # ) - print( - "create a consumer and consume the test message - press control + c to terminate to consume" - ) - - stream_filter_options = StreamFilterOptions() - stream_filter_options.offset(0) - - consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=stream_filter_options) - - try: - consumer.run() - except KeyboardInterrupt: - pass - + input("Press Enter to continue...") print("cleanup") - consumer.close() + # consumer.close() # once we finish consuming if we close the connection we need to create a new one # connection = create_connection() # management = connection.management() print("unbind") - #management.unbind(bind_name) + # management.unbind(bind_name) print("delete queue") - #management.delete_queue(queue_name) + # management.delete_queue(queue_name) print("delete exchange") management.delete_exchange(exchange_name) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 60a7e8b..18ede92 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -60,11 +60,13 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettled(addr), handler=self._handler ) + receiver.credit = 1 else: print("stream option is not None") receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler ) + receiver.credit = 1 return receiver