Skip to content

Commit 8b78f06

Browse files
committed
metadata: save hosts based on host_id instead of endpoint
Also keep a mapping from endpoints to host_ids, so the control connection can still work with the initial endpoint, while the hosts list only knows nodes by their host_id.
1 parent 6de917f commit 8b78f06

File tree

3 files changed

+45
-18
lines changed

3 files changed

+45
-18
lines changed

cassandra/cluster.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,15 +2130,15 @@ def signal_connection_failure(self, host, connection_exc, is_host_addition, expe
21302130
self.on_down(host, is_host_addition, expect_host_to_be_down)
21312131
return is_down
21322132

2133-
def add_host(self, endpoint, datacenter=None, rack=None, signal=True, refresh_nodes=True):
2133+
def add_host(self, endpoint, datacenter=None, rack=None, signal=True, refresh_nodes=True, host_id=None):
21342134
"""
21352135
Called when adding initial contact points and when the control
21362136
connection subsequently discovers a new node.
21372137
Returns a Host instance, and a flag indicating whether it was new in
21382138
the metadata.
21392139
Intended for internal use only.
21402140
"""
2141-
host, new = self.metadata.add_or_return_host(Host(endpoint, self.conviction_policy_factory, datacenter, rack))
2141+
host, new = self.metadata.add_or_return_host(Host(endpoint, self.conviction_policy_factory, datacenter, rack, host_id=host_id))
21422142
if new and signal:
21432143
log.info("New Cassandra host %r discovered", host)
21442144
self.on_add(host, refresh_nodes)
@@ -3817,9 +3817,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38173817
partitioner = None
38183818
token_map = {}
38193819

3820-
found_hosts = set()
3820+
found_host_ids = set()
38213821
if local_result.parsed_rows:
3822-
found_hosts.add(connection.endpoint)
38233822
local_rows = dict_factory(local_result.column_names, local_result.parsed_rows)
38243823
local_row = local_rows[0]
38253824
cluster_name = local_row["cluster_name"]
@@ -3833,7 +3832,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38333832
datacenter = local_row.get("data_center")
38343833
rack = local_row.get("rack")
38353834
self._update_location_info(host, datacenter, rack)
3835+
host.endpoint = self._cluster.endpoint_factory.create(local_row)
38363836
host.host_id = local_row.get("host_id")
3837+
found_host_ids.add(host.host_id)
38373838
host.listen_address = local_row.get("listen_address")
38383839
host.listen_port = local_row.get("listen_port")
38393840
host.broadcast_address = _NodeInfo.get_broadcast_address(local_row)
@@ -3872,6 +3873,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38723873
if partitioner and tokens:
38733874
token_map[host] = tokens
38743875

3876+
self._cluster.metadata.update_host(host, old_endpoint=connection.endpoint)
3877+
connection.original_endpoint = connection.endpoint = host.endpoint
38753878
# Check metadata.partitioner to see if we haven't built anything yet. If
38763879
# every node in the cluster was in the contact points, we won't discover
38773880
# any new nodes, so we need this additional check. (See PYTHON-90)
@@ -3884,24 +3887,26 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38843887
continue
38853888

38863889
endpoint = self._cluster.endpoint_factory.create(row)
3890+
host_id = row.get("host_id")
38873891

3888-
if endpoint in found_hosts:
3889-
log.warning("Found multiple hosts with the same endpoint (%s). Excluding peer %s", endpoint, row.get("peer"))
3892+
if host_id in found_host_ids:
3893+
log.warning("Found multiple hosts with the same host_id (%s). Excluding peer %s", host_id, row.get("peer"))
38903894
continue
38913895

3892-
found_hosts.add(endpoint)
3896+
found_host_ids.add(host_id)
38933897

38943898
host = self._cluster.metadata.get_host(endpoint)
38953899
datacenter = row.get("data_center")
38963900
rack = row.get("rack")
3901+
38973902
if host is None:
38983903
log.debug("[control connection] Found new host to connect to: %s", endpoint)
3899-
host, _ = self._cluster.add_host(endpoint, datacenter, rack, signal=True, refresh_nodes=False)
3904+
host, _ = self._cluster.add_host(endpoint, datacenter=datacenter, rack=rack, signal=True, refresh_nodes=False, host_id=host_id)
39003905
should_rebuild_token_map = True
39013906
else:
39023907
should_rebuild_token_map |= self._update_location_info(host, datacenter, rack)
39033908

3904-
host.host_id = row.get("host_id")
3909+
host.host_id = host_id
39053910
host.broadcast_address = _NodeInfo.get_broadcast_address(row)
39063911
host.broadcast_port = _NodeInfo.get_broadcast_port(row)
39073912
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(row)
@@ -3915,11 +3920,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
39153920
if partitioner and tokens and self._token_meta_enabled:
39163921
token_map[host] = tokens
39173922

3918-
for old_host in self._cluster.metadata.all_hosts():
3919-
if old_host.endpoint.address != connection.endpoint and old_host.endpoint not in found_hosts:
3923+
for old_host_id, old_host in self._cluster.metadata.all_hosts_items():
3924+
if old_host_id not in found_host_ids:
39203925
should_rebuild_token_map = True
39213926
log.debug("[control connection] Removing host not found in peers metadata: %r", old_host)
3922-
self._cluster.remove_host(old_host)
3927+
self._cluster.metadata.remove_host_by_host_id(old_host_id)
39233928

39243929
log.debug("[control connection] Finished fetching ring info")
39253930
if partitioner and should_rebuild_token_map:

cassandra/metadata.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def __init__(self):
124124
self.keyspaces = {}
125125
self.dbaas = False
126126
self._hosts = {}
127+
self._host_id_by_endpoint = {}
127128
self._hosts_lock = RLock()
128129

129130
def export_schema_as_string(self):
@@ -330,25 +331,39 @@ def add_or_return_host(self, host):
330331
"""
331332
with self._hosts_lock:
332333
try:
333-
return self._hosts[host.endpoint], False
334+
return self._hosts[host.host_id], False
334335
except KeyError:
335-
self._hosts[host.endpoint] = host
336+
self._host_id_by_endpoint[host.endpoint] = host.host_id
337+
self._hosts[host.host_id] = host
336338
return host, True
337339

338340
def remove_host(self, host):
339341
with self._hosts_lock:
340-
return bool(self._hosts.pop(host.endpoint, False))
342+
self._host_id_by_endpoint.pop(host.endpoint, False)
343+
return bool(self._hosts.pop(host.host_id, False))
344+
345+
def remove_host_by_host_id(self, host_id):
346+
with self._hosts_lock:
347+
return bool(self._hosts.pop(host_id, False))
348+
349+
def update_host(self, host, old_endpoint):
350+
host, created = self.add_or_return_host(host)
351+
with self._hosts_lock:
352+
self._host_id_by_endpoint.pop(old_endpoint, False)
353+
self._host_id_by_endpoint[host.endpoint] = host.host_id
341354

342355
def get_host(self, endpoint_or_address, port=None):
343356
"""
344357
Find a host in the metadata for a specific endpoint. If a string inet address and port are passed,
345358
iterate all hosts to match the :attr:`~.pool.Host.broadcast_rpc_address` and
346359
:attr:`~.pool.Host.broadcast_rpc_port` attributes.
347360
"""
348-
if not isinstance(endpoint_or_address, EndPoint):
349-
return self._get_host_by_address(endpoint_or_address, port)
361+
with self._hosts_lock:
362+
if not isinstance(endpoint_or_address, EndPoint):
363+
return self._get_host_by_address(endpoint_or_address, port)
350364

351-
return self._hosts.get(endpoint_or_address)
365+
host_id = self._host_id_by_endpoint.get(endpoint_or_address)
366+
return self._hosts.get(host_id)
352367

353368
def _get_host_by_address(self, address, port=None):
354369
for host in six.itervalues(self._hosts):
@@ -365,6 +380,10 @@ def all_hosts(self):
365380
with self._hosts_lock:
366381
return list(self._hosts.values())
367382

383+
def all_hosts_items(self):
384+
with self._hosts_lock:
385+
return list(self._hosts.items())
386+
368387

369388
REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."
370389

cassandra/pool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import time
2323
import random
2424
import copy
25+
import uuid
2526
from threading import Lock, RLock, Condition
2627
import weakref
2728
try:
@@ -174,6 +175,8 @@ def __init__(self, endpoint, conviction_policy_factory, datacenter=None, rack=No
174175

175176
self.endpoint = endpoint if isinstance(endpoint, EndPoint) else DefaultEndPoint(endpoint)
176177
self.conviction_policy = conviction_policy_factory(self)
178+
if not host_id:
179+
host_id = uuid.uuid4()
177180
self.host_id = host_id
178181
self.set_location_info(datacenter, rack)
179182
self.lock = RLock()

0 commit comments

Comments
 (0)