|
28 | 28 | from cassandra.io.asyncorereactor import AsyncoreConnection
|
29 | 29 | from tests import connection_class, thread_pool_executor_class
|
30 | 30 | from tests.integration import (PROTOCOL_VERSION, requiressimulacron)
|
31 |
| -from tests.integration.util import assert_quiescent_pool_state |
| 31 | +from tests.integration.util import assert_quiescent_pool_state, late |
32 | 32 | from tests.integration.simulacron import SimulacronBase
|
33 | 33 | from tests.integration.simulacron.utils import (NO_THEN, PrimeOptions,
|
34 | 34 | prime_query, prime_request,
|
@@ -178,6 +178,79 @@ def test_callbacks_and_pool_when_oto(self):
|
178 | 178 | errback.assert_called_once()
|
179 | 179 | callback.assert_not_called()
|
180 | 180 |
|
| 181 | + def test_heartbeat_defunct_deadlock(self): |
| 182 | + """ |
| 183 | + Ensure that there is no deadlock when request is in-flight and heartbeat defuncts connection |
| 184 | + @since 3.16 |
| 185 | + @jira_ticket PYTHON-1044 |
| 186 | + @expected_result an OperationTimeout is raised and no deadlock occurs |
| 187 | +
|
| 188 | + @test_category connection |
| 189 | + """ |
| 190 | + start_and_prime_singledc() |
| 191 | + |
| 192 | + # This is all about timing. We will need the QUERY response future to time out and the heartbeat to defunct |
| 193 | + # at the same moment. The latter will schedule a QUERY retry to another node in case the pool is not |
| 194 | + # already shut down. If and only if the response future timeout falls in between the retry scheduling and |
| 195 | + # its execution the deadlock occurs. The odds are low, so we need to help fate a bit: |
| 196 | + # 1) Make one heartbeat messages be sent to every node |
| 197 | + # 2) Our QUERY goes always to the same host |
| 198 | + # 3) This host needs to defunct first |
| 199 | + # 4) Open a small time window for the response future timeout, i.e. block executor threads for retry |
| 200 | + # execution and last connection to defunct |
| 201 | + query_to_prime = "SELECT * from testkesypace.testtable" |
| 202 | + query_host = "127.0.0.2" |
| 203 | + heartbeat_interval = 1 |
| 204 | + heartbeat_timeout = 1 |
| 205 | + lag = 0.05 |
| 206 | + never = 9999 |
| 207 | + |
| 208 | + class PatchedRoundRobinPolicy(RoundRobinPolicy): |
| 209 | + # Send always to same host |
| 210 | + def make_query_plan(self, working_keyspace=None, query=None): |
| 211 | + print query |
| 212 | + print self._live_hosts |
| 213 | + if query and query.query_string == query_to_prime: |
| 214 | + return filter(lambda h: h == query_host, self._live_hosts) |
| 215 | + else: |
| 216 | + return super(PatchedRoundRobinPolicy, self).make_query_plan() |
| 217 | + |
| 218 | + class PatchedCluster(Cluster): |
| 219 | + # Make sure that QUERY connection will timeout first |
| 220 | + def get_connection_holders(self): |
| 221 | + holders = super(PatchedCluster, self).get_connection_holders() |
| 222 | + return sorted(holders, reverse=True, key=lambda v: int(v._connection.host == query_host)) |
| 223 | + |
| 224 | + # Block executor thread like closing a dead socket could do |
| 225 | + def connection_factory(self, *args, **kwargs): |
| 226 | + conn = super(PatchedCluster, self).connection_factory(*args, **kwargs) |
| 227 | + conn.defunct = late(seconds=2*lag)(conn.defunct) |
| 228 | + return conn |
| 229 | + |
| 230 | + cluster = PatchedCluster( |
| 231 | + protocol_version=PROTOCOL_VERSION, |
| 232 | + compression=False, |
| 233 | + idle_heartbeat_interval=heartbeat_interval, |
| 234 | + idle_heartbeat_timeout=heartbeat_timeout, |
| 235 | + load_balancing_policy=PatchedRoundRobinPolicy() |
| 236 | + ) |
| 237 | + session = cluster.connect() |
| 238 | + self.addCleanup(cluster.shutdown) |
| 239 | + |
| 240 | + prime_query(query_to_prime, then={"delay_in_ms": never}) |
| 241 | + |
| 242 | + # Make heartbeat due |
| 243 | + time.sleep(heartbeat_interval) |
| 244 | + |
| 245 | + future = session.execute_async(query_to_prime, timeout=heartbeat_interval+heartbeat_timeout+3*lag) |
| 246 | + # Delay thread execution like kernel could do |
| 247 | + future._retry_task = late(seconds=4*lag)(future._retry_task) |
| 248 | + |
| 249 | + prime_request(PrimeOptions(then={"result": "no_result", "delay_in_ms": never})) |
| 250 | + prime_request(RejectConnections("unbind")) |
| 251 | + |
| 252 | + self.assertRaisesRegexp(OperationTimedOut, "Connection defunct by heartbeat", future.result) |
| 253 | + |
181 | 254 | def test_close_when_query(self):
|
182 | 255 | """
|
183 | 256 | Test to ensure the driver behaves correctly if the connection is closed
|
|
0 commit comments