Commit 6faef19

Invalidate tablets when table or keyspace is deleted

Delete tablets for a table or keyspace when it is dropped. When a host is removed from the cluster, delete all tablets that have this host as a replica. Ensure this also happens while the control connection is reconnecting.

1 parent 69191ac, commit 6faef19

4 files changed: +147 −10 lines

cassandra/cluster.py

Lines changed: 1 addition & 0 deletions

@@ -3641,6 +3641,7 @@ def _set_new_connection(self, conn):
         with self._lock:
             old = self._connection
             self._connection = conn
+            self.refresh_schema()

             if old:
                 log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)

cassandra/metadata.py

Lines changed: 12 additions & 1 deletion

@@ -168,10 +168,14 @@ def _rebuild_all(self, parser):
         current_keyspaces = set()
         for keyspace_meta in parser.get_all_keyspaces():
             current_keyspaces.add(keyspace_meta.name)
-            old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
+            old_keyspace_meta: KeyspaceMetadata = self.keyspaces.get(keyspace_meta.name, None)
             self.keyspaces[keyspace_meta.name] = keyspace_meta
             if old_keyspace_meta:
                 self._keyspace_updated(keyspace_meta.name)
+                for table_name, old_table_meta in old_keyspace_meta.tables.items():
+                    new_table_meta: TableMetadata = keyspace_meta.tables.get(table_name, None)
+                    if new_table_meta is None:
+                        self._table_removed(keyspace_meta.name, table_name)
             else:
                 self._keyspace_added(keyspace_meta.name)

@@ -265,17 +269,22 @@ def _drop_aggregate(self, keyspace, aggregate):
         except KeyError:
             pass

+    def _table_removed(self, keyspace, table):
+        self._tablets.drop_tablet(keyspace, table)
+
     def _keyspace_added(self, ksname):
         if self.token_map:
             self.token_map.rebuild_keyspace(ksname, build_if_absent=False)

     def _keyspace_updated(self, ksname):
         if self.token_map:
             self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
+        self._tablets.drop_tablet(ksname)

     def _keyspace_removed(self, ksname):
         if self.token_map:
             self.token_map.remove_keyspace(ksname)
+        self._tablets.drop_tablet(ksname)

     def rebuild_token_map(self, partitioner, token_map):
         """

@@ -340,11 +349,13 @@ def add_or_return_host(self, host):
             return host, True

     def remove_host(self, host):
+        self._tablets.drop_tablet_by_host_id(host.host_id)
         with self._hosts_lock:
             self._host_id_by_endpoint.pop(host.endpoint, False)
             return bool(self._hosts.pop(host.host_id, False))

     def remove_host_by_host_id(self, host_id, endpoint=None):
+        self._tablets.drop_tablet_by_host_id(host_id)
         with self._hosts_lock:
             if endpoint and self._host_id_by_endpoint[endpoint] == host_id:
                 self._host_id_by_endpoint.pop(endpoint, False)
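
The key addition to _rebuild_all is the comparison between the previously cached keyspace metadata and the freshly parsed one: any table present before but missing now is treated as dropped, and its tablets are discarded via _table_removed. A minimal sketch of that comparison follows, using plain dicts in place of KeyspaceMetadata/TableMetadata objects and hypothetical table names.

def removed_tables(old_tables, new_tables):
    """Names present in the old table map but absent from the new one."""
    return [name for name in old_tables if name not in new_tables]


# Hypothetical table maps before and after a schema refresh.
old = {"table1": object(), "table2": object()}
new = {"table1": object()}

for table_name in removed_tables(old, new):
    # In the driver this is self._table_removed(keyspace, table_name), which
    # forwards to Tablets.drop_tablet(keyspace, table_name).
    print("table dropped, purging its tablets:", table_name)   # -> table2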

cassandra/tablets.py

Lines changed: 42 additions & 0 deletions

@@ -1,4 +1,5 @@
 from threading import Lock
+from uuid import UUID


 class Tablet(object):

@@ -32,6 +33,12 @@ def from_row(first_token, last_token, replicas):
             return tablet
         return None

+    def replica_contains_host_id(self, uuid: UUID) -> bool:
+        for replica in self.replicas:
+            if replica[0] == uuid:
+                return True
+        return False
+

 class Tablets(object):
     _lock = None

@@ -51,7 +58,42 @@ def get_tablet_for_key(self, keyspace, table, t):
             return tablet[id]
         return None

+    def drop_tablet(self, keyspace: str, table: str = None):
+        print("drop tablets for ", keyspace, table)
+        with self._lock:
+            if table is not None:
+                self._tablets.pop((keyspace, table), None)
+                return
+
+            to_be_deleted = []
+            for key in self._tablets.keys():
+                if key[0] == keyspace:
+                    to_be_deleted.append(key)
+
+            for key in to_be_deleted:
+                del self._tablets[key]
+
+    def drop_tablet_by_host_id(self, host_id: UUID):
+        print("drop tablets for ", host_id)
+        if host_id is None:
+            return
+        with self._lock:
+            for key, tablets in self._tablets.items():
+                to_be_deleted = []
+                for tablet_id, tablet in enumerate(tablets):
+                    if tablet.replica_contains_host_id(host_id):
+                        to_be_deleted.append(tablet_id)
+
+                if len(to_be_deleted) == 0:
+                    continue
+
+                for tablet_id in reversed(to_be_deleted):
+                    print("drop tablets - ", tablets.pop(tablet_id))
+
+                self._tablets[key] = tablets
+
     def add_tablet(self, keyspace, table, tablet):
+        print("adding tablet", keyspace, table, tablet)
         with self._lock:
             tablets_for_table = self._tablets.setdefault((keyspace, table), [])
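
Taken together, the new Tablets methods implement three invalidation paths over the (keyspace, table) -> [tablets] cache: drop one table's entry, drop every entry of a keyspace, or strip any tablet whose replica list mentions a removed host. The sketch below reproduces that behaviour with stand-in objects rather than importing the driver; the replica layout, a tuple whose first element is the host id, follows replica_contains_host_id above, and everything else is hypothetical.

from uuid import uuid4

host_a, host_b = uuid4(), uuid4()


class FakeTablet:
    """Stand-in for Tablet; only the replica check matters here."""

    def __init__(self, replicas):
        self.replicas = replicas  # list of tuples whose first element is a host id

    def replica_contains_host_id(self, uuid):
        return any(replica[0] == uuid for replica in self.replicas)


# The cache shape used by Tablets: (keyspace, table) -> list of tablets.
cache = {
    ("test1", "table1"): [FakeTablet([(host_a, 0)]), FakeTablet([(host_b, 0)])],
    ("test1", "table2"): [FakeTablet([(host_a, 0)])],
    ("other", "t"): [FakeTablet([(host_b, 0)])],
}

# drop_tablet(keyspace, table): forget one table's tablets.
cache.pop(("test1", "table2"), None)

# drop_tablet(keyspace): forget every table of that keyspace.
for key in [k for k in cache if k[0] == "test1"]:
    del cache[key]

# drop_tablet_by_host_id(host_id): strip tablets replicated on a removed host.
for key, tablets in cache.items():
    cache[key] = [t for t in tablets if not t.replica_contains_host_id(host_b)]

print(cache)   # only ("other", "t") remains, and its tablet list is now empty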

tests/integration/experiments/test_tablets.py

Lines changed: 92 additions & 9 deletions

@@ -1,11 +1,20 @@
+import time
+
+import pytest
+
 from cassandra.cluster import Cluster
 from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy

 from tests.integration import PROTOCOL_VERSION, use_cluster
 from tests.unit.test_host_connection_pool import LOGGER

+CCM_CLUSTER = None
+
 def setup_module():
-    use_cluster('tablets', [3], start=True)
+    global CCM_CLUSTER
+
+    CCM_CLUSTER = use_cluster('tablets', [3], start=True)
+

 class TestTabletsIntegration:
     @classmethod

@@ -14,7 +23,7 @@ def setup_class(cls):
                               load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
                               reconnection_policy=ConstantReconnectionPolicy(1))
         cls.session = cls.cluster.connect()
-        cls.create_ks_and_cf(cls)
+        cls.create_ks_and_cf(cls.session)
         cls.create_data(cls.session)

     @classmethod

@@ -29,7 +38,7 @@ def verify_same_host_in_tracing(self, results):
             LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description)
             host_set.add(event.source)

-        assert len(host_set) == 1
+        assert len(host_set) in [1, 2]
         assert 'locally' in "\n".join([event.description for event in events])

         trace_id = results.response_future.get_query_trace_ids()[0]

@@ -40,9 +49,13 @@ def verify_same_host_in_tracing(self, results):
             LOGGER.info("TRACE EVENT: %s %s", event.source, event.activity)
             host_set.add(event.source)

-        assert len(host_set) == 1
+        assert len(host_set) in [1, 2]
         assert 'locally' in "\n".join([event.activity for event in events])

+    def get_tablet_record(self, query):
+        metadata = self.session.cluster.metadata
+        return metadata._tablets.get_tablet_for_key(query.keyspace, query.table, metadata.token_map.token_class.from_key(query.routing_key))
+
     def verify_same_shard_in_tracing(self, results):
         traces = results.get_query_trace()
         events = traces.events

@@ -65,24 +78,25 @@ def verify_same_shard_in_tracing(self, results):
         assert len(shard_set) == 1
         assert 'locally' in "\n".join([event.activity for event in events])

-    def create_ks_and_cf(self):
-        self.session.execute(
+    @classmethod
+    def create_ks_and_cf(cls, session):
+        session.execute(
             """
             DROP KEYSPACE IF EXISTS test1
             """
         )
-        self.session.execute(
+        session.execute(
             """
             CREATE KEYSPACE test1
             WITH replication = {
                 'class': 'NetworkTopologyStrategy',
-                'replication_factor': 1
+                'replication_factor': 2
             } AND tablets = {
                 'initial': 8
             }
             """)

-        self.session.execute(
+        session.execute(
             """
             CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
             """)

@@ -151,3 +165,72 @@ def test_tablets(self):
     def test_tablets_shard_awareness(self):
         self.query_data_shard_select(self.session)
         self.query_data_shard_insert(self.session)
+
+    def test_tablets_invalidation_drop_ks_while_reconnecting(self):
+        def recreate_while_reconnecting(_):
+            # Kill the control connection
+            conn = self.session.cluster.control_connection._connection
+            self.session.cluster.control_connection._connection = None
+            conn.close()
+
+            # Drop and recreate the keyspace and table to trigger tablets invalidation
+            self.create_ks_and_cf(self.cluster.connect())
+
+            # Restart the control connection
+            self.session.cluster.control_connection._reconnect()
+
+        self.run_tablets_invalidation_test(recreate_while_reconnecting)
+
+    def test_tablets_invalidation_drop_ks(self):
+        def drop_ks(_):
+            # Drop and recreate the keyspace and table to trigger tablets invalidation
+            self.create_ks_and_cf(self.cluster.connect())
+            time.sleep(3)
+
+        self.run_tablets_invalidation_test(drop_ks)
+
+    @pytest.mark.last
+    def test_tablets_invalidation_decommission_non_cc_node(self):
+        global CCM_CLUSTER
+
+        def decommission_non_cc_node(rec):
+            # Decommission a node that replicates the tablet but is not the control connection's node
+            for node in CCM_CLUSTER.nodes.values():
+                if self.cluster.control_connection._connection.endpoint.address == node.network_interfaces["storage"][0]:
+                    # Skip the node the control connection is connected to
+                    continue
+                for replica in rec.replicas:
+                    if str(replica[0]) == str(node.node_hostid):
+                        node.decommission()
+                        break
+                else:
+                    continue
+                break
+            else:
+                assert False, "failed to find node to decommission"
+            time.sleep(10)
+
+        self.run_tablets_invalidation_test(decommission_non_cc_node)
+
+
+    def run_tablets_invalidation_test(self, invalidate):
+        # Make sure the driver holds tablet info
+        # by landing the query on a host that is not in the replica set
+        bound = self.session.prepare(
+            """
+            SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
+            """).bind([(2)])
+
+        rec = None
+        for host in self.cluster.metadata.all_hosts():
+            self.session.execute(bound, host=host)
+            rec = self.get_tablet_record(bound)
+            if rec is not None:
+                break
+
+        assert rec is not None, "failed to find tablet record"
+
+        invalidate(rec)
+
+        # Check that the tablet information was purged
+        assert self.get_tablet_record(bound) is None, "tablet was not deleted, invalidation did not work"
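
decommission_non_cc_node above relies on Python's for/else: the else branch of a for loop runs only if the loop finished without a break, which is what lets the inner "found a replica" break propagate outward and the outer else flag the case where no suitable node exists. A standalone illustration of the pattern, with hypothetical addresses in place of the ccm cluster:

nodes = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]   # hypothetical node addresses
replicas = ["10.0.0.2", "10.0.0.3"]            # hypothetical tablet replica addresses
cc_node = "10.0.0.2"                           # node the control connection uses

for node in nodes:
    if node == cc_node:
        continue                 # never touch the control connection's node
    for replica in replicas:
        if replica == node:
            print("would decommission", node)
            break                # found a replica we may remove
    else:
        continue                 # inner loop found nothing; try the next node
    break                        # propagate the inner break outward
else:
    raise AssertionError("failed to find node to decommission")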
