
Commit adddec1

Don't create Host instances with random host_id
Let the control connection use the resolved contact points from the cluster config if the LBP is not yet initialized.
Parent: dd1adc7
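In rough terms, the control connection now walks the load balancing policy's query plan first and, only when that plan yields no candidates at all, falls back to the contact points resolved from the cluster configuration. The sketch below restates that ordering outside the driver; lbp, resolved_contact_points, and try_connect are simplified stand-ins for the internals touched in the diff, and the single except clause collapses the two error branches the real code keeps separate.

def pick_control_endpoint(lbp, resolved_contact_points, try_connect):
    """Sketch of the new connect order: LBP query plan first, then the
    cluster's resolved contact points if the plan produced nothing."""
    errors = {}

    # 1) Prefer hosts the (already initialized) load balancing policy knows about.
    for host in lbp.make_query_plan():
        try:
            return try_connect(host.endpoint), None
        except Exception as exc:
            errors[str(host.endpoint)] = exc

    # 2) No attempts and no errors means the LBP had nothing to offer yet,
    #    so fall back to the contact points resolved from the cluster config.
    if not errors:
        for endpoint in resolved_contact_points:
            try:
                return try_connect(endpoint), None
            except Exception as exc:
                errors[str(endpoint)] = exc

    return None, errors

The "not errors" check mirrors the diff's len(errors) == 0 test: an empty error map means the query plan produced nothing to try, which is the "LBP not yet initialized" case the description refers to.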

9 files changed: +81 -54 lines

cassandra/cluster.py

Lines changed: 37 additions & 31 deletions
@@ -1683,14 +1683,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
                     "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
         self.protocol_version = new_version
 
-    def _add_resolved_hosts(self):
-        for endpoint in self.endpoints_resolved:
-            host, new = self.add_host(endpoint, signal=False)
-            if new:
-                host.set_up()
-                for listener in self.listeners:
-                    listener.on_add(host)
-
+    def _populate_hosts(self):
         self.profile_manager.populate(
             weakref.proxy(self), self.metadata.all_hosts())
         self.load_balancing_policy.populate(
@@ -1717,17 +1710,10 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
                           self.contact_points, self.protocol_version)
                 self.connection_class.initialize_reactor()
                 _register_cluster_shutdown(self)
-
-                self._add_resolved_hosts()
 
                 try:
                     self.control_connection.connect()
-
-                    # we set all contact points up for connecting, but we won't infer state after this
-                    for endpoint in self.endpoints_resolved:
-                        h = self.metadata.get_host(endpoint)
-                        if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
-                            h.is_up = None
+                    self._populate_hosts()
 
                     log.debug("Control connection created")
                 except Exception:
@@ -3534,18 +3520,20 @@ def _set_new_connection(self, conn):
         if old:
             log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
             old.close()
-
-    def _connect_host_in_lbp(self):
+
+    def _connect_host(self):
         errors = {}
+
         lbp = (
             self._cluster.load_balancing_policy
             if self._cluster._config_mode == _ConfigMode.LEGACY else
             self._cluster._default_load_balancing_policy
         )
 
+        # use endpoints from the default LBP if it is already initialized
         for host in lbp.make_query_plan():
             try:
-                return (self._try_connect(host), None)
+                return (self._try_connect(host.endpoint), None)
             except ConnectionException as exc:
                 errors[str(host.endpoint)] = exc
                 log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
@@ -3555,7 +3543,22 @@ def _connect_host_in_lbp(self):
                 log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
             if self._is_shutdown:
                 raise DriverException("[control connection] Reconnection in progress during shutdown")
-
+
+        # if lbp not initialized use contact points provided to the cluster
+        if len(errors) == 0:
+            for endpoint in self._cluster.endpoints_resolved:
+                try:
+                    return (self._try_connect(endpoint), None)
+                except ConnectionException as exc:
+                    errors[str(endpoint)] = exc
+                    log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
+                    self._cluster.signal_connection_failure(endpoint, exc, is_host_addition=False)
+                except Exception as exc:
+                    errors[str(endpoint)] = exc
+                    log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
+                if self._is_shutdown:
+                    raise DriverException("[control connection] Reconnection in progress during shutdown")
+
         return (None, errors)
 
     def _reconnect_internal(self):
@@ -3567,43 +3570,43 @@ def _reconnect_internal(self):
         to the exception that was raised when an attempt was made to open
         a connection to that host.
         """
-        (conn, _) = self._connect_host_in_lbp()
+        (conn, _) = self._connect_host()
         if conn is not None:
             return conn
 
         # Try to re-resolve hostnames as a fallback when all hosts are unreachable
         self._cluster._resolve_hostnames()
 
-        self._cluster._add_resolved_hosts()
+        self._cluster._populate_hosts()
 
-        (conn, errors) = self._connect_host_in_lbp()
+        (conn, errors) = self._connect_host()
         if conn is not None:
             return conn
-
+
         raise NoHostAvailable("Unable to connect to any servers", errors)
 
-    def _try_connect(self, host):
+    def _try_connect(self, endpoint):
         """
         Creates a new Connection, registers for pushed events, and refreshes
         node/token and schema metadata.
         """
-        log.debug("[control connection] Opening new connection to %s", host)
+        log.debug("[control connection] Opening new connection to %s", endpoint)
 
         while True:
             try:
-                connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
+                connection = self._cluster.connection_factory(endpoint, is_control_connection=True)
                 if self._is_shutdown:
                     connection.close()
                     raise DriverException("Reconnecting during shutdown")
                 break
            except ProtocolVersionUnsupported as e:
-                self._cluster.protocol_downgrade(host.endpoint, e.startup_version)
+                self._cluster.protocol_downgrade(endpoint, e.startup_version)
            except ProtocolException as e:
                # protocol v5 is out of beta in C* >=4.0-beta5 and is now the default driver
                # protocol version. If the protocol version was not explicitly specified,
                # and that the server raises a beta protocol error, we should downgrade.
                if not self._cluster._protocol_version_explicit and e.is_beta_protocol_error:
-                    self._cluster.protocol_downgrade(host.endpoint, self._cluster.protocol_version)
+                    self._cluster.protocol_downgrade(endpoint, self._cluster.protocol_version)
                else:
                    raise
 
@@ -3879,6 +3882,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
 
             self._cluster.metadata.update_host(host, old_endpoint=connection.endpoint)
             connection.original_endpoint = connection.endpoint = host.endpoint
+        else:
+            log.info("Consider local host new found host")
+            peers_result.append(local_row)
         # Check metadata.partitioner to see if we haven't built anything yet. If
         # every node in the cluster was in the contact points, we won't discover
         # any new nodes, so we need this additional check. (See PYTHON-90)
@@ -4177,8 +4183,8 @@ def _get_peers_query(self, peers_query_type, connection=None):
             query_template = (self._SELECT_SCHEMA_PEERS_TEMPLATE
                               if peers_query_type == self.PeersQueryType.PEERS_SCHEMA
                               else self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
-            host_release_version = self._cluster.metadata.get_host(connection.original_endpoint).release_version
-            host_dse_version = self._cluster.metadata.get_host(connection.original_endpoint).dse_version
+            host_release_version = None if self._cluster.metadata.get_host(connection.original_endpoint) == None else self._cluster.metadata.get_host(connection.original_endpoint).release_version
+            host_dse_version = None if self._cluster.metadata.get_host(connection.original_endpoint) == None else self._cluster.metadata.get_host(connection.original_endpoint).dse_version
             uses_native_address_query = (
                 host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)

cassandra/metadata.py

Lines changed: 3 additions & 0 deletions
@@ -139,6 +139,9 @@ def export_schema_as_string(self):
     def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None,
                 metadata_request_timeout=None, **kwargs):
 
+        if not self.get_host(connection.original_endpoint):
+            return
+
         server_version = self.get_host(connection.original_endpoint).release_version
         dse_version = self.get_host(connection.original_endpoint).dse_version
         parser = get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size)
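Together with the _get_peers_query change in cassandra/cluster.py above, this guard handles metadata.get_host(...) returning None for the control connection's endpoint. A hypothetical helper, not part of the commit, showing the same pattern in isolation:

from typing import Optional, Tuple


def host_versions(metadata, endpoint) -> Tuple[Optional[str], Optional[str]]:
    # Look the host up once and tolerate it being unknown to cluster metadata,
    # rather than assuming get_host() always returns a Host.
    host = metadata.get_host(endpoint)
    if host is None:
        return None, None
    return host.release_version, host.dse_version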

cassandra/policies.py

Lines changed: 3 additions & 0 deletions
@@ -264,6 +264,9 @@ def populate(self, cluster, hosts):
 
     def distance(self, host):
         dc = self._dc(host)
+        if not self.local_dc:
+            self.local_dc = dc
+            return HostDistance.LOCAL
         if dc == self.local_dc:
             return HostDistance.LOCAL
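With this change, a DCAwareRoundRobinPolicy built without an explicit local_dc adopts the datacenter of the first host it is asked to rank, instead of leaving local_dc unset until populate() or on_up() runs. A small sketch of the behavior with the patched policy; FakeHost is a stand-in that carries only the attribute the policy reads:

from cassandra.policies import DCAwareRoundRobinPolicy, HostDistance


class FakeHost:
    """Minimal stand-in for cassandra.pool.Host (illustration only)."""
    def __init__(self, datacenter):
        self.datacenter = datacenter


policy = DCAwareRoundRobinPolicy()  # no local_dc configured

assert policy.distance(FakeHost("dc1")) == HostDistance.LOCAL   # dc1 becomes the local DC
assert policy.distance(FakeHost("dc2")) != HostDistance.LOCAL   # dc2 is now remote/ignored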

cassandra/pool.py

Lines changed: 1 addition & 1 deletion
@@ -176,7 +176,7 @@ def __init__(self, endpoint, conviction_policy_factory, datacenter=None, rack=No
         self.endpoint = endpoint if isinstance(endpoint, EndPoint) else DefaultEndPoint(endpoint)
         self.conviction_policy = conviction_policy_factory(self)
         if not host_id:
-            host_id = uuid.uuid4()
+            raise ValueError("host_id may not be None")
         self.host_id = host_id
         self.set_location_info(datacenter, rack)
         self.lock = RLock()
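With the constructor change above, Host no longer papers over a missing id by generating uuid.uuid4() itself; callers must pass a host_id explicitly (normally the one reported by the server). A brief sketch against the patched constructor; the freshly generated UUID at the end is purely illustrative:

import uuid

from cassandra.connection import DefaultEndPoint
from cassandra.policies import SimpleConvictionPolicy
from cassandra.pool import Host

endpoint = DefaultEndPoint("127.0.0.1")

# Omitting host_id is now an error instead of a silent random identifier.
try:
    Host(endpoint, SimpleConvictionPolicy)
except ValueError as exc:
    print(exc)  # host_id may not be None

# Callers supply the id themselves, e.g. the host_id reported by the node.
host = Host(endpoint, SimpleConvictionPolicy, host_id=uuid.uuid4())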

tests/integration/standard/test_cluster.py

Lines changed: 24 additions & 12 deletions
@@ -900,8 +900,9 @@ def test_profile_lb_swap(self):
         """
         Tests that profile load balancing policies are not shared
 
-        Creates two LBP, runs a few queries, and validates that each LBP is execised
-        seperately between EP's
+        Creates two LBP, runs a few queries, and validates that each LBP is exercised
+        separately between EP's. Each RoundRobinPolicy starts from its own random
+        position and maintains independent round-robin ordering.
 
         @since 3.5
         @jira_ticket PYTHON-569
@@ -916,17 +917,28 @@ def test_profile_lb_swap(self):
         with TestCluster(execution_profiles=exec_profiles) as cluster:
             session = cluster.connect(wait_for_all_pools=True)
 
-            # default is DCA RR for all hosts
             expected_hosts = set(cluster.metadata.all_hosts())
-            rr1_queried_hosts = set()
-            rr2_queried_hosts = set()
-
-            rs = session.execute(query, execution_profile='rr1')
-            rr1_queried_hosts.add(rs.response_future._current_host)
-            rs = session.execute(query, execution_profile='rr2')
-            rr2_queried_hosts.add(rs.response_future._current_host)
-
-            assert rr2_queried_hosts == rr1_queried_hosts
+            num_hosts = len(expected_hosts)
+            assert num_hosts > 1, "Need at least 2 hosts for this test"
+
+            rr1_queried_hosts = []
+            rr2_queried_hosts = []
+
+            for _ in range(num_hosts * 2):
+                rs = session.execute(query, execution_profile='rr1')
+                rr1_queried_hosts.append(rs.response_future._current_host)
+                rs = session.execute(query, execution_profile='rr2')
+                rr2_queried_hosts.append(rs.response_future._current_host)
+
+            # Both policies should have queried all hosts
+            assert set(rr1_queried_hosts) == expected_hosts
+            assert set(rr2_queried_hosts) == expected_hosts
+
+            # The order of hosts should demonstrate round-robin behavior
+            # After num_hosts queries, the pattern should repeat
+            for i in range(num_hosts):
+                assert rr1_queried_hosts[i] == rr1_queried_hosts[i + num_hosts]
+                assert rr2_queried_hosts[i] == rr2_queried_hosts[i + num_hosts]
 
     def test_ta_lbp(self):
         """

tests/integration/standard/test_control_connection.py

Lines changed: 6 additions & 2 deletions
@@ -101,8 +101,12 @@ def test_get_control_connection_host(self):
 
         # reconnect and make sure that the new host is reflected correctly
         self.cluster.control_connection._reconnect()
-        new_host = self.cluster.get_control_connection_host()
-        assert host != new_host
+        new_host1 = self.cluster.get_control_connection_host()
+
+        self.cluster.control_connection._reconnect()
+        new_host2 = self.cluster.get_control_connection_host()
+
+        assert new_host1 != new_host2
 
     # TODO: enable after https://github.com/scylladb/python-driver/issues/121 is fixed
     @unittest.skip('Fails on scylla due to the broadcast_rpc_port is None')

tests/integration/standard/test_metrics.py

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@
 
 from cassandra.query import SimpleStatement
 from cassandra import ConsistencyLevel, WriteTimeout, Unavailable, ReadTimeout
-from cassandra.protocol import SyntaxException
+from cassandra.protocol import SyntaxException, UnavailableErrorMessage
 
 from cassandra.cluster import NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
 from tests.integration import get_cluster, get_node, use_singledc, execute_until_pass, TestCluster
@@ -218,7 +218,7 @@ def test_metrics_per_cluster(self):
         try:
             # Test write
             query = SimpleStatement("INSERT INTO {0}.{0} (k, v) VALUES (2, 2)".format(self.ks_name), consistency_level=ConsistencyLevel.ALL)
-            with pytest.raises(WriteTimeout):
+            with pytest.raises((WriteTimeout, UnavailableErrorMessage)):
                 self.session.execute(query, timeout=None)
         finally:
             get_node(1).resume()

tests/integration/standard/test_policies.py

Lines changed: 3 additions & 5 deletions
@@ -45,9 +45,6 @@ def test_predicate_changes(self):
         external_event = True
         contact_point = DefaultEndPoint("127.0.0.1")
 
-        single_host = {Host(contact_point, SimpleConvictionPolicy)}
-        all_hosts = {Host(DefaultEndPoint("127.0.0.{}".format(i)), SimpleConvictionPolicy) for i in (1, 2, 3)}
-
         predicate = lambda host: host.endpoint == contact_point if external_event else True
         hfp = ExecutionProfile(
             load_balancing_policy=HostFilterPolicy(RoundRobinPolicy(), predicate=predicate)
@@ -62,7 +59,8 @@ def test_predicate_changes(self):
             response = session.execute("SELECT * from system.local WHERE key='local'")
             queried_hosts.update(response.response_future.attempted_hosts)
 
-        assert queried_hosts == single_host
+        assert len(queried_hosts) == 1
+        assert queried_hosts.pop().endpoint == contact_point
 
         external_event = False
         futures = session.update_created_pools()
@@ -72,7 +70,7 @@ def test_predicate_changes(self):
         for _ in range(10):
             response = session.execute("SELECT * from system.local WHERE key='local'")
             queried_hosts.update(response.response_future.attempted_hosts)
-        assert queried_hosts == all_hosts
+        assert len(queried_hosts) == 3
 
 
 class WhiteListRoundRobinPolicyTests(unittest.TestCase):

tests/integration/standard/test_query.py

Lines changed: 2 additions & 1 deletion
@@ -460,7 +460,8 @@ def make_query_plan(self, working_keyspace=None, query=None):
         live_hosts = sorted(list(self._live_hosts))
         host = []
         try:
-            host = [live_hosts[self.host_index_to_use]]
+            if len(live_hosts) > self.host_index_to_use:
+                host = [live_hosts[self.host_index_to_use]]
         except IndexError as e:
             raise IndexError(
                 'You specified an index larger than the number of hosts. Total hosts: {}. Index specified: {}'.format(
