|
19 | 19 |
|
20 | 20 | from mock import Mock, MagicMock, ANY
|
21 | 21 |
|
22 |
| -from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType |
| 22 | +from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType, OperationTimedOut |
23 | 23 | from cassandra.cluster import Session, ResponseFuture, NoHostAvailable, ProtocolVersion
|
24 | 24 | from cassandra.connection import Connection, ConnectionException
|
25 | 25 | from cassandra.protocol import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage,
|
@@ -125,6 +125,39 @@ def test_other_result_message_kind(self):
|
125 | 125 | rf._set_result(None, None, None, Mock(spec=ResultMessage, kind=999, results=result))
|
126 | 126 | self.assertListEqual(list(rf.result()), result)
|
127 | 127 |
|
| 128 | + def test_heartbeat_defunct_deadlock(self): |
| 129 | + """ |
| 130 | + Heartbeat defuncts all connections and clears request queues. Response future times out and even |
| 131 | + if it has been removed from request queue, timeout exception must be thrown. Otherwise event loop |
| 132 | + will deadlock on eventual ResponseFuture.result() call. |
| 133 | +
|
| 134 | + PYTHON-1044 |
| 135 | + """ |
| 136 | + |
| 137 | + connection = MagicMock(spec=Connection) |
| 138 | + connection._requests = {} |
| 139 | + |
| 140 | + pool = Mock() |
| 141 | + pool.is_shutdown = False |
| 142 | + pool.borrow_connection.return_value = [connection, 1] |
| 143 | + |
| 144 | + session = self.make_basic_session() |
| 145 | + session.cluster._default_load_balancing_policy.make_query_plan.return_value = [Mock(), Mock()] |
| 146 | + session._pools.get.return_value = pool |
| 147 | + |
| 148 | + query = SimpleStatement("SELECT * FROM foo") |
| 149 | + message = QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE) |
| 150 | + |
| 151 | + rf = ResponseFuture(session, message, query, 1) |
| 152 | + rf.send_request() |
| 153 | + |
| 154 | + # Simulate Connection.error_all_requests() after heartbeat defuncts |
| 155 | + connection._requests = {} |
| 156 | + |
| 157 | + # Simulate ResponseFuture timing out |
| 158 | + rf._on_timeout() |
| 159 | + self.assertRaises(OperationTimedOut, rf.result) |
| 160 | + |
128 | 161 | def test_read_timeout_error_message(self):
|
129 | 162 | session = self.make_session()
|
130 | 163 | query = SimpleStatement("SELECT * FROM foo")
|
|
0 commit comments