Skip to content

Commit 4a3a8ad

Browse files
committed
Invalidate tablets when table or keyspace is deleted
Delete the cached tablets of a table or keyspace when that table or keyspace is dropped. When a host is removed from the cluster, delete every tablet that has this host among its replicas. Ensure the invalidation also happens when the drop occurs while the control connection is reconnecting.
1 parent 69191ac commit 4a3a8ad

File tree

4 files changed

+139
-13
lines changed

4 files changed

+139
-13
lines changed

cassandra/cluster.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3641,6 +3641,7 @@ def _set_new_connection(self, conn):
36413641
with self._lock:
36423642
old = self._connection
36433643
self._connection = conn
3644+
self.refresh_schema()
36443645

36453646
if old:
36463647
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)

cassandra/metadata.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,13 @@ def _rebuild_all(self, parser):
168168
current_keyspaces = set()
169169
for keyspace_meta in parser.get_all_keyspaces():
170170
current_keyspaces.add(keyspace_meta.name)
171-
old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
171+
old_keyspace_meta: KeyspaceMetadata = self.keyspaces.get(keyspace_meta.name, None)
172172
self.keyspaces[keyspace_meta.name] = keyspace_meta
173173
if old_keyspace_meta:
174174
self._keyspace_updated(keyspace_meta.name)
175+
for table_name in old_keyspace_meta.tables.keys():
176+
if keyspace_meta.tables.get(table_name, None) is None:
177+
self._table_removed(keyspace_meta.name, table_name)
175178
else:
176179
self._keyspace_added(keyspace_meta.name)
177180

@@ -265,17 +268,22 @@ def _drop_aggregate(self, keyspace, aggregate):
265268
except KeyError:
266269
pass
267270

271+
def _table_removed(self, keyspace, table):
    """Hook for a table that no longer exists in ``keyspace``.

    Drops the tablets cached for ``(keyspace, table)``.
    """
    self._tablets.drop_tablets(keyspace, table)
273+
268274
def _keyspace_added(self, ksname):
    """Hook for a keyspace seen for the first time.

    When a token map is present, asks it to rebuild the entry for
    ``ksname`` (with ``build_if_absent=False``); otherwise does nothing.
    """
    if not self.token_map:
        return
    self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
271277

272278
def _keyspace_updated(self, ksname):
    """Hook for a keyspace whose metadata was replaced.

    Rebuilds the token-map entry for ``ksname`` when a token map is present,
    then drops every tablet cached for the keyspace.
    """
    if self.token_map:
        self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
    # NOTE(review): this clears tablets for *all* tables of the keyspace, so a
    # per-table drop issued right after a keyspace update finds nothing left.
    self._tablets.drop_tablets(ksname)
275282

276283
def _keyspace_removed(self, ksname):
    """Hook for a keyspace that no longer exists.

    Removes ``ksname`` from the token map when one is present, then drops
    every tablet cached for the keyspace.
    """
    if self.token_map:
        self.token_map.remove_keyspace(ksname)
    self._tablets.drop_tablets(ksname)
279287

280288
def rebuild_token_map(self, partitioner, token_map):
281289
"""
@@ -340,11 +348,13 @@ def add_or_return_host(self, host):
340348
return host, True
341349

342350
def remove_host(self, host):
    """Forget ``host`` and every tablet that lists it as a replica.

    Tablets referencing ``host.host_id`` are dropped first; then, under the
    hosts lock, the endpoint and host-id entries are removed.

    Returns the truthiness of the entry that was stored under
    ``host.host_id`` (``False`` when there was none).
    """
    self._tablets.drop_tablet_by_host_id(host.host_id)
    with self._hosts_lock:
        self._host_id_by_endpoint.pop(host.endpoint, False)
        removed = self._hosts.pop(host.host_id, False)
        return bool(removed)
346355

347356
def remove_host_by_host_id(self, host_id, endpoint=None):
357+
self._tablets.drop_tablet_by_host_id(host_id)
348358
with self._hosts_lock:
349359
if endpoint and self._host_id_by_endpoint[endpoint] == host_id:
350360
self._host_id_by_endpoint.pop(endpoint, False)

cassandra/tablets.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from threading import Lock
2+
from uuid import UUID
23

34

45
class Tablet(object):
@@ -32,6 +33,12 @@ def from_row(first_token, last_token, replicas):
3233
return tablet
3334
return None
3435

36+
def replica_contains_host_id(self, uuid: UUID) -> bool:
    """Tell whether ``uuid`` appears among this tablet's replicas.

    Every entry of ``self.replicas`` is indexed as a sequence and its first
    element is compared with ``uuid`` (presumably the replica's host id).

    :param uuid: host id to look for.
    :return: ``True`` as soon as one replica matches, ``False`` otherwise
        (including when there are no replicas).
    """
    return any(replica[0] == uuid for replica in self.replicas)
41+
3542

3643
class Tablets(object):
3744
_lock = None
@@ -51,6 +58,33 @@ def get_tablet_for_key(self, keyspace, table, t):
5158
return tablet[id]
5259
return None
5360

61+
def drop_tablets(self, keyspace: str, table: str = None):
    """Forget the cached tablets of one table, or of a whole keyspace.

    :param keyspace: keyspace whose tablets are dropped.
    :param table: when given, only the ``(keyspace, table)`` entry is
        removed; when ``None``, every entry belonging to ``keyspace`` is
        removed.

    Keyspaces or tables without cached tablets are silently ignored.
    """
    with self._lock:
        if table is not None:
            self._tablets.pop((keyspace, table), None)
            return

        # Snapshot the matching keys first: a dict must not change size
        # while it is being iterated.
        stale_keys = [key for key in self._tablets if key[0] == keyspace]
        for key in stale_keys:
            del self._tablets[key]
74+
75+
def drop_tablet_by_host_id(self, host_id: UUID):
    """Remove every cached tablet that has ``host_id`` among its replicas.

    A ``None`` host id is ignored. Each per-table list is filtered in place,
    so the relative order of the surviving tablets is preserved and a table
    whose tablets were all removed keeps an (empty) list.

    :param host_id: id of the host whose tablets must be forgotten.
    """
    if host_id is None:
        return
    with self._lock:
        for tablets in self._tablets.values():
            # Slice assignment rewrites the same list object in one pass
            # instead of popping matching indices one at a time.
            tablets[:] = [
                tablet for tablet in tablets
                if not tablet.replica_contains_host_id(host_id)
            ]
87+
5488
def add_tablet(self, keyspace, table, tablet):
5589
with self._lock:
5690
tablets_for_table = self._tablets.setdefault((keyspace, table), [])

tests/integration/experiments/test_tablets.py

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1+
import time
2+
3+
import pytest
4+
15
from cassandra.cluster import Cluster
26
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
37

48
from tests.integration import PROTOCOL_VERSION, use_cluster
59
from tests.unit.test_host_connection_pool import LOGGER
610

11+
CCM_CLUSTER = None
12+
713
def setup_module():
8-
use_cluster('tablets', [3], start=True)
14+
global CCM_CLUSTER
15+
16+
CCM_CLUSTER = use_cluster('tablets', [3], start=True)
17+
918

1019
class TestTabletsIntegration:
1120
@classmethod
@@ -14,22 +23,22 @@ def setup_class(cls):
1423
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
1524
reconnection_policy=ConstantReconnectionPolicy(1))
1625
cls.session = cls.cluster.connect()
17-
cls.create_ks_and_cf(cls)
26+
cls.create_ks_and_cf(cls.session)
1827
cls.create_data(cls.session)
1928

2029
@classmethod
2130
def teardown_class(cls):
2231
cls.cluster.shutdown()
2332

24-
def verify_same_host_in_tracing(self, results):
33+
def verify_hosts_in_tracing(self, results, expected):
2534
traces = results.get_query_trace()
2635
events = traces.events
2736
host_set = set()
2837
for event in events:
2938
LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description)
3039
host_set.add(event.source)
3140

32-
assert len(host_set) == 1
41+
assert len(host_set) == expected
3342
assert 'locally' in "\n".join([event.description for event in events])
3443

3544
trace_id = results.response_future.get_query_trace_ids()[0]
@@ -40,9 +49,13 @@ def verify_same_host_in_tracing(self, results):
4049
LOGGER.info("TRACE EVENT: %s %s", event.source, event.activity)
4150
host_set.add(event.source)
4251

43-
assert len(host_set) == 1
52+
assert len(host_set) == expected
4453
assert 'locally' in "\n".join([event.activity for event in events])
4554

55+
def get_tablet_record(self, query):
    """Look up the tablet the driver has cached for ``query``'s partition.

    The routing key of ``query`` is turned into a token with the token class
    of the current token map, and that token is used to query the tablets
    cache of the cluster metadata for ``query.keyspace`` / ``query.table``.
    """
    metadata = self.session.cluster.metadata
    keyspace, table = query.keyspace, query.table
    token = metadata.token_map.token_class.from_key(query.routing_key)
    return metadata._tablets.get_tablet_for_key(keyspace, table, token)
58+
4659
def verify_same_shard_in_tracing(self, results):
4760
traces = results.get_query_trace()
4861
events = traces.events
@@ -65,24 +78,25 @@ def verify_same_shard_in_tracing(self, results):
6578
assert len(shard_set) == 1
6679
assert 'locally' in "\n".join([event.activity for event in events])
6780

68-
def create_ks_and_cf(self):
69-
self.session.execute(
81+
@classmethod
82+
def create_ks_and_cf(cls, session):
83+
session.execute(
7084
"""
7185
DROP KEYSPACE IF EXISTS test1
7286
"""
7387
)
74-
self.session.execute(
88+
session.execute(
7589
"""
7690
CREATE KEYSPACE test1
7791
WITH replication = {
7892
'class': 'NetworkTopologyStrategy',
79-
'replication_factor': 1
93+
'replication_factor': 2
8094
} AND tablets = {
8195
'initial': 8
8296
}
8397
""")
8498

85-
self.session.execute(
99+
session.execute(
86100
"""
87101
CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
88102
""")
@@ -120,7 +134,7 @@ def query_data_host_select(self, session, verify_in_tracing=True):
120134
results = session.execute(bound, trace=True)
121135
assert results == [(2, 2, 0)]
122136
if verify_in_tracing:
123-
self.verify_same_host_in_tracing(results)
137+
self.verify_hosts_in_tracing(results, 1)
124138

125139
def query_data_shard_insert(self, session, verify_in_tracing=True):
126140
prepared = session.prepare(
@@ -142,7 +156,7 @@ def query_data_host_insert(self, session, verify_in_tracing=True):
142156
bound = prepared.bind([(52), (1), (2)])
143157
results = session.execute(bound, trace=True)
144158
if verify_in_tracing:
145-
self.verify_same_host_in_tracing(results)
159+
self.verify_hosts_in_tracing(results, 2)
146160

147161
def test_tablets(self):
148162
self.query_data_host_select(self.session)
@@ -151,3 +165,70 @@ def test_tablets(self):
151165
def test_tablets_shard_awareness(self):
152166
self.query_data_shard_select(self.session)
153167
self.query_data_shard_insert(self.session)
168+
169+
def test_tablets_invalidation_drop_ks_while_reconnecting(self):
    """Tablets are purged when the keyspace is recreated while the control
    connection is down and only reconnects afterwards."""

    def recreate_while_reconnecting(_):
        control_connection = self.session.cluster.control_connection

        # Detach the current control connection before closing it, so the
        # schema change below happens while no control connection is set.
        stale_connection = control_connection._connection
        control_connection._connection = None
        stale_connection.close()

        # Drop and recreate the keyspace and table through a fresh session.
        self.create_ks_and_cf(self.cluster.connect())

        # Bring the control connection back.
        control_connection._reconnect()

    self.run_tablets_invalidation_test(recreate_while_reconnecting)
183+
184+
def test_tablets_invalidation_drop_ks(self):
    """Tablets are purged when the keyspace is dropped and recreated."""

    def drop_ks(_):
        # Drop and recreate the keyspace and table through a fresh session.
        self.create_ks_and_cf(self.cluster.connect())
        # Presumably leaves time for the driver to process the schema
        # change before the cache is inspected.
        time.sleep(3)

    self.run_tablets_invalidation_test(drop_ks)
191+
192+
@pytest.mark.last
def test_tablets_invalidation_decommission_non_cc_node(self):
    """Tablets are purged when one of their replica nodes is decommissioned.

    NOTE(review): marked ``last`` presumably because the decommission
    permanently changes the shared cluster -- confirm.
    """

    def decommission_non_cc_node(rec):
        # Decommission the first node that hosts a replica of the tablet,
        # never touching the node the control connection is attached to.
        for node in CCM_CLUSTER.nodes.values():
            if self.cluster.control_connection._connection.endpoint.address == node.network_interfaces["storage"][0]:
                # Ignore node that control connection is connected to
                continue
            if any(str(replica[0]) == str(node.node_hostid) for replica in rec.replicas):
                node.decommission()
                break
        else:
            assert False, "failed to find node to decommission"
        # Presumably leaves time for the driver to notice the removed host.
        time.sleep(10)

    self.run_tablets_invalidation_test(decommission_non_cc_node)
212+
213+
214+
def run_tablets_invalidation_test(self, invalidate):
    """Shared scenario: cache a tablet, run ``invalidate``, expect it gone.

    :param invalidate: callable receiving the cached tablet record; it must
        perform the action that is expected to purge that record.
    """
    statement = self.session.prepare(
        """
        SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
        """).bind([2])

    # Make sure the driver holds tablet info: run the query against each
    # host in turn until a tablet record shows up (per the original note,
    # this happens once the query lands on a host outside the replica set).
    tablet = None
    for host in self.cluster.metadata.all_hosts():
        self.session.execute(statement, host=host)
        tablet = self.get_tablet_record(statement)
        if tablet is not None:
            break

    assert tablet is not None, "failed to find tablet record"

    invalidate(tablet)

    # The tablet information must have been purged by now.
    assert self.get_tablet_record(statement) is None, "tablet was not deleted, invalidation did not work"

0 commit comments

Comments
 (0)