Skip to content

Commit 155903b

Browse files
authored
Merge pull request datastax#1048 from nschrader/python-1044
[Python 1044] Driver hangs/deadlock if all connections dropped by heartbeat whilst request in flight and request times out
2 parents e300724 + ea97ce9 commit 155903b

File tree

7 files changed

+135
-8
lines changed

7 files changed

+135
-8
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Unreleased
55
Bug Fixes
66
---------
77
* Connection setup methods prevent using ExecutionProfile in cqlengine (PYTHON-1009)
8+
* Driver deadlock if all connections dropped by heartbeat whilst request in flight and request times out (PYTHON-1044)
89

910
3.19.0
1011
======

cassandra/cluster.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3722,10 +3722,14 @@ def _on_timeout(self, _attempts=0):
37223722
if self._connection is not None:
37233723
try:
37243724
self._connection._requests.pop(self._req_id)
3725-
# This prevents the race condition of the
3726-
# event loop thread just receiving the waited message
3727-
# If it arrives after this, it will be ignored
3725+
# PYTHON-1044
3726+
# This request might have been removed from the connection after the connection was marked defunct by the heartbeat.
3727+
# We should still raise OperationTimedOut to reject the future so that the main event thread will not
3728+
# wait for it endlessly
37283729
except KeyError:
3730+
key = "Connection defunct by heartbeat"
3731+
errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
3732+
self._set_final_exception(OperationTimedOut(errors, self._current_host))
37293733
return
37303734

37313735
pool = self.session._pools.get(self._current_host)

tests/integration/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def _get_dse_version_from_cass(cass_version):
126126
CASSANDRA_IP = os.getenv('CASSANDRA_IP', '127.0.0.1')
127127
CASSANDRA_DIR = os.getenv('CASSANDRA_DIR', None)
128128

129-
default_cassandra_version = '3.11'
129+
default_cassandra_version = '3.11.4'
130130
cv_string = os.getenv('CASSANDRA_VERSION', default_cassandra_version)
131131
mcv_string = os.getenv('MAPPED_CASSANDRA_VERSION', None)
132132
try:
@@ -366,6 +366,9 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None,
366366
elif ccm_options is None:
367367
ccm_options = CCM_KWARGS.copy()
368368

369+
if 'version' in ccm_options and not isinstance(ccm_options['version'], Version):
370+
ccm_options['version'] = Version(ccm_options['version'])
371+
369372
cassandra_version = ccm_options.get('version', CCM_VERSION)
370373
dse_version = ccm_options.get('version', DSE_VERSION)
371374

tests/integration/simulacron/test_connection.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
from cassandra.policies import HostStateListener, RoundRobinPolicy
2828
from cassandra.io.asyncorereactor import AsyncoreConnection
2929
from tests import connection_class, thread_pool_executor_class
30+
from tests.unit.cython.utils import cythontest
3031
from tests.integration import (PROTOCOL_VERSION, requiressimulacron)
31-
from tests.integration.util import assert_quiescent_pool_state
32+
from tests.integration.util import assert_quiescent_pool_state, late
3233
from tests.integration.simulacron import SimulacronBase
3334
from tests.integration.simulacron.utils import (NO_THEN, PrimeOptions,
3435
prime_query, prime_request,
@@ -178,6 +179,80 @@ def test_callbacks_and_pool_when_oto(self):
178179
errback.assert_called_once()
179180
callback.assert_not_called()
180181

182+
@cythontest
def test_heartbeat_defunct_deadlock(self):
    """
    Ensure that there is no deadlock when a request is in-flight and the heartbeat defuncts the connection

    @since 3.16
    @jira_ticket PYTHON-1044
    @expected_result an OperationTimedOut is raised and no deadlock occurs

    @test_category connection
    """
    start_and_prime_singledc()

    # This is all about timing. We will need the QUERY response future to time out and the heartbeat to defunct
    # at the same moment. The latter will schedule a QUERY retry to another node in case the pool is not
    # already shut down. If and only if the response future timeout falls in between the retry scheduling and
    # its execution the deadlock occurs. The odds are low, so we need to help fate a bit:
    #   1) Make one heartbeat message be sent to every node
    #   2) Our QUERY always goes to the same host
    #   3) This host needs to defunct first
    #   4) Open a small time window for the response future timeout, i.e. block executor threads for retry
    #      execution and last connection to defunct
    query_to_prime = "SELECT * from testkesypace.testtable"
    query_host = "127.0.0.2"
    heartbeat_interval = 1
    heartbeat_timeout = 1
    lag = 0.05
    never = 9999

    class PatchedRoundRobinPolicy(RoundRobinPolicy):
        # Always send the primed query to the same host
        def make_query_plan(self, working_keyspace=None, query=None):
            if query and query.query_string == query_to_prime:
                # Materialize the plan as a list so it behaves the same on Python 2 and Python 3
                return [h for h in self._live_hosts if h == query_host]
            return super(PatchedRoundRobinPolicy, self).make_query_plan(working_keyspace, query)

    class PatchedCluster(Cluster):
        # Make sure that QUERY connection will timeout first
        def get_connection_holders(self):
            holders = super(PatchedCluster, self).get_connection_holders()
            return sorted(holders, reverse=True, key=lambda v: int(v._connection.host == query_host))

        # Block executor thread like closing a dead socket could do
        def connection_factory(self, *args, **kwargs):
            conn = super(PatchedCluster, self).connection_factory(*args, **kwargs)
            conn.defunct = late(seconds=2*lag)(conn.defunct)
            return conn

    cluster = PatchedCluster(
        protocol_version=PROTOCOL_VERSION,
        compression=False,
        idle_heartbeat_interval=heartbeat_interval,
        idle_heartbeat_timeout=heartbeat_timeout,
        load_balancing_policy=PatchedRoundRobinPolicy()
    )
    session = cluster.connect()
    self.addCleanup(cluster.shutdown)

    prime_query(query_to_prime, then={"delay_in_ms": never})

    # Make heartbeat due
    time.sleep(heartbeat_interval)

    future = session.execute_async(query_to_prime, timeout=heartbeat_interval+heartbeat_timeout+3*lag)
    # Delay thread execution like kernel could do
    future._retry_task = late(seconds=4*lag)(future._retry_task)

    prime_request(PrimeOptions(then={"result": "no_result", "delay_in_ms": never}))
    prime_request(RejectConnections("unbind"))

    self.assertRaisesRegexp(OperationTimedOut, "Connection defunct by heartbeat", future.result)
255+
181256
def test_close_when_query(self):
182257
"""
183258
Test to ensure the driver behaves correctly if the connection is closed

tests/integration/upgrade/test_upgrade.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828

2929
two_to_three_path = upgrade_paths([
30-
UpgradePath("2.2.9-3.11", {"version": "2.2.9"}, {"version": "3.11"}, {}),
30+
UpgradePath("2.2.9-3.11", {"version": "2.2.9"}, {"version": "3.11.4"}, {}),
3131
])
3232
class UpgradeTests(UpgradeBase):
3333
@two_to_three_path
@@ -176,7 +176,7 @@ def _assert_same_token_map(self, original, new):
176176

177177

178178
two_to_three_with_auth_path = upgrade_paths([
179-
UpgradePath("2.2.9-3.11-auth", {"version": "2.2.9"}, {"version": "3.11"},
179+
UpgradePath("2.2.9-3.11-auth", {"version": "2.2.9"}, {"version": "3.11.4"},
180180
{'authenticator': 'PasswordAuthenticator',
181181
'authorizer': 'CassandraAuthorizer'}),
182182
])

tests/integration/util.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from tests.integration import PROTOCOL_VERSION
16+
from functools import wraps
1617
import time
1718

1819

@@ -96,3 +97,13 @@ def wrapped_condition():
9697

9798
# last attempt, let the exception raise
9899
condition()
100+
101+
102+
def late(seconds=1):
    """
    Build a decorator that delays every call to the wrapped callable.

    :param seconds: how long to sleep (via ``time.sleep``) before each call
    :return: a decorator whose wrapper sleeps, then calls the wrapped callable
        with the same arguments and returns its result
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            time.sleep(seconds)
            # Propagate the result so that delaying a callable does not change what its callers receive
            return func(*args, **kwargs)
        return wrapper
    return decorator

tests/unit/test_response_future.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from mock import Mock, MagicMock, ANY
2121

22-
from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType
22+
from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType, OperationTimedOut
2323
from cassandra.cluster import Session, ResponseFuture, NoHostAvailable, ProtocolVersion
2424
from cassandra.connection import Connection, ConnectionException
2525
from cassandra.protocol import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage,
@@ -125,6 +125,39 @@ def test_other_result_message_kind(self):
125125
rf._set_result(None, None, None, Mock(spec=ResultMessage, kind=999, results=result))
126126
self.assertListEqual(list(rf.result()), result)
127127

128+
def test_heartbeat_defunct_deadlock(self):
    """
    When the heartbeat defuncts every connection, the request queues are cleared. A response
    future that subsequently times out must still fail with a timeout exception even though its
    request id is no longer registered on the connection; otherwise a later
    ResponseFuture.result() call would block forever.

    PYTHON-1044
    """
    statement = SimpleStatement("SELECT * FROM foo")
    query_message = QueryMessage(query=statement, consistency_level=ConsistencyLevel.ONE)

    # Connection handed out by the pool, starting with no in-flight requests
    mock_connection = MagicMock(spec=Connection)
    mock_connection._requests = {}

    # A live pool that always lends the connection above with request id 1
    mock_pool = Mock()
    mock_pool.is_shutdown = False
    mock_pool.borrow_connection.return_value = [mock_connection, 1]

    mock_session = self.make_basic_session()
    mock_session._pools.get.return_value = mock_pool
    mock_session.cluster._default_load_balancing_policy.make_query_plan.return_value = [Mock(), Mock()]

    response_future = ResponseFuture(mock_session, query_message, statement, 1)
    response_future.send_request()

    # Simulate Connection.error_all_requests() after heartbeat defuncts
    mock_connection._requests = {}

    # Simulate ResponseFuture timing out
    response_future._on_timeout()
    with self.assertRaisesRegexp(OperationTimedOut, "Connection defunct by heartbeat"):
        response_future.result()
160+
128161
def test_read_timeout_error_message(self):
129162
session = self.make_session()
130163
query = SimpleStatement("SELECT * FROM foo")

0 commit comments

Comments
 (0)