|
5 | 5 | ArgumentOutOfRangeException, |
6 | 6 | BindingSpecification, |
7 | 7 | Connection, |
| 8 | + ConnectionClosed, |
8 | 9 | ExchangeSpecification, |
9 | 10 | Message, |
10 | 11 | QuorumQueueSpecification, |
11 | 12 | ) |
12 | 13 |
|
13 | 14 | from .http_requests import delete_all_connections |
14 | 15 |
|
15 | | -disconnected = False |
16 | | - |
17 | | - |
18 | | -def on_disconnected(): |
19 | | - |
20 | | - print("disconnected") |
21 | | - global disconnected |
22 | | - disconnected = True |
23 | | - |
24 | 16 |
|
25 | 17 | def test_publish_queue(connection: Connection) -> None: |
26 | 18 |
|
@@ -166,31 +158,85 @@ def test_publish_purge(connection: Connection) -> None: |
166 | 158 | assert message_purged == 20 |
167 | 159 |
|
168 | 160 |
|
169 | | -def test_disconnection() -> None: |
170 | | - connection = Connection( |
| 161 | +def test_disconnection_reconnection() -> None: |
| 162 | + disconnected = False |
| 163 | + reconnected = False |
| 164 | + generic_exception_raised = False |
| 165 | + publisher = None |
| 166 | + queue_name = "test-queue" |
| 167 | + connection_test = None |
| 168 | + |
| 169 | + time.sleep(60) |
| 170 | + |
| 171 | + def on_disconnected(): |
| 172 | + |
| 173 | + nonlocal publisher |
| 174 | + nonlocal queue_name |
| 175 | + nonlocal connection_test |
| 176 | + |
| 177 | + # reconnect |
| 178 | + if connection_test is not None: |
| 179 | + connection_test = Connection("amqp://guest:guest@localhost:5672/") |
| 180 | + connection_test.dial() |
| 181 | + |
| 182 | + if publisher is not None: |
| 183 | + publisher = connection_test.publisher("/queues/" + queue_name) |
| 184 | + |
| 185 | + nonlocal reconnected |
| 186 | + reconnected = True |
| 187 | + |
| 188 | + connection_test = Connection( |
171 | 189 | "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected |
172 | 190 | ) |
173 | | - connection.dial() |
| 191 | + connection_test.dial() |
174 | 192 | # delay |
175 | | - time.sleep(10) |
176 | | - messages_to_publish = 20 |
| 193 | + time.sleep(5) |
| 194 | + messages_to_publish = 10000 |
177 | 195 | queue_name = "test-queue" |
178 | | - management = connection.management() |
| 196 | + management = connection_test.management() |
179 | 197 |
|
180 | 198 | management.declare_queue(QuorumQueueSpecification(name=queue_name)) |
181 | 199 |
|
182 | | - try: |
183 | | - publisher = connection.publisher("/queues/" + queue_name) |
| 200 | + management.close() |
| 201 | + |
| 202 | + publisher = connection_test.publisher("/queues/" + queue_name) |
| 203 | + while True: |
| 204 | + |
184 | 205 | for i in range(messages_to_publish): |
185 | 206 | if i == 5: |
186 | 207 | # simulate a disconnection |
187 | 208 | delete_all_connections() |
188 | | - publisher.publish(Message(body="test")) |
| 209 | + try: |
| 210 | + publisher.publish(Message(body="test")) |
189 | 211 |
|
190 | | - except Exception: |
191 | | - pass |
| 212 | + except ConnectionClosed: |
| 213 | + disconnected = True |
| 214 | + continue |
| 215 | + |
| 216 | + except Exception: |
| 217 | + generic_exception_raised = True |
| 218 | + |
| 219 | + break |
| 220 | + |
| 221 | + publisher.close() |
| 222 | + |
| 223 | + # cleanup, we need to create a new connection as the previous one |
| 224 | + # was closed by the test |
| 225 | + |
| 226 | + connection_test = Connection("amqp://guest:guest@localhost:5672/") |
| 227 | + connection_test.dial() |
| 228 | + |
| 229 | + management = connection_test.management() |
| 230 | + |
| 231 | + # purge the queue and check number of published messages |
| 232 | + message_purged = management.purge_queue(queue_name) |
| 233 | + |
| 234 | + management.delete_queue(queue_name) |
| 235 | + management.close() |
192 | 236 |
|
193 | | - connection.close() |
| 237 | + connection_test.close() |
194 | 238 |
|
195 | | - global disconnected |
| 239 | + assert generic_exception_raised is False |
196 | 240 | assert disconnected is True |
| 241 | + assert reconnected is True |
| 242 | + assert message_purged == messages_to_publish - 1 |
0 commit comments