Skip to content

Commit c89784f

Browse files
committed
Fix unit tests to match the logic change in metadata
1 parent 8b78f06 commit c89784f

File tree

2 files changed

+64
-35
lines changed

2 files changed

+64
-35
lines changed

cassandra/cluster.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3498,7 +3498,7 @@ class ControlConnection(object):
34983498
_SELECT_PEERS = "SELECT * FROM system.peers"
34993499
_SELECT_PEERS_NO_TOKENS_TEMPLATE = "SELECT host_id, peer, data_center, rack, rpc_address, {nt_col_name}, release_version, schema_version FROM system.peers"
35003500
_SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'"
3501-
_SELECT_LOCAL_NO_TOKENS = "SELECT host_id, cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
3501+
_SELECT_LOCAL_NO_TOKENS = "SELECT host_id, cluster_name, data_center, rack, partitioner, release_version, schema_version, rpc_address FROM system.local WHERE key='local'"
35023502
# Used only when token_metadata_enabled is set to False
35033503
_SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS = "SELECT rpc_address FROM system.local WHERE key='local'"
35043504

@@ -3832,7 +3832,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38323832
datacenter = local_row.get("data_center")
38333833
rack = local_row.get("rack")
38343834
self._update_location_info(host, datacenter, rack)
3835-
host.endpoint = self._cluster.endpoint_factory.create(local_row)
3835+
new_endpoint = self._cluster.endpoint_factory.create(local_row)
3836+
if new_endpoint.address:
3837+
host.endpoint = new_endpoint
38363838
host.host_id = local_row.get("host_id")
38373839
found_host_ids.add(host.host_id)
38383840
host.listen_address = local_row.get("listen_address")
@@ -3919,7 +3921,6 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
39193921
tokens = row.get("tokens", None)
39203922
if partitioner and tokens and self._token_meta_enabled:
39213923
token_map[host] = tokens
3922-
39233924
for old_host_id, old_host in self._cluster.metadata.all_hosts_items():
39243925
if old_host_id not in found_host_ids:
39253926
should_rebuild_token_map = True

tests/unit/test_control_connection.py

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,14 @@ class MockMetadata(object):
3434

3535
def __init__(self):
3636
self.hosts = {
37-
DefaultEndPoint("192.168.1.0"): Host(DefaultEndPoint("192.168.1.0"), SimpleConvictionPolicy),
38-
DefaultEndPoint("192.168.1.1"): Host(DefaultEndPoint("192.168.1.1"), SimpleConvictionPolicy),
39-
DefaultEndPoint("192.168.1.2"): Host(DefaultEndPoint("192.168.1.2"), SimpleConvictionPolicy)
37+
'uuid1': Host(endpoint=DefaultEndPoint("192.168.1.0"), conviction_policy_factory=SimpleConvictionPolicy, host_id='uuid1'),
38+
'uuid2': Host(endpoint=DefaultEndPoint("192.168.1.1"), conviction_policy_factory=SimpleConvictionPolicy, host_id='uuid2'),
39+
'uuid3': Host(endpoint=DefaultEndPoint("192.168.1.2"), conviction_policy_factory=SimpleConvictionPolicy, host_id='uuid3')
40+
}
41+
self._host_id_by_endpoint = {
42+
DefaultEndPoint("192.168.1.0"): 'uuid1',
43+
DefaultEndPoint("192.168.1.1"): 'uuid2',
44+
DefaultEndPoint("192.168.1.2"): 'uuid3',
4045
}
4146
for host in self.hosts.values():
4247
host.set_up()
@@ -45,6 +50,7 @@ def __init__(self):
4550
self.cluster_name = None
4651
self.partitioner = None
4752
self.token_map = {}
53+
self.removed_hosts = []
4854

4955
def get_host(self, endpoint_or_address, port=None):
5056
if not isinstance(endpoint_or_address, EndPoint):
@@ -53,7 +59,8 @@ def get_host(self, endpoint_or_address, port=None):
5359
(port is None or host.broadcast_rpc_port is None or host.broadcast_rpc_port == port)):
5460
return host
5561
else:
56-
return self.hosts.get(endpoint_or_address)
62+
host_id = self._host_id_by_endpoint.get(endpoint_or_address)
63+
return self.hosts.get(host_id)
5764

5865
def all_hosts(self):
5966
return self.hosts.values()
@@ -62,6 +69,26 @@ def rebuild_token_map(self, partitioner, token_map):
6269
self.partitioner = partitioner
6370
self.token_map = token_map
6471

72+
def add_or_return_host(self, host):
73+
try:
74+
return self.hosts[host.host_id], False
75+
except KeyError:
76+
self._host_id_by_endpoint[host.endpoint] = host.host_id
77+
self.hosts[host.host_id] = host
78+
return host, True
79+
80+
def update_host(self, host, old_endpoint):
81+
host, created = self.add_or_return_host(host)
82+
self._host_id_by_endpoint[host.endpoint] = host.host_id
83+
self._host_id_by_endpoint.pop(old_endpoint, False)
84+
85+
def all_hosts_items(self):
86+
return list(self.hosts.items())
87+
88+
def remove_host_by_host_id(self, host_id):
89+
self.removed_hosts.append(self.hosts.pop(host_id, False))
90+
return bool(self.removed_hosts[-1])
91+
6592

6693
class MockCluster(object):
6794

@@ -76,20 +103,20 @@ class MockCluster(object):
76103
def __init__(self):
77104
self.metadata = MockMetadata()
78105
self.added_hosts = []
79-
self.removed_hosts = []
80106
self.scheduler = Mock(spec=_Scheduler)
81107
self.executor = Mock(spec=ThreadPoolExecutor)
82108
self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(RoundRobinPolicy())
83109
self.endpoint_factory = DefaultEndPointFactory().configure(self)
84110
self.ssl_options = None
85111

86-
def add_host(self, endpoint, datacenter, rack, signal=False, refresh_nodes=True):
87-
host = Host(endpoint, SimpleConvictionPolicy, datacenter, rack)
112+
def add_host(self, endpoint, datacenter, rack, signal=False, refresh_nodes=True, host_id=None):
113+
host = Host(endpoint, SimpleConvictionPolicy, datacenter, rack, host_id=host_id)
114+
host, _ = self.metadata.add_or_return_host(host)
88115
self.added_hosts.append(host)
89116
return host, True
90117

91118
def remove_host(self, host):
92-
self.removed_hosts.append(host)
119+
pass
93120

94121
def on_up(self, host):
95122
pass
@@ -121,20 +148,20 @@ def __init__(self):
121148
self.endpoint = DefaultEndPoint("192.168.1.0")
122149
self.original_endpoint = self.endpoint
123150
self.local_results = [
124-
["schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens"],
125-
[["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"]]]
151+
["rpc_address", "schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
152+
[["192.168.1.0", "a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]]
126153
]
127154

128155
self.peer_results = [
129156
["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
130-
[["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid1"],
131-
["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"], "uuid2"]]
157+
[["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"],
158+
["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]
132159
]
133160

134161
self.peer_results_v2 = [
135162
["native_address", "native_port", "peer", "peer_port", "schema_version", "data_center", "rack", "tokens", "host_id"],
136-
[["192.168.1.1", 9042, "10.0.0.1", 7042, "a", "dc1", "rack1", ["1", "101", "201"], "uuid1"],
137-
["192.168.1.2", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", ["2", "102", "202"], "uuid2"]]
163+
[["192.168.1.1", 9042, "10.0.0.1", 7042, "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"],
164+
["192.168.1.2", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]
138165
]
139166
self.wait_for_responses = Mock(return_value=_node_meta_results(self.local_results, self.peer_results))
140167

@@ -154,15 +181,15 @@ def sleep(self, amount):
154181
class ControlConnectionTest(unittest.TestCase):
155182

156183
_matching_schema_preloaded_results = _node_meta_results(
157-
local_results=(["schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
158-
[["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]]),
184+
local_results=(["rpc_address", "schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
185+
[["192.168.1.0", "a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]]),
159186
peer_results=(["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
160187
[["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"],
161188
["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]))
162189

163190
_nonmatching_schema_preloaded_results = _node_meta_results(
164-
local_results=(["schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
165-
[["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]]),
191+
local_results=(["rpc_address", "schema_version", "cluster_name", "data_center", "rack", "partitioner", "release_version", "tokens", "host_id"],
192+
[["192.168.1.0", "a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", "2.2.0", ["0", "100", "200"], "uuid1"]]),
166193
peer_results=(["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
167194
[["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"],
168195
["192.168.1.2", "10.0.0.2", "b", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]))
@@ -240,10 +267,11 @@ def test_wait_for_schema_agreement_rpc_lookup(self):
240267
If the rpc_address is 0.0.0.0, the "peer" column should be used instead.
241268
"""
242269
self.connection.peer_results[1].append(
243-
["0.0.0.0", PEER_IP, "b", "dc1", "rack1", ["3", "103", "203"]]
270+
["0.0.0.0", PEER_IP, "b", "dc1", "rack1", ["3", "103", "203"], "uuid6"]
244271
)
245-
host = Host(DefaultEndPoint("0.0.0.0"), SimpleConvictionPolicy)
246-
self.cluster.metadata.hosts[DefaultEndPoint("foobar")] = host
272+
host = Host(DefaultEndPoint("0.0.0.0"), SimpleConvictionPolicy, host_id='uuid6')
273+
self.cluster.metadata.hosts[host.host_id] = host
274+
self.cluster.metadata._host_id_by_endpoint[DefaultEndPoint(PEER_IP)] = host.host_id
247275
host.is_up = False
248276

249277
# even though the new host has a different schema version, it's
@@ -285,7 +313,7 @@ def refresh_and_validate_added_hosts():
285313
del self.connection.peer_results[:]
286314
self.connection.peer_results.extend([
287315
["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"],
288-
[["192.168.1.3", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], 'uuid5'],
316+
[["192.168.1.3", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], 'uuid6'],
289317
# all others are invalid
290318
[None, None, "a", "dc1", "rack1", ["1", "101", "201"], 'uuid1'],
291319
["192.168.1.7", "10.0.0.1", "a", None, "rack1", ["1", "101", "201"], 'uuid2'],
@@ -299,7 +327,7 @@ def refresh_and_validate_added_hosts():
299327
del self.connection.peer_results[:]
300328
self.connection.peer_results.extend([
301329
["native_address", "native_port", "peer", "peer_port", "schema_version", "data_center", "rack", "tokens", "host_id"],
302-
[["192.168.1.4", 9042, "10.0.0.1", 7042, "a", "dc1", "rack1", ["1", "101", "201"], "uuid1"],
330+
[["192.168.1.4", 9042, "10.0.0.1", 7042, "a", "dc1", "rack1", ["1", "101", "201"], "uuid6"],
303331
# all others are invalid
304332
[None, 9042, None, 7040, "a", "dc1", "rack1", ["2", "102", "202"], "uuid2"],
305333
["192.168.1.5", 9042, "10.0.0.2", 7040, "a", None, "rack1", ["2", "102", "202"], "uuid2"],
@@ -336,29 +364,29 @@ def test_refresh_nodes_and_tokens_no_partitioner(self):
336364
Test handling of an unknown partitioner.
337365
"""
338366
# set the partitioner column to None
339-
self.connection.local_results[1][0][4] = None
367+
self.connection.local_results[1][0][5] = None
340368
self.control_connection.refresh_node_list_and_token_map()
341369
meta = self.cluster.metadata
342370
self.assertEqual(meta.partitioner, None)
343371
self.assertEqual(meta.token_map, {})
344372

345373
def test_refresh_nodes_and_tokens_add_host(self):
346374
self.connection.peer_results[1].append(
347-
["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", ["3", "103", "203"], "uuid3"]
375+
["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", ["3", "103", "203"], "uuid4"]
348376
)
349377
self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs)
350378
self.control_connection.refresh_node_list_and_token_map()
351379
self.assertEqual(1, len(self.cluster.added_hosts))
352380
self.assertEqual(self.cluster.added_hosts[0].address, "192.168.1.3")
353381
self.assertEqual(self.cluster.added_hosts[0].datacenter, "dc1")
354382
self.assertEqual(self.cluster.added_hosts[0].rack, "rack1")
355-
self.assertEqual(self.cluster.added_hosts[0].host_id, "uuid3")
383+
self.assertEqual(self.cluster.added_hosts[0].host_id, "uuid4")
356384

357385
def test_refresh_nodes_and_tokens_remove_host(self):
358386
del self.connection.peer_results[1][1]
359387
self.control_connection.refresh_node_list_and_token_map()
360-
self.assertEqual(1, len(self.cluster.removed_hosts))
361-
self.assertEqual(self.cluster.removed_hosts[0].address, "192.168.1.2")
388+
self.assertEqual(1, len(self.cluster.metadata.removed_hosts))
389+
self.assertEqual(self.cluster.metadata.removed_hosts[0].address, "192.168.1.2")
362390

363391
def test_refresh_nodes_and_tokens_timeout(self):
364392

@@ -423,7 +451,7 @@ def test_handle_status_change(self):
423451
}
424452
self.cluster.scheduler.reset_mock()
425453
self.control_connection._handle_status_change(event)
426-
host = self.cluster.metadata.hosts[DefaultEndPoint('192.168.1.0')]
454+
host = self.cluster.metadata.get_host(DefaultEndPoint('192.168.1.0'))
427455
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.cluster.on_up, host)
428456

429457
self.cluster.scheduler.schedule.reset_mock()
@@ -440,7 +468,7 @@ def test_handle_status_change(self):
440468
'address': ('192.168.1.0', 9000)
441469
}
442470
self.control_connection._handle_status_change(event)
443-
host = self.cluster.metadata.hosts[DefaultEndPoint('192.168.1.0')]
471+
host = self.cluster.metadata.get_host(DefaultEndPoint('192.168.1.0'))
444472
self.assertIs(host, self.cluster.down_host)
445473

446474
def test_handle_schema_change(self):
@@ -516,7 +544,7 @@ def test_refresh_nodes_and_tokens_add_host_detects_port(self):
516544
del self.connection.peer_results[:]
517545
self.connection.peer_results.extend(self.connection.peer_results_v2)
518546
self.connection.peer_results[1].append(
519-
["192.168.1.3", 555, "10.0.0.3", 666, "a", "dc1", "rack1", ["3", "103", "203"], "uuid3"]
547+
["192.168.1.3", 555, "10.0.0.3", 666, "a", "dc1", "rack1", ["3", "103", "203"], "uuid4"]
520548
)
521549
self.connection.wait_for_responses = Mock(return_value=_node_meta_results(
522550
self.connection.local_results, self.connection.peer_results))
@@ -536,7 +564,7 @@ def test_refresh_nodes_and_tokens_add_host_detects_invalid_port(self):
536564
del self.connection.peer_results[:]
537565
self.connection.peer_results.extend(self.connection.peer_results_v2)
538566
self.connection.peer_results[1].append(
539-
["192.168.1.3", -1, "10.0.0.3", 0, "a", "dc1", "rack1", ["3", "103", "203"], "uuid3"]
567+
["192.168.1.3", -1, "10.0.0.3", 0, "a", "dc1", "rack1", ["3", "103", "203"], "uuid4"]
540568
)
541569
self.connection.wait_for_responses = Mock(return_value=_node_meta_results(
542570
self.connection.local_results, self.connection.peer_results))

0 commit comments

Comments (0)