Skip to content

Commit 14f78b5

Browse files
Don't create Host instances with random host_id
Previously, we used the endpoints provided to the cluster to create Host instances with random host_ids in order to populate the load balancing policy (LBP) before the ControlConnection was established. This logic led to creating many connections that were opened and then quickly closed, because once we learned the correct host_ids from system.peers, we removed the old Hosts with random IDs and created new ones with the proper host_ids. This commit introduces a new approach. To establish the ControlConnection, we now use only the resolved contact points from the cluster configuration. Only after a successful connection do we populate Host information in the LBP. If the LBP is already initialized during ControlConnection reconnection, we reuse the existing values.
1 parent 6de799f commit 14f78b5

File tree

3 files changed

+25
-33
lines changed

3 files changed

+25
-33
lines changed

cassandra/cluster.py

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1683,14 +1683,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
16831683
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
16841684
self.protocol_version = new_version
16851685

1686-
def _add_resolved_hosts(self):
1687-
for endpoint in self.endpoints_resolved:
1688-
host, new = self.add_host(endpoint, signal=False)
1689-
if new:
1690-
host.set_up()
1691-
for listener in self.listeners:
1692-
listener.on_add(host)
1693-
1686+
def _populate_hosts(self):
16941687
self.profile_manager.populate(
16951688
weakref.proxy(self), self.metadata.all_hosts())
16961689
self.load_balancing_policy.populate(
@@ -1718,16 +1711,9 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17181711
self.connection_class.initialize_reactor()
17191712
_register_cluster_shutdown(self)
17201713

1721-
self._add_resolved_hosts()
1722-
17231714
try:
17241715
self.control_connection.connect()
1725-
1726-
# we set all contact points up for connecting, but we won't infer state after this
1727-
for endpoint in self.endpoints_resolved:
1728-
h = self.metadata.get_host(endpoint)
1729-
if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
1730-
h.is_up = None
1716+
self._populate_hosts()
17311717

17321718
log.debug("Control connection created")
17331719
except Exception:
@@ -3535,20 +3521,18 @@ def _set_new_connection(self, conn):
35353521
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
35363522
old.close()
35373523

3538-
def _connect_host_in_lbp(self):
3524+
def _try_connect_to_hosts(self):
35393525
errors = {}
3540-
lbp = (
3541-
self._cluster.load_balancing_policy
3542-
if self._cluster._config_mode == _ConfigMode.LEGACY else
3543-
self._cluster._default_load_balancing_policy
3544-
)
35453526

3546-
for host in lbp.make_query_plan():
3527+
lbp = self._cluster.load_balancing_policy \
3528+
if self._cluster._config_mode == _ConfigMode.LEGACY else self._cluster._default_load_balancing_policy
3529+
3530+
for endpoint in chain((host.endpoint for host in lbp.make_query_plan()), self._cluster.endpoints_resolved):
35473531
try:
3548-
return (self._try_connect(host.endpoint), None)
3532+
return (self._try_connect(endpoint), None)
35493533
except Exception as exc:
3550-
errors[str(host.endpoint)] = exc
3551-
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
3534+
errors[str(endpoint)] = exc
3535+
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
35523536
if self._is_shutdown:
35533537
raise DriverException("[control connection] Reconnection in progress during shutdown")
35543538

@@ -3563,16 +3547,16 @@ def _reconnect_internal(self):
35633547
to the exception that was raised when an attempt was made to open
35643548
a connection to that host.
35653549
"""
3566-
(conn, _) = self._connect_host_in_lbp()
3550+
(conn, _) = self._try_connect_to_hosts()
35673551
if conn is not None:
35683552
return conn
35693553

35703554
# Try to re-resolve hostnames as a fallback when all hosts are unreachable
35713555
self._cluster._resolve_hostnames()
35723556

3573-
self._cluster._add_resolved_hosts()
3557+
self._cluster._populate_hosts()
35743558

3575-
(conn, errors) = self._connect_host_in_lbp()
3559+
(conn, errors) = self._try_connect_to_hosts()
35763560
if conn is not None:
35773561
return conn
35783562

@@ -3817,7 +3801,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38173801
tokens = local_row.get("tokens")
38183802

38193803
host = self._cluster.metadata.get_host(connection.original_endpoint)
3820-
if host:
3804+
if not host:
3805+
log.info("[control connection] Local host %s not found in metadata, adding it", connection.original_endpoint)
3806+
peers_result.append(local_row)
3807+
else:
38213808
datacenter = local_row.get("data_center")
38223809
rack = local_row.get("rack")
38233810
self._update_location_info(host, datacenter, rack)
@@ -4173,8 +4160,9 @@ def _get_peers_query(self, peers_query_type, connection=None):
41734160
query_template = (self._SELECT_SCHEMA_PEERS_TEMPLATE
41744161
if peers_query_type == self.PeersQueryType.PEERS_SCHEMA
41754162
else self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
4176-
host_release_version = self._cluster.metadata.get_host(connection.original_endpoint).release_version
4177-
host_dse_version = self._cluster.metadata.get_host(connection.original_endpoint).dse_version
4163+
original_endpoint_host = self._cluster.metadata.get_host(connection.original_endpoint)
4164+
host_release_version = None if original_endpoint_host is None else original_endpoint_host.release_version
4165+
host_dse_version = None if original_endpoint_host is None else original_endpoint_host.dse_version
41784166
uses_native_address_query = (
41794167
host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)
41804168

cassandra/metadata.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ def export_schema_as_string(self):
139139
def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None,
140140
metadata_request_timeout=None, **kwargs):
141141

142+
# If the host is not in metadata, we can't proceed; hosts should be added after successfully establishing the control connection
143+
if not self.get_host(connection.original_endpoint):
144+
return
145+
142146
server_version = self.get_host(connection.original_endpoint).release_version
143147
dse_version = self.get_host(connection.original_endpoint).dse_version
144148
parser = get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size)

cassandra/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def __init__(self, endpoint, conviction_policy_factory, datacenter=None, rack=No
176176
self.endpoint = endpoint if isinstance(endpoint, EndPoint) else DefaultEndPoint(endpoint)
177177
self.conviction_policy = conviction_policy_factory(self)
178178
if not host_id:
179-
host_id = uuid.uuid4()
179+
raise ValueError("host_id may not be None")
180180
self.host_id = host_id
181181
self.set_location_info(datacenter, rack)
182182
self.lock = RLock()

0 commit comments

Comments
 (0)