Skip to content

Commit ef382b9

Browse files
Don't create Host instances with random host_id
1 parent dd1adc7 commit ef382b9

File tree

5 files changed

+44
-48
lines changed

5 files changed

+44
-48
lines changed

cassandra/cluster.py

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,7 +1464,9 @@ def __init__(self,
14641464
self.schema_event_refresh_window, self.topology_event_refresh_window,
14651465
self.status_event_refresh_window,
14661466
schema_metadata_enabled, token_metadata_enabled,
1467-
schema_meta_page_size=schema_metadata_page_size)
1467+
schema_meta_page_size=schema_metadata_page_size,
1468+
resolved_contact_points=self.endpoints_resolved,
1469+
)
14681470

14691471
if client_id is None:
14701472
self.client_id = uuid.uuid4()
@@ -1683,14 +1685,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
16831685
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
16841686
self.protocol_version = new_version
16851687

1686-
def _add_resolved_hosts(self):
1687-
for endpoint in self.endpoints_resolved:
1688-
host, new = self.add_host(endpoint, signal=False)
1689-
if new:
1690-
host.set_up()
1691-
for listener in self.listeners:
1692-
listener.on_add(host)
1693-
1688+
def _populate_hosts(self):
16941689
self.profile_manager.populate(
16951690
weakref.proxy(self), self.metadata.all_hosts())
16961691
self.load_balancing_policy.populate(
@@ -1717,17 +1712,10 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17171712
self.contact_points, self.protocol_version)
17181713
self.connection_class.initialize_reactor()
17191714
_register_cluster_shutdown(self)
1720-
1721-
self._add_resolved_hosts()
17221715

17231716
try:
17241717
self.control_connection.connect()
1725-
1726-
# we set all contact points up for connecting, but we won't infer state after this
1727-
for endpoint in self.endpoints_resolved:
1728-
h = self.metadata.get_host(endpoint)
1729-
if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
1730-
h.is_up = None
1718+
self._populate_hosts()
17311719

17321720
log.debug("Control connection created")
17331721
except Exception:
@@ -3483,6 +3471,8 @@ class PeersQueryType(object):
34833471
_uses_peers_v2 = True
34843472
_tablets_routing_v1 = False
34853473

3474+
_resolved_contact_points = None
3475+
34863476
# for testing purposes
34873477
_time = time
34883478

@@ -3492,7 +3482,8 @@ def __init__(self, cluster, timeout,
34923482
status_event_refresh_window,
34933483
schema_meta_enabled=True,
34943484
token_meta_enabled=True,
3495-
schema_meta_page_size=1000):
3485+
schema_meta_page_size=1000,
3486+
resolved_contact_points=None):
34963487
# use a weak reference to allow the Cluster instance to be GC'ed (and
34973488
# shutdown) since implementing __del__ disables the cycle detector
34983489
self._cluster = weakref.proxy(cluster)
@@ -3514,6 +3505,8 @@ def __init__(self, cluster, timeout,
35143505

35153506
self._event_schedule_times = {}
35163507

3508+
self._resolved_contact_points = resolved_contact_points
3509+
35173510
def connect(self):
35183511
if self._is_shutdown:
35193512
return
@@ -3534,28 +3527,23 @@ def _set_new_connection(self, conn):
35343527
if old:
35353528
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
35363529
old.close()
3537-
3538-
def _connect_host_in_lbp(self):
3530+
3531+
def _connect_host(self):
35393532
errors = {}
3540-
lbp = (
3541-
self._cluster.load_balancing_policy
3542-
if self._cluster._config_mode == _ConfigMode.LEGACY else
3543-
self._cluster._default_load_balancing_policy
3544-
)
35453533

3546-
for host in lbp.make_query_plan():
3534+
for endpoint in self._resolved_contact_points:
35473535
try:
3548-
return (self._try_connect(host), None)
3536+
return (self._try_connect(endpoint), None)
35493537
except ConnectionException as exc:
3550-
errors[str(host.endpoint)] = exc
3551-
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
3552-
self._cluster.signal_connection_failure(host, exc, is_host_addition=False)
3538+
errors[str(endpoint)] = exc
3539+
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
3540+
self._cluster.signal_connection_failure(endpoint, exc, is_host_addition=False)
35533541
except Exception as exc:
3554-
errors[str(host.endpoint)] = exc
3555-
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
3542+
errors[str(endpoint)] = exc
3543+
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
35563544
if self._is_shutdown:
35573545
raise DriverException("[control connection] Reconnection in progress during shutdown")
3558-
3546+
35593547
return (None, errors)
35603548

35613549
def _reconnect_internal(self):
@@ -3567,43 +3555,44 @@ def _reconnect_internal(self):
35673555
to the exception that was raised when an attempt was made to open
35683556
a connection to that host.
35693557
"""
3570-
(conn, _) = self._connect_host_in_lbp()
3558+
(conn, _) = self._connect_host()
35713559
if conn is not None:
35723560
return conn
35733561

35743562
# Try to re-resolve hostnames as a fallback when all hosts are unreachable
35753563
self._cluster._resolve_hostnames()
3564+
self._resolved_contact_points = self._cluster.endpoints_resolved
35763565

3577-
self._cluster._add_resolved_hosts()
3566+
self._cluster._populate_hosts()
35783567

3579-
(conn, errors) = self._connect_host_in_lbp()
3568+
(conn, errors) = self._connect_host()
35803569
if conn is not None:
35813570
return conn
3582-
3571+
35833572
raise NoHostAvailable("Unable to connect to any servers", errors)
35843573

3585-
def _try_connect(self, host):
3574+
def _try_connect(self, endpoint):
35863575
"""
35873576
Creates a new Connection, registers for pushed events, and refreshes
35883577
node/token and schema metadata.
35893578
"""
3590-
log.debug("[control connection] Opening new connection to %s", host)
3579+
log.debug("[control connection] Opening new connection to %s", endpoint)
35913580

35923581
while True:
35933582
try:
3594-
connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
3583+
connection = self._cluster.connection_factory(endpoint, is_control_connection=True)
35953584
if self._is_shutdown:
35963585
connection.close()
35973586
raise DriverException("Reconnecting during shutdown")
35983587
break
35993588
except ProtocolVersionUnsupported as e:
3600-
self._cluster.protocol_downgrade(host.endpoint, e.startup_version)
3589+
self._cluster.protocol_downgrade(endpoint, e.startup_version)
36013590
except ProtocolException as e:
36023591
# protocol v5 is out of beta in C* >=4.0-beta5 and is now the default driver
36033592
# protocol version. If the protocol version was not explicitly specified,
36043593
# and that the server raises a beta protocol error, we should downgrade.
36053594
if not self._cluster._protocol_version_explicit and e.is_beta_protocol_error:
3606-
self._cluster.protocol_downgrade(host.endpoint, self._cluster.protocol_version)
3595+
self._cluster.protocol_downgrade(endpoint, self._cluster.protocol_version)
36073596
else:
36083597
raise
36093598

@@ -3879,6 +3868,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38793868

38803869
self._cluster.metadata.update_host(host, old_endpoint=connection.endpoint)
38813870
connection.original_endpoint = connection.endpoint = host.endpoint
3871+
else:
3872+
log.info("Consider local host new found host")
3873+
peers_result.append(local_row)
38823874
# Check metadata.partitioner to see if we haven't built anything yet. If
38833875
# every node in the cluster was in the contact points, we won't discover
38843876
# any new nodes, so we need this additional check. (See PYTHON-90)

cassandra/metadata.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ def export_schema_as_string(self):
139139
def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None,
140140
metadata_request_timeout=None, **kwargs):
141141

142+
if not self.get_host(connection.original_endpoint):
143+
return
144+
142145
server_version = self.get_host(connection.original_endpoint).release_version
143146
dse_version = self.get_host(connection.original_endpoint).dse_version
144147
parser = get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size)

cassandra/policies.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ def populate(self, cluster, hosts):
264264

265265
def distance(self, host):
266266
dc = self._dc(host)
267+
if not self.local_dc:
268+
self.local_dc = dc
269+
return HostDistance.LOCAL
267270
if dc == self.local_dc:
268271
return HostDistance.LOCAL
269272

cassandra/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def __init__(self, endpoint, conviction_policy_factory, datacenter=None, rack=No
176176
self.endpoint = endpoint if isinstance(endpoint, EndPoint) else DefaultEndPoint(endpoint)
177177
self.conviction_policy = conviction_policy_factory(self)
178178
if not host_id:
179-
host_id = uuid.uuid4()
179+
raise ValueError("host_id may not be None")
180180
self.host_id = host_id
181181
self.set_location_info(datacenter, rack)
182182
self.lock = RLock()

tests/integration/standard/test_policies.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ def test_predicate_changes(self):
4545
external_event = True
4646
contact_point = DefaultEndPoint("127.0.0.1")
4747

48-
single_host = {Host(contact_point, SimpleConvictionPolicy)}
49-
all_hosts = {Host(DefaultEndPoint("127.0.0.{}".format(i)), SimpleConvictionPolicy) for i in (1, 2, 3)}
50-
5148
predicate = lambda host: host.endpoint == contact_point if external_event else True
5249
hfp = ExecutionProfile(
5350
load_balancing_policy=HostFilterPolicy(RoundRobinPolicy(), predicate=predicate)
@@ -62,7 +59,8 @@ def test_predicate_changes(self):
6259
response = session.execute("SELECT * from system.local WHERE key='local'")
6360
queried_hosts.update(response.response_future.attempted_hosts)
6461

65-
assert queried_hosts == single_host
62+
assert len(queried_hosts) == 1
63+
assert queried_hosts.pop().endpoint == contact_point
6664

6765
external_event = False
6866
futures = session.update_created_pools()
@@ -72,7 +70,7 @@ def test_predicate_changes(self):
7270
for _ in range(10):
7371
response = session.execute("SELECT * from system.local WHERE key='local'")
7472
queried_hosts.update(response.response_future.attempted_hosts)
75-
assert queried_hosts == all_hosts
73+
assert len(queried_hosts) == 3
7674

7775

7876
class WhiteListRoundRobinPolicyTests(unittest.TestCase):

0 commit comments

Comments
 (0)