Skip to content

Commit 12fdb74

Browse files
Don't create Host instances with random host_id
Let control connection use resolved contact points from cluster config if lbp is not yet initialized.
1 parent 0173fc3 commit 12fdb74

File tree

3 files changed

+28
-40
lines changed

3 files changed

+28
-40
lines changed

cassandra/cluster.py

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1683,14 +1683,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
16831683
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
16841684
self.protocol_version = new_version
16851685

1686-
def _add_resolved_hosts(self):
1687-
for endpoint in self.endpoints_resolved:
1688-
host, new = self.add_host(endpoint, signal=False)
1689-
if new:
1690-
host.set_up()
1691-
for listener in self.listeners:
1692-
listener.on_add(host)
1693-
1686+
def _populate_hosts(self):
16941687
self.profile_manager.populate(
16951688
weakref.proxy(self), self.metadata.all_hosts())
16961689
self.load_balancing_policy.populate(
@@ -1717,17 +1710,10 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17171710
self.contact_points, self.protocol_version)
17181711
self.connection_class.initialize_reactor()
17191712
_register_cluster_shutdown(self)
1720-
1721-
self._add_resolved_hosts()
17221713

17231714
try:
17241715
self.control_connection.connect()
1725-
1726-
# we set all contact points up for connecting, but we won't infer state after this
1727-
for endpoint in self.endpoints_resolved:
1728-
h = self.metadata.get_host(endpoint)
1729-
if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
1730-
h.is_up = None
1716+
self._populate_hosts()
17311717

17321718
log.debug("Control connection created")
17331719
except Exception:
@@ -3534,28 +3520,22 @@ def _set_new_connection(self, conn):
35343520
if old:
35353521
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
35363522
old.close()
3537-
3538-
def _connect_host_in_lbp(self):
3523+
3524+
def _try_connect_to_hosts(self):
35393525
errors = {}
3540-
lbp = (
3541-
self._cluster.load_balancing_policy
3542-
if self._cluster._config_mode == _ConfigMode.LEGACY else
3543-
self._cluster._default_load_balancing_policy
3544-
)
35453526

3546-
for host in lbp.make_query_plan():
3527+
lbp = self._cluster.load_balancing_policy \
3528+
if self._cluster._config_mode == _ConfigMode.LEGACY else self._cluster._default_load_balancing_policy
3529+
3530+
for endpoint in chain((host.endpoint for host in lbp.make_query_plan()), self._cluster.endpoints_resolved):
35473531
try:
3548-
return (self._try_connect(host.endpoint), None)
3549-
except ConnectionException as exc:
3550-
errors[str(host.endpoint)] = exc
3551-
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
3552-
self._cluster.signal_connection_failure(host, exc, is_host_addition=False)
3532+
return (self._try_connect(endpoint), None)
35533533
except Exception as exc:
3554-
errors[str(host.endpoint)] = exc
3555-
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
3534+
errors[str(endpoint)] = exc
3535+
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
35563536
if self._is_shutdown:
35573537
raise DriverException("[control connection] Reconnection in progress during shutdown")
3558-
3538+
35593539
return (None, errors)
35603540

35613541
def _reconnect_internal(self):
@@ -3567,19 +3547,19 @@ def _reconnect_internal(self):
35673547
to the exception that was raised when an attempt was made to open
35683548
a connection to that host.
35693549
"""
3570-
(conn, _) = self._connect_host_in_lbp()
3550+
(conn, _) = self._try_connect_to_hosts()
35713551
if conn is not None:
35723552
return conn
35733553

35743554
# Try to re-resolve hostnames as a fallback when all hosts are unreachable
35753555
self._cluster._resolve_hostnames()
35763556

3577-
self._cluster._add_resolved_hosts()
3557+
self._cluster._populate_hosts()
35783558

3579-
(conn, errors) = self._connect_host_in_lbp()
3559+
(conn, errors) = self._try_connect_to_hosts()
35803560
if conn is not None:
35813561
return conn
3582-
3562+
35833563
raise NoHostAvailable("Unable to connect to any servers", errors)
35843564

35853565
def _try_connect(self, endpoint):
@@ -3821,7 +3801,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38213801
tokens = local_row.get("tokens")
38223802

38233803
host = self._cluster.metadata.get_host(connection.original_endpoint)
3824-
if host:
3804+
if not host:
3805+
log.info("[control connection] Local host %s not found in metadata, adding it", connection.original_endpoint)
3806+
peers_result.append(local_row)
3807+
else:
38253808
datacenter = local_row.get("data_center")
38263809
rack = local_row.get("rack")
38273810
self._update_location_info(host, datacenter, rack)
@@ -4177,8 +4160,9 @@ def _get_peers_query(self, peers_query_type, connection=None):
41774160
query_template = (self._SELECT_SCHEMA_PEERS_TEMPLATE
41784161
if peers_query_type == self.PeersQueryType.PEERS_SCHEMA
41794162
else self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
4180-
host_release_version = self._cluster.metadata.get_host(connection.original_endpoint).release_version
4181-
host_dse_version = self._cluster.metadata.get_host(connection.original_endpoint).dse_version
4163+
original_endpoint_host = self._cluster.metadata.get_host(connection.original_endpoint)
4164+
host_release_version = None if original_endpoint_host is None else original_endpoint_host.release_version
4165+
host_dse_version = None if original_endpoint_host is None else original_endpoint_host.dse_version
41824166
uses_native_address_query = (
41834167
host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)
41844168

cassandra/metadata.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ def export_schema_as_string(self):
139139
def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None,
140140
metadata_request_timeout=None, **kwargs):
141141

142+
# If the host is not in metadata, we can't proceed, hosts should be added after succesfully establishing control connection
143+
if not self.get_host(connection.original_endpoint):
144+
return
145+
142146
server_version = self.get_host(connection.original_endpoint).release_version
143147
dse_version = self.get_host(connection.original_endpoint).dse_version
144148
parser = get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size)

cassandra/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def __init__(self, endpoint, conviction_policy_factory, datacenter=None, rack=No
176176
self.endpoint = endpoint if isinstance(endpoint, EndPoint) else DefaultEndPoint(endpoint)
177177
self.conviction_policy = conviction_policy_factory(self)
178178
if not host_id:
179-
host_id = uuid.uuid4()
179+
raise ValueError("host_id may not be None")
180180
self.host_id = host_id
181181
self.set_location_info(datacenter, rack)
182182
self.lock = RLock()

0 commit comments

Comments
 (0)