|
5 | 5 | ArgumentOutOfRangeException, |
6 | 6 | BindingSpecification, |
7 | 7 | Connection, |
| 8 | + ConnectionClosed, |
8 | 9 | ExchangeSpecification, |
9 | 10 | Message, |
10 | 11 | QuorumQueueSpecification, |
11 | 12 | ) |
12 | 13 |
|
13 | 14 | from .http_requests import delete_all_connections |
14 | 15 |
|
15 | | -disconnected = False |
16 | | - |
17 | | - |
18 | | -def on_disconnected(): |
19 | | - |
20 | | - print("disconnected") |
21 | | - global disconnected |
22 | | - disconnected = True |
23 | | - |
24 | 16 |
|
25 | 17 | def test_publish_queue(connection: Connection) -> None: |
26 | 18 |
|
@@ -143,31 +135,85 @@ def test_publish_purge(connection: Connection) -> None: |
143 | 135 | assert message_purged == 20 |
144 | 136 |
|
145 | 137 |
|
146 | | -def test_disconnection() -> None: |
147 | | - connection = Connection( |
| 138 | +def test_disconnection_reconnection() -> None: |
| 139 | + disconnected = False |
| 140 | + reconnected = False |
| 141 | + generic_exception_raised = False |
| 142 | + publisher = None |
| 143 | + queue_name = "test-queue" |
| 144 | + connection_test = None |
| 145 | + |
| 146 | + time.sleep(60) |
| 147 | + |
| 148 | + def on_disconnected(): |
| 149 | + |
| 150 | + nonlocal publisher |
| 151 | + nonlocal queue_name |
| 152 | + nonlocal connection_test |
| 153 | + |
| 154 | + # reconnect |
| 155 | + if connection_test is not None: |
| 156 | + connection_test = Connection("amqp://guest:guest@localhost:5672/") |
| 157 | + connection_test.dial() |
| 158 | + |
| 159 | + if publisher is not None: |
| 160 | + publisher = connection_test.publisher("/queues/" + queue_name) |
| 161 | + |
| 162 | + nonlocal reconnected |
| 163 | + reconnected = True |
| 164 | + |
| 165 | + connection_test = Connection( |
148 | 166 | "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected |
149 | 167 | ) |
150 | | - connection.dial() |
| 168 | + connection_test.dial() |
151 | 169 | # delay |
152 | | - time.sleep(10) |
153 | | - messages_to_publish = 20 |
| 170 | + time.sleep(5) |
| 171 | + messages_to_publish = 10000 |
154 | 172 | queue_name = "test-queue" |
155 | | - management = connection.management() |
| 173 | + management = connection_test.management() |
156 | 174 |
|
157 | 175 | management.declare_queue(QuorumQueueSpecification(name=queue_name)) |
158 | 176 |
|
159 | | - try: |
160 | | - publisher = connection.publisher("/queues/" + queue_name) |
| 177 | + management.close() |
| 178 | + |
| 179 | + publisher = connection_test.publisher("/queues/" + queue_name) |
| 180 | + while True: |
| 181 | + |
161 | 182 | for i in range(messages_to_publish): |
162 | 183 | if i == 5: |
163 | 184 | # simulate a disconnection |
164 | 185 | delete_all_connections() |
165 | | - publisher.publish(Message(body="test")) |
| 186 | + try: |
| 187 | + publisher.publish(Message(body="test")) |
166 | 188 |
|
167 | | - except Exception: |
168 | | - pass |
| 189 | + except ConnectionClosed: |
| 190 | + disconnected = True |
| 191 | + continue |
| 192 | + |
| 193 | + except Exception: |
| 194 | + generic_exception_raised = True |
| 195 | + |
| 196 | + break |
| 197 | + |
| 198 | + publisher.close() |
| 199 | + |
| 200 | + # cleanup, we need to create a new connection as the previous one |
| 201 | + # was closed by the test |
| 202 | + |
| 203 | + connection_test = Connection("amqp://guest:guest@localhost:5672/") |
| 204 | + connection_test.dial() |
| 205 | + |
| 206 | + management = connection_test.management() |
| 207 | + |
| 208 | + # purge the queue and check number of published messages |
| 209 | + message_purged = management.purge_queue(queue_name) |
| 210 | + |
| 211 | + management.delete_queue(queue_name) |
| 212 | + management.close() |
169 | 213 |
|
170 | | - connection.close() |
| 214 | + connection_test.close() |
171 | 215 |
|
172 | | - global disconnected |
| 216 | + assert generic_exception_raised is False |
173 | 217 | assert disconnected is True |
| 218 | + assert reconnected is True |
| 219 | + assert message_purged == messages_to_publish - 1 |
0 commit comments