|
1 | 1 | # type: ignore |
2 | | -import threading |
| 2 | + |
3 | 3 |
|
4 | 4 | from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, |
5 | 5 | AddressHelper, |
|
10 | 10 | ExchangeSpecification, |
11 | 11 | Message, |
12 | 12 | QuorumQueueSpecification, |
13 | | - StreamSpecification, |
14 | | - StreamFilterOptions |
15 | 13 | ) |
16 | 14 |
|
17 | 15 |
|
@@ -78,90 +76,81 @@ def create_connection() -> Connection: |
78 | 76 | return connection |
79 | 77 |
|
80 | 78 |
|
81 | | -def threaded_function(addr_queue): |
82 | | - connection = create_connection() |
83 | | - offset_specification = StreamFilterOptions() |
84 | | - offset_specification.offset(10) |
85 | | - consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=offset_specification) |
86 | | - try: |
87 | | - consumer.run() |
88 | | - except KeyboardInterrupt: |
89 | | - pass |
90 | | - |
91 | | - |
92 | 79 | def main() -> None: |
| 80 | + |
93 | 81 | exchange_name = "test-exchange" |
94 | 82 | queue_name = "example-queue" |
95 | 83 | routing_key = "routing-key" |
96 | | - messages_to_publish = 1000 |
| 84 | + messages_to_publish = 100000 |
97 | 85 |
|
98 | 86 | print("connection to amqp server") |
99 | 87 | connection = create_connection() |
100 | 88 |
|
101 | 89 | management = connection.management() |
102 | 90 |
|
103 | 91 | print("declaring exchange and queue") |
104 | | - # management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) |
| 92 | + management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) |
105 | 93 |
|
106 | | - #management.declare_queue( |
107 | | - # StreamSpecification(name=queue_name) |
| 94 | + management.declare_queue( |
| 95 | + QuorumQueueSpecification(name=queue_name) |
108 | 96 | # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") |
109 | | - #) |
| 97 | + ) |
110 | 98 |
|
111 | 99 | print("binding queue to exchange") |
112 | | - # bind_name = management.bind( |
113 | | - # BindingSpecification( |
114 | | - # source_exchange=exchange_name, |
115 | | - # destination_queue=queue_name, |
116 | | - # binding_key=routing_key, |
117 | | - # ) |
118 | | - # ) |
| 100 | + bind_name = management.bind( |
| 101 | + BindingSpecification( |
| 102 | + source_exchange=exchange_name, |
| 103 | + destination_queue=queue_name, |
| 104 | + binding_key=routing_key, |
| 105 | + ) |
| 106 | + ) |
119 | 107 |
|
120 | | - # addr = AddressHelper.exchange_address(exchange_name, routing_key) |
| 108 | + addr = AddressHelper.exchange_address(exchange_name, routing_key) |
121 | 109 |
|
122 | 110 | addr_queue = AddressHelper.queue_address(queue_name) |
123 | 111 |
|
124 | | - thread = threading.Thread(target=threaded_function, args=(addr_queue,)) |
125 | | - thread.start() |
126 | | - ## press control + c to terminate the consumer |
127 | | - |
128 | | - # print("create a publisher and publish a test message") |
129 | | - #publisher = connection.publisher(addr_queue) |
| 112 | + print("create a publisher and publish a test message") |
| 113 | + publisher = connection.publisher(addr) |
130 | 114 |
|
131 | | - # print("purging the queue") |
132 | | - # messages_purged = management.purge_queue(queue_name) |
| 115 | + print("purging the queue") |
| 116 | + messages_purged = management.purge_queue(queue_name) |
133 | 117 |
|
134 | | - # print("messages purged: " + str(messages_purged)) |
| 118 | + print("messages purged: " + str(messages_purged)) |
135 | 119 | # management.close() |
136 | 120 |
|
137 | 121 | # publish 10 messages |
138 | | - #for i in range(messages_to_publish): |
139 | | - # status = publisher.publish(Message(body="test")) |
140 | | - # # if status.ACCEPTED: |
141 | | - # # print("message accepted") |
142 | | - # # elif status.RELEASED: |
143 | | - # # print("message not routed") |
144 | | - # # elif status.REJECTED: |
145 | | - # # print("message not rejected") |
146 | | - # |
147 | | - #publisher.close() |
148 | | - # |
149 | | - # print( |
150 | | - # "create a consumer and consume the test message - press control + c to terminate to consume" |
151 | | - # ) |
| 122 | + for i in range(messages_to_publish): |
| 123 | + status = publisher.publish(Message(body="test")) |
| 124 | + if status.ACCEPTED: |
| 125 | + print("message accepted") |
| 126 | + elif status.RELEASED: |
| 127 | + print("message not routed") |
| 128 | + elif status.REJECTED: |
| 129 | + print("message not rejected") |
| 130 | + |
| 131 | + publisher.close() |
| 132 | + |
| 133 | + print( |
| 134 | + "create a consumer and consume the test message - press control + c to terminate to consume" |
| 135 | + ) |
| 136 | + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) |
| 137 | + |
| 138 | + try: |
| 139 | + consumer.run() |
| 140 | + except KeyboardInterrupt: |
| 141 | + pass |
152 | 142 |
|
153 | | - input("Press Enter to continue...") |
154 | 143 | print("cleanup") |
155 | | - # consumer.close() |
| 144 | + consumer.close() |
156 | 145 | # once we finish consuming if we close the connection we need to create a new one |
157 | 146 | # connection = create_connection() |
158 | 147 | # management = connection.management() |
159 | 148 |
|
160 | 149 | print("unbind") |
161 | | - # management.unbind(bind_name) |
| 150 | + management.unbind(bind_name) |
162 | 151 |
|
163 | 152 | print("delete queue") |
164 | | - # management.delete_queue(queue_name) |
| 153 | + management.delete_queue(queue_name) |
165 | 154 |
|
166 | 155 | print("delete exchange") |
167 | 156 | management.delete_exchange(exchange_name) |
|
0 commit comments