Skip to content

Commit 7f061e1

Browse files
Don't create Host instances with random host_id
1 parent dd1adc7 commit 7f061e1

File tree

5 files changed

+42
-41
lines changed

5 files changed

+42
-41
lines changed

cassandra/cluster.py

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,7 +1464,9 @@ def __init__(self,
14641464
self.schema_event_refresh_window, self.topology_event_refresh_window,
14651465
self.status_event_refresh_window,
14661466
schema_metadata_enabled, token_metadata_enabled,
1467-
schema_meta_page_size=schema_metadata_page_size)
1467+
schema_meta_page_size=schema_metadata_page_size,
1468+
resolved_contact_points=self.endpoints_resolved,
1469+
)
14681470

14691471
if client_id is None:
14701472
self.client_id = uuid.uuid4()
@@ -1683,14 +1685,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
16831685
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
16841686
self.protocol_version = new_version
16851687

1686-
def _add_resolved_hosts(self):
1687-
for endpoint in self.endpoints_resolved:
1688-
host, new = self.add_host(endpoint, signal=False)
1689-
if new:
1690-
host.set_up()
1691-
for listener in self.listeners:
1692-
listener.on_add(host)
1693-
1688+
def _populate_hosts(self):
16941689
self.profile_manager.populate(
16951690
weakref.proxy(self), self.metadata.all_hosts())
16961691
self.load_balancing_policy.populate(
@@ -1717,8 +1712,6 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17171712
self.contact_points, self.protocol_version)
17181713
self.connection_class.initialize_reactor()
17191714
_register_cluster_shutdown(self)
1720-
1721-
self._add_resolved_hosts()
17221715

17231716
try:
17241717
self.control_connection.connect()
@@ -3483,6 +3476,8 @@ class PeersQueryType(object):
34833476
_uses_peers_v2 = True
34843477
_tablets_routing_v1 = False
34853478

3479+
_resolved_contact_points = None
3480+
34863481
# for testing purposes
34873482
_time = time
34883483

@@ -3492,7 +3487,8 @@ def __init__(self, cluster, timeout,
34923487
status_event_refresh_window,
34933488
schema_meta_enabled=True,
34943489
token_meta_enabled=True,
3495-
schema_meta_page_size=1000):
3490+
schema_meta_page_size=1000,
3491+
resolved_contact_points=None):
34963492
# use a weak reference to allow the Cluster instance to be GC'ed (and
34973493
# shutdown) since implementing __del__ disables the cycle detector
34983494
self._cluster = weakref.proxy(cluster)
@@ -3514,6 +3510,8 @@ def __init__(self, cluster, timeout,
35143510

35153511
self._event_schedule_times = {}
35163512

3513+
self._resolved_contact_points = resolved_contact_points
3514+
35173515
def connect(self):
35183516
if self._is_shutdown:
35193517
return
@@ -3534,28 +3532,23 @@ def _set_new_connection(self, conn):
35343532
if old:
35353533
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
35363534
old.close()
3537-
3538-
def _connect_host_in_lbp(self):
3535+
3536+
def _connect_host(self):
35393537
errors = {}
3540-
lbp = (
3541-
self._cluster.load_balancing_policy
3542-
if self._cluster._config_mode == _ConfigMode.LEGACY else
3543-
self._cluster._default_load_balancing_policy
3544-
)
35453538

3546-
for host in lbp.make_query_plan():
3539+
for endpoint in self._resolved_contact_points:
35473540
try:
3548-
return (self._try_connect(host), None)
3541+
return (self._try_connect(endpoint), None)
35493542
except ConnectionException as exc:
3550-
errors[str(host.endpoint)] = exc
3551-
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
3552-
self._cluster.signal_connection_failure(host, exc, is_host_addition=False)
3543+
errors[str(endpoint)] = exc
3544+
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
3545+
self._cluster.signal_connection_failure(endpoint, exc, is_host_addition=False)
35533546
except Exception as exc:
3554-
errors[str(host.endpoint)] = exc
3555-
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
3547+
errors[str(endpoint)] = exc
3548+
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
35563549
if self._is_shutdown:
35573550
raise DriverException("[control connection] Reconnection in progress during shutdown")
3558-
3551+
35593552
return (None, errors)
35603553

35613554
def _reconnect_internal(self):
@@ -3567,43 +3560,44 @@ def _reconnect_internal(self):
35673560
to the exception that was raised when an attempt was made to open
35683561
a connection to that host.
35693562
"""
3570-
(conn, _) = self._connect_host_in_lbp()
3563+
(conn, _) = self._connect_host()
35713564
if conn is not None:
35723565
return conn
35733566

35743567
# Try to re-resolve hostnames as a fallback when all hosts are unreachable
35753568
self._cluster._resolve_hostnames()
3569+
self._resolved_contact_points = self._cluster.endpoints_resolved
35763570

35773571
self._cluster._add_resolved_hosts()
35783572

3579-
(conn, errors) = self._connect_host_in_lbp()
3573+
(conn, errors) = self._connect_host()
35803574
if conn is not None:
35813575
return conn
3582-
3576+
35833577
raise NoHostAvailable("Unable to connect to any servers", errors)
35843578

3585-
def _try_connect(self, host):
3579+
def _try_connect(self, endpoint):
35863580
"""
35873581
Creates a new Connection, registers for pushed events, and refreshes
35883582
node/token and schema metadata.
35893583
"""
3590-
log.debug("[control connection] Opening new connection to %s", host)
3584+
log.debug("[control connection] Opening new connection to %s", endpoint)
35913585

35923586
while True:
35933587
try:
3594-
connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
3588+
connection = self._cluster.connection_factory(endpoint, is_control_connection=True)
35953589
if self._is_shutdown:
35963590
connection.close()
35973591
raise DriverException("Reconnecting during shutdown")
35983592
break
35993593
except ProtocolVersionUnsupported as e:
3600-
self._cluster.protocol_downgrade(host.endpoint, e.startup_version)
3594+
self._cluster.protocol_downgrade(endpoint, e.startup_version)
36013595
except ProtocolException as e:
36023596
# protocol v5 is out of beta in C* >=4.0-beta5 and is now the default driver
36033597
# protocol version. If the protocol version was not explicitly specified,
36043598
# and that the server raises a beta protocol error, we should downgrade.
36053599
if not self._cluster._protocol_version_explicit and e.is_beta_protocol_error:
3606-
self._cluster.protocol_downgrade(host.endpoint, self._cluster.protocol_version)
3600+
self._cluster.protocol_downgrade(endpoint, self._cluster.protocol_version)
36073601
else:
36083602
raise
36093603

@@ -3879,6 +3873,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38793873

38803874
self._cluster.metadata.update_host(host, old_endpoint=connection.endpoint)
38813875
connection.original_endpoint = connection.endpoint = host.endpoint
3876+
else:
3877+
log.info("Consider local host new found host")
3878+
peers_result.append(local_row)
38823879
# Check metadata.partitioner to see if we haven't built anything yet. If
38833880
# every node in the cluster was in the contact points, we won't discover
38843881
# any new nodes, so we need this additional check. (See PYTHON-90)

cassandra/metadata.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ def export_schema_as_string(self):
139139
def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None,
140140
metadata_request_timeout=None, **kwargs):
141141

142+
if not self.get_host(connection.original_endpoint):
143+
return
144+
142145
server_version = self.get_host(connection.original_endpoint).release_version
143146
dse_version = self.get_host(connection.original_endpoint).dse_version
144147
parser = get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size)

cassandra/policies.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ def populate(self, cluster, hosts):
264264

265265
def distance(self, host):
266266
dc = self._dc(host)
267+
if not self.local_dc:
268+
self.local_dc = dc
269+
return HostDistance.LOCAL
267270
if dc == self.local_dc:
268271
return HostDistance.LOCAL
269272

cassandra/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def __init__(self, endpoint, conviction_policy_factory, datacenter=None, rack=No
176176
self.endpoint = endpoint if isinstance(endpoint, EndPoint) else DefaultEndPoint(endpoint)
177177
self.conviction_policy = conviction_policy_factory(self)
178178
if not host_id:
179-
host_id = uuid.uuid4()
179+
raise ValueError("host_id may not be None")
180180
self.host_id = host_id
181181
self.set_location_info(datacenter, rack)
182182
self.lock = RLock()

tests/integration/standard/test_policies.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ def test_predicate_changes(self):
4545
external_event = True
4646
contact_point = DefaultEndPoint("127.0.0.1")
4747

48-
single_host = {Host(contact_point, SimpleConvictionPolicy)}
49-
all_hosts = {Host(DefaultEndPoint("127.0.0.{}".format(i)), SimpleConvictionPolicy) for i in (1, 2, 3)}
50-
5148
predicate = lambda host: host.endpoint == contact_point if external_event else True
5249
hfp = ExecutionProfile(
5350
load_balancing_policy=HostFilterPolicy(RoundRobinPolicy(), predicate=predicate)
@@ -62,7 +59,8 @@ def test_predicate_changes(self):
6259
response = session.execute("SELECT * from system.local WHERE key='local'")
6360
queried_hosts.update(response.response_future.attempted_hosts)
6461

65-
assert queried_hosts == single_host
62+
assert len(queried_hosts) == 1
63+
assert queried_hosts.pop().endpoint == contact_point
6664

6765
external_event = False
6866
futures = session.update_created_pools()
@@ -72,7 +70,7 @@ def test_predicate_changes(self):
7270
for _ in range(10):
7371
response = session.execute("SELECT * from system.local WHERE key='local'")
7472
queried_hosts.update(response.response_future.attempted_hosts)
75-
assert queried_hosts == all_hosts
73+
assert len(queried_hosts) == 3
7674

7775

7876
class WhiteListRoundRobinPolicyTests(unittest.TestCase):

0 commit comments

Comments
 (0)