Skip to content

Commit 79935a5

Browse files
committed
Merge branch 'master' into ngdg_master_ft
2 parents 543b278 + 3a5bd85 commit 79935a5

File tree

8 files changed

+141
-169
lines changed

8 files changed

+141
-169
lines changed

tests/integration/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def _get_cass_version_from_dse(dse_version):
126126
cass_ver = '4.0.0.2349'
127127
else:
128128
cass_ver = '4.0.0.' + ''.join(dse_version.split('.'))
129-
elif dse_version.startswith('6.7'):
129+
elif Version(dse_version) >= Version('6.7'):
130130
if dse_version == '6.7.0':
131131
cass_ver = "4.0.0.67"
132132
else:
@@ -490,7 +490,13 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None,
490490
if CCM_CLUSTER:
491491
log.debug("Using external CCM cluster {0}".format(CCM_CLUSTER.name))
492492
else:
493-
log.debug("Using unnamed external cluster")
493+
ccm_path = os.getenv("CCM_PATH", None)
494+
ccm_name = os.getenv("CCM_NAME", None)
495+
if ccm_path and ccm_name:
496+
CCM_CLUSTER = CCMClusterFactory.load(ccm_path, ccm_name)
497+
log.debug("Using external CCM cluster {0}".format(CCM_CLUSTER.name))
498+
else:
499+
log.debug("Using unnamed external cluster")
494500
if set_keyspace and start:
495501
setup_keyspace(ipformat=ipformat, wait=False)
496502
return

tests/integration/cqlengine/query/test_named.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ def test_named_table_with_mv(self):
335335
SELECT * FROM {0}.scores
336336
WHERE game IS NOT NULL AND score IS NOT NULL AND user IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL
337337
PRIMARY KEY (game, score, user, year, month, day)
338-
WITH CLUSTERING ORDER BY (score DESC)""".format(ks)
338+
WITH CLUSTERING ORDER BY (score DESC, user DESC, year DESC, month DESC, day DESC)""".format(ks)
339339

340340
self.session.execute(create_mv_alltime)
341341

tests/integration/long/test_failure_types.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import sys
1717
import traceback
1818
import time
19+
20+
from ccmlib.dse_cluster import DseCluster
1921
from mock import Mock
2022

2123
from cassandra.policies import HostFilterPolicy, RoundRobinPolicy
@@ -29,7 +31,7 @@
2931
from tests.integration import (
3032
use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster,
3133
get_node, start_cluster_wait_for_up, requiresmallclockgranularity,
32-
)
34+
local)
3335

3436

3537
try:
@@ -40,6 +42,7 @@
4042
log = logging.getLogger(__name__)
4143

4244

45+
@local
4346
def setup_module():
4447
"""
4548
We need some custom setup for this module. All unit tests in this module
@@ -52,7 +55,7 @@ def setup_module():
5255
use_singledc(start=False)
5356
ccm_cluster = get_cluster()
5457
ccm_cluster.stop()
55-
config_options = {'tombstone_failure_threshold': 2000, 'tombstone_warn_threshold': 1000}
58+
config_options = {'guardrails.tombstone_failure_threshold': 2000, 'guardrails.tombstone_warn_threshold': 1000}
5659
ccm_cluster.set_configuration_options(config_options)
5760
start_cluster_wait_for_up(ccm_cluster)
5861
setup_keyspace()
@@ -252,7 +255,7 @@ def test_tombstone_overflow_read_failure(self):
252255
parameters = [(x,) for x in range(3000)]
253256
self.execute_concurrent_args_helper(self.session, statement, parameters)
254257

255-
statement = self.session.prepare("DELETE v1 FROM test3rf.test2 WHERE k = 1 AND v0 =?")
258+
statement = self.session.prepare("DELETE FROM test3rf.test2 WHERE k = 1 AND v0 =?")
256259
parameters = [(x,) for x in range(2001)]
257260
self.execute_concurrent_args_helper(self.session, statement, parameters)
258261

tests/integration/simulacron/test_cluster.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,20 @@
1616
except ImportError:
1717
import unittest # noqa
1818

19-
from tests.integration.simulacron import SimulacronCluster
20-
from tests.integration import (requiressimulacron, PROTOCOL_VERSION)
21-
from tests.integration.simulacron.utils import prime_query
19+
import logging
20+
from packaging.version import Version
21+
22+
import cassandra
23+
from tests.integration.simulacron import SimulacronCluster, SimulacronBase
24+
from tests.integration import (requiressimulacron, PROTOCOL_VERSION, DSE_VERSION, MockLoggingHandler)
25+
from tests.integration.simulacron.utils import prime_query, start_and_prime_singledc
2226

2327
from cassandra import (WriteTimeout, WriteType,
2428
ConsistencyLevel, UnresolvableContactPoints)
25-
from cassandra.cluster import Cluster
29+
from cassandra.cluster import Cluster, ControlConnection
30+
2631

32+
PROTOCOL_VERSION = min(4, PROTOCOL_VERSION if (DSE_VERSION is None or DSE_VERSION >= Version('5.0')) else 3)
2733

2834
@requiressimulacron
2935
class ClusterTests(SimulacronCluster):
@@ -78,3 +84,30 @@ def test_connection_with_only_unresolvable_contact_points(self):
7884
self.cluster = Cluster(['dns.invalid'],
7985
protocol_version=PROTOCOL_VERSION,
8086
compression=False)
87+
88+
89+
@requiressimulacron
90+
class DuplicateRpcTest(SimulacronCluster):
91+
connect = False
92+
93+
def test_duplicate(self):
94+
mock_handler = MockLoggingHandler()
95+
logger = logging.getLogger(cassandra.cluster.__name__)
96+
logger.addHandler(mock_handler)
97+
address_column = "native_transport_address" if DSE_VERSION and DSE_VERSION > Version("6.0") else "rpc_address"
98+
rows = [
99+
{"peer": "127.0.0.1", "data_center": "dc", "host_id": "dontcare1", "rack": "rack1",
100+
"release_version": "3.11.4", address_column: "127.0.0.1", "schema_version": "dontcare", "tokens": "1"},
101+
{"peer": "127.0.0.2", "data_center": "dc", "host_id": "dontcare2", "rack": "rack1",
102+
"release_version": "3.11.4", address_column: "127.0.0.2", "schema_version": "dontcare", "tokens": "2"},
103+
]
104+
prime_query(ControlConnection._SELECT_PEERS, rows=rows)
105+
106+
cluster = Cluster(protocol_version=PROTOCOL_VERSION, compression=False)
107+
session = cluster.connect(wait_for_all_pools=True)
108+
109+
warnings = mock_handler.messages.get("warning")
110+
self.assertEqual(len(warnings), 1)
111+
self.assertTrue('multiple hosts with the same endpoint' in warnings[0])
112+
logger.removeHandler(mock_handler)
113+
cluster.shutdown()

tests/integration/standard/test_cluster.py

Lines changed: 65 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from packaging.version import Version
2828

2929
import cassandra
30-
from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
30+
from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT, ControlConnection
3131
from cassandra.concurrent import execute_concurrent
3232
from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
3333
RetryPolicy, SimpleConvictionPolicy, HostDistance,
@@ -502,79 +502,70 @@ def test_refresh_schema_type(self):
502502
@local
503503
@notwindows
504504
def test_refresh_schema_no_wait(self):
505-
contact_points = [CASSANDRA_IP]
506-
with Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=10,
507-
contact_points=contact_points,
508-
execution_profiles=
509-
{EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=
510-
HostFilterPolicy(
511-
RoundRobinPolicy(), lambda host: host.address == CASSANDRA_IP
512-
))}) as cluster:
513-
session = cluster.connect()
514-
515-
schema_ver = session.execute("SELECT schema_version FROM system.local WHERE key='local'")[0][0]
516-
new_schema_ver = uuid4()
517-
session.execute("UPDATE system.local SET schema_version=%s WHERE key='local'", (new_schema_ver,))
518-
519-
try:
520-
agreement_timeout = 1
521-
522-
# cluster agreement wait exceeded
523-
c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=agreement_timeout)
524-
c.connect()
525-
self.assertTrue(c.metadata.keyspaces)
526-
527-
# cluster agreement wait used for refresh
528-
original_meta = c.metadata.keyspaces
529-
start_time = time.time()
530-
self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata)
531-
end_time = time.time()
532-
self.assertGreaterEqual(end_time - start_time, agreement_timeout)
533-
self.assertIs(original_meta, c.metadata.keyspaces)
534-
535-
# refresh wait overrides cluster value
536-
original_meta = c.metadata.keyspaces
537-
start_time = time.time()
538-
c.refresh_schema_metadata(max_schema_agreement_wait=0)
539-
end_time = time.time()
540-
self.assertLess(end_time - start_time, agreement_timeout)
541-
self.assertIsNot(original_meta, c.metadata.keyspaces)
542-
self.assertEqual(original_meta, c.metadata.keyspaces)
543-
544-
c.shutdown()
545-
546-
refresh_threshold = 0.5
547-
# cluster agreement bypass
548-
c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=0)
549-
start_time = time.time()
550-
s = c.connect()
551-
end_time = time.time()
552-
self.assertLess(end_time - start_time, refresh_threshold)
553-
self.assertTrue(c.metadata.keyspaces)
554-
555-
# cluster agreement wait used for refresh
556-
original_meta = c.metadata.keyspaces
557-
start_time = time.time()
558-
c.refresh_schema_metadata()
559-
end_time = time.time()
560-
self.assertLess(end_time - start_time, refresh_threshold)
561-
self.assertIsNot(original_meta, c.metadata.keyspaces)
562-
self.assertEqual(original_meta, c.metadata.keyspaces)
563-
564-
# refresh wait overrides cluster value
565-
original_meta = c.metadata.keyspaces
566-
start_time = time.time()
567-
self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata,
568-
max_schema_agreement_wait=agreement_timeout)
569-
end_time = time.time()
570-
self.assertGreaterEqual(end_time - start_time, agreement_timeout)
571-
self.assertIs(original_meta, c.metadata.keyspaces)
572-
c.shutdown()
573-
finally:
574-
# TODO once fixed this connect call
575-
session = cluster.connect()
576-
session.execute("UPDATE system.local SET schema_version=%s WHERE key='local'", (schema_ver,))
577-
505+
original_wait_for_responses = connection.Connection.wait_for_responses
506+
507+
def patched_wait_for_responses(*args, **kwargs):
508+
# When selecting schema version, replace the real schema UUID with an unexpected UUID
509+
response = original_wait_for_responses(*args, **kwargs)
510+
if len(args) > 2 and hasattr(args[2], "query") and args[2].query == "SELECT schema_version FROM system.local WHERE key='local'":
511+
new_uuid = uuid4()
512+
response[1].parsed_rows[0] = (new_uuid,)
513+
return response
514+
515+
with patch.object(connection.Connection, "wait_for_responses", patched_wait_for_responses):
516+
agreement_timeout = 1
517+
518+
# cluster agreement wait exceeded
519+
c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=agreement_timeout)
520+
c.connect()
521+
self.assertTrue(c.metadata.keyspaces)
522+
523+
# cluster agreement wait used for refresh
524+
original_meta = c.metadata.keyspaces
525+
start_time = time.time()
526+
self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata)
527+
end_time = time.time()
528+
self.assertGreaterEqual(end_time - start_time, agreement_timeout)
529+
self.assertIs(original_meta, c.metadata.keyspaces)
530+
531+
# refresh wait overrides cluster value
532+
original_meta = c.metadata.keyspaces
533+
start_time = time.time()
534+
c.refresh_schema_metadata(max_schema_agreement_wait=0)
535+
end_time = time.time()
536+
self.assertLess(end_time - start_time, agreement_timeout)
537+
self.assertIsNot(original_meta, c.metadata.keyspaces)
538+
self.assertEqual(original_meta, c.metadata.keyspaces)
539+
540+
c.shutdown()
541+
542+
refresh_threshold = 0.5
543+
# cluster agreement bypass
544+
c = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=0)
545+
start_time = time.time()
546+
s = c.connect()
547+
end_time = time.time()
548+
self.assertLess(end_time - start_time, refresh_threshold)
549+
self.assertTrue(c.metadata.keyspaces)
550+
551+
# cluster agreement wait used for refresh
552+
original_meta = c.metadata.keyspaces
553+
start_time = time.time()
554+
c.refresh_schema_metadata()
555+
end_time = time.time()
556+
self.assertLess(end_time - start_time, refresh_threshold)
557+
self.assertIsNot(original_meta, c.metadata.keyspaces)
558+
self.assertEqual(original_meta, c.metadata.keyspaces)
559+
560+
# refresh wait overrides cluster value
561+
original_meta = c.metadata.keyspaces
562+
start_time = time.time()
563+
self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata,
564+
max_schema_agreement_wait=agreement_timeout)
565+
end_time = time.time()
566+
self.assertGreaterEqual(end_time - start_time, agreement_timeout)
567+
self.assertIs(original_meta, c.metadata.keyspaces)
568+
c.shutdown()
578569

579570
def test_trace(self):
580571
"""
@@ -1480,52 +1471,6 @@ def test_prepare_on_ignored_hosts(self):
14801471
cluster.shutdown()
14811472

14821473

1483-
@local
1484-
class DuplicateRpcTest(unittest.TestCase):
1485-
1486-
load_balancing_policy = HostFilterPolicy(RoundRobinPolicy(),
1487-
lambda host: host.address == "127.0.0.1")
1488-
1489-
def setUp(self):
1490-
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION,
1491-
execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=self.load_balancing_policy)})
1492-
self.session = self.cluster.connect()
1493-
1494-
self.address_column = "native_transport_address" if DSE_VERSION and DSE_VERSION >= Version("6.0") else "rpc_address"
1495-
self.session.execute("UPDATE system.peers SET {} = '127.0.0.1' WHERE peer='127.0.0.2'".
1496-
format(self.address_column))
1497-
1498-
def tearDown(self):
1499-
self.session.execute("UPDATE system.peers SET {} = '127.0.0.2' WHERE peer='127.0.0.2'".
1500-
format(self.address_column))
1501-
self.cluster.shutdown()
1502-
1503-
def test_duplicate(self):
1504-
"""
1505-
Test duplicate RPC addresses.
1506-
1507-
Modifies the system.peers table to make hosts have the same rpc address. Ensures such hosts are filtered out and a message is logged
1508-
1509-
@since 3.4
1510-
@jira_ticket PYTHON-366
1511-
@expected_result only one hosts' metadata will be populated
1512-
1513-
@test_category metadata
1514-
"""
1515-
mock_handler = MockLoggingHandler()
1516-
logger = logging.getLogger(cassandra.cluster.__name__)
1517-
logger.addHandler(mock_handler)
1518-
test_cluster = Cluster(protocol_version=PROTOCOL_VERSION,
1519-
execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=self.load_balancing_policy)})
1520-
1521-
test_cluster.connect()
1522-
warnings = mock_handler.messages.get("warning")
1523-
self.assertEqual(len(warnings), 1)
1524-
self.assertTrue('multiple' in warnings[0])
1525-
logger.removeHandler(mock_handler)
1526-
test_cluster.shutdown()
1527-
1528-
15291474
@protocolv5
15301475
class BetaProtocolTest(unittest.TestCase):
15311476

tests/integration/standard/test_control_connection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#
1515
#
1616
#
17+
from cassandra import InvalidRequest
1718

1819
try:
1920
import unittest2 as unittest
@@ -43,7 +44,7 @@ def setUp(self):
4344
def tearDown(self):
4445
try:
4546
self.session.execute("DROP KEYSPACE keyspacetodrop ")
46-
except (ConfigurationException):
47+
except (ConfigurationException, InvalidRequest):
4748
# we already removed the keyspace.
4849
pass
4950
self.cluster.shutdown()

0 commit comments

Comments
 (0)