Skip to content

Commit dcd8db3

Browse files
committed
Invalidate tablets when table or keyspace is deleted
Delete the cached tablets for a table or keyspace when that table or keyspace is dropped. When a host is removed from the cluster, delete all tablets that have a replica on that host. Ensure the invalidation also happens when these changes occur while the control connection is reconnecting.
1 parent 347f332 commit dcd8db3

File tree

4 files changed

+135
-10
lines changed

4 files changed

+135
-10
lines changed

cassandra/cluster.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3641,6 +3641,7 @@ def _set_new_connection(self, conn):
36413641
with self._lock:
36423642
old = self._connection
36433643
self._connection = conn
3644+
self.refresh_schema()
36443645

36453646
if old:
36463647
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)

cassandra/metadata.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,14 @@ def _rebuild_all(self, parser):
168168
current_keyspaces = set()
169169
for keyspace_meta in parser.get_all_keyspaces():
170170
current_keyspaces.add(keyspace_meta.name)
171-
old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
171+
old_keyspace_meta: KeyspaceMetadata = self.keyspaces.get(keyspace_meta.name, None)
172172
self.keyspaces[keyspace_meta.name] = keyspace_meta
173173
if old_keyspace_meta:
174174
self._keyspace_updated(keyspace_meta.name)
175+
for table_name, old_table_meta in old_keyspace_meta.tables.items():
176+
new_table_meta: TableMetadata = keyspace_meta.tables.get(table_name, None)
177+
if new_table_meta is None:
178+
self._table_removed(keyspace_meta.name, table_name)
175179
else:
176180
self._keyspace_added(keyspace_meta.name)
177181

@@ -265,17 +269,22 @@ def _drop_aggregate(self, keyspace, aggregate):
265269
except KeyError:
266270
pass
267271

272+
def _table_removed(self, keyspace, table):
273+
self._tablets.drop_tablet(keyspace, table)
274+
268275
def _keyspace_added(self, ksname):
269276
if self.token_map:
270277
self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
271278

272279
def _keyspace_updated(self, ksname):
273280
if self.token_map:
274281
self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
282+
self._tablets.drop_tablet(ksname)
275283

276284
def _keyspace_removed(self, ksname):
277285
if self.token_map:
278286
self.token_map.remove_keyspace(ksname)
287+
self._tablets.drop_tablet(ksname)
279288

280289
def rebuild_token_map(self, partitioner, token_map):
281290
"""
@@ -340,11 +349,13 @@ def add_or_return_host(self, host):
340349
return host, True
341350

342351
def remove_host(self, host):
352+
self._tablets.drop_tablet_by_host_id(host.host_id)
343353
with self._hosts_lock:
344354
self._host_id_by_endpoint.pop(host.endpoint, False)
345355
return bool(self._hosts.pop(host.host_id, False))
346356

347357
def remove_host_by_host_id(self, host_id, endpoint=None):
358+
self._tablets.drop_tablet_by_host_id(host_id)
348359
with self._hosts_lock:
349360
if endpoint and self._host_id_by_endpoint[endpoint] == host_id:
350361
self._host_id_by_endpoint.pop(endpoint, False)

cassandra/tablets.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from threading import Lock
2+
from uuid import UUID
23

34

45
class Tablet(object):
@@ -32,6 +33,12 @@ def from_row(first_token, last_token, replicas):
3233
return tablet
3334
return None
3435

36+
def replica_contains_host_id(self, uuid: UUID) -> bool:
    """Return True if any replica of this tablet matches the given host id.

    Only the first element of each entry in ``self.replicas`` is compared
    against ``uuid``; the remaining elements are ignored here.

    :param uuid: host id to look for.
    :returns: ``True`` when at least one replica's first element equals
        ``uuid``, ``False`` otherwise (including when there are no replicas).
    """
    return any(replica[0] == uuid for replica in self.replicas)
41+
3542

3643
class Tablets(object):
3744
_lock = None
@@ -51,6 +58,38 @@ def get_tablet_for_key(self, keyspace, table, t):
5158
return tablet[id]
5259
return None
5360

61+
def drop_tablet(self, keyspace: str, table: str = None):
    """Forget cached tablets for a single table or for a whole keyspace.

    :param keyspace: keyspace whose tablets should be dropped.
    :param table: when given, only the ``(keyspace, table)`` entry is
        dropped; when ``None``, every entry whose key starts with
        ``keyspace`` is dropped.

    Keyspaces or tables with no cached tablets are silently ignored.
    """
    with self._lock:
        if table is not None:
            self._tablets.pop((keyspace, table), None)
            return

        # Snapshot the matching keys first: deleting from the dict while
        # iterating over it would raise RuntimeError.
        stale_keys = [key for key in self._tablets if key[0] == keyspace]
        for key in stale_keys:
            del self._tablets[key]
74+
75+
def drop_tablet_by_host_id(self, host_id: UUID):
    """Drop every cached tablet that reports a replica on the given host.

    :param host_id: id of the host being removed; ``None`` is a no-op.

    Tables keep their entry in ``self._tablets`` even if all of their
    tablets are dropped (the list is simply left empty).
    """
    if host_id is None:
        return
    with self._lock:
        for tablets in self._tablets.values():
            # Slice assignment filters the existing list object in place,
            # so the dict entry does not need to be re-assigned.
            tablets[:] = [
                tablet for tablet in tablets
                if not tablet.replica_contains_host_id(host_id)
            ]
92+
5493
def add_tablet(self, keyspace, table, tablet):
5594
with self._lock:
5695
tablets_for_table = self._tablets.setdefault((keyspace, table), [])

tests/integration/experiments/test_tablets.py

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
import time
22
import unittest
3-
import pytest
4-
import os
3+
54
from cassandra.cluster import Cluster
65
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
76

87
from tests.integration import PROTOCOL_VERSION, use_cluster
98
from tests.unit.test_host_connection_pool import LOGGER
109

10+
CCM_CLUSTER = None
11+
1112
def setup_module():
12-
use_cluster('tablets', [3], start=True)
13+
global CCM_CLUSTER
14+
15+
CCM_CLUSTER = use_cluster('tablets', [3], start=True)
1316

1417
class TestTabletsIntegration(unittest.TestCase):
1518
@classmethod
@@ -18,7 +21,7 @@ def setup_class(cls):
1821
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
1922
reconnection_policy=ConstantReconnectionPolicy(1))
2023
cls.session = cls.cluster.connect()
21-
cls.create_ks_and_cf(cls)
24+
cls.create_ks_and_cf(cls.session)
2225
cls.create_data(cls.session)
2326

2427
@classmethod
@@ -47,6 +50,10 @@ def verify_same_host_in_tracing(self, results):
4750
self.assertEqual(len(host_set), 1)
4851
self.assertIn('locally', "\n".join([event.activity for event in events]))
4952

53+
def get_tablet_record(self, query):
    """Return the driver's cached tablet entry for a bound statement.

    The statement's routing key is converted to a token with the cluster's
    token class, and the tablet cache is queried for the statement's
    keyspace/table at that token. The result is whatever
    ``get_tablet_for_key`` returns.
    """
    cluster_metadata = self.session.cluster.metadata
    token = cluster_metadata.token_map.token_class.from_key(query.routing_key)
    tablet_cache = cluster_metadata._tablets
    return tablet_cache.get_tablet_for_key(query.keyspace, query.table, token)
56+
5057
def verify_same_shard_in_tracing(self, results):
5158
traces = results.get_query_trace()
5259
events = traces.events
@@ -69,24 +76,25 @@ def verify_same_shard_in_tracing(self, results):
6976
self.assertEqual(len(shard_set), 1)
7077
self.assertIn('locally', "\n".join([event.activity for event in events]))
7178

72-
def create_ks_and_cf(self):
73-
self.session.execute(
79+
@classmethod
80+
def create_ks_and_cf(cls, session):
81+
session.execute(
7482
"""
7583
DROP KEYSPACE IF EXISTS test1
7684
"""
7785
)
78-
self.session.execute(
86+
session.execute(
7987
"""
8088
CREATE KEYSPACE test1
8189
WITH replication = {
8290
'class': 'NetworkTopologyStrategy',
83-
'replication_factor': 1
91+
'replication_factor': 2
8492
} AND tablets = {
8593
'initial': 8
8694
}
8795
""")
8896

89-
self.session.execute(
97+
session.execute(
9098
"""
9199
CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
92100
""")
@@ -109,6 +117,8 @@ def query_data_shard_select(self, session, verify_in_tracing=True):
109117
""")
110118

111119
bound = prepared.bind([(2)])
120+
assert self.get_tablet_record(bound) is not None
121+
112122
results = session.execute(bound, trace=True)
113123
self.assertEqual(results, [(2, 2, 0)])
114124
if verify_in_tracing:
@@ -121,6 +131,8 @@ def query_data_host_select(self, session, verify_in_tracing=True):
121131
""")
122132

123133
bound = prepared.bind([(2)])
134+
assert self.get_tablet_record(bound) is not None
135+
124136
results = session.execute(bound, trace=True)
125137
self.assertEqual(results, [(2, 2, 0)])
126138
if verify_in_tracing:
@@ -133,6 +145,8 @@ def query_data_shard_insert(self, session, verify_in_tracing=True):
133145
""")
134146

135147
bound = prepared.bind([(51), (1), (2)])
148+
assert self.get_tablet_record(bound) is not None
149+
136150
results = session.execute(bound, trace=True)
137151
if verify_in_tracing:
138152
self.verify_same_shard_in_tracing(results)
@@ -144,6 +158,8 @@ def query_data_host_insert(self, session, verify_in_tracing=True):
144158
""")
145159

146160
bound = prepared.bind([(52), (1), (2)])
161+
assert self.get_tablet_record(bound) is not None
162+
147163
results = session.execute(bound, trace=True)
148164
if verify_in_tracing:
149165
self.verify_same_host_in_tracing(results)
@@ -155,3 +171,61 @@ def test_tablets(self):
155171
def test_tablets_shard_awareness(self):
156172
self.query_data_shard_select(self.session)
157173
self.query_data_shard_insert(self.session)
174+
175+
def test_tablets_invalidation(self):
    """Check that cached tablets are purged after schema/topology changes.

    Three scenarios are run in sequence through
    ``run_tablets_invalidation_test``:

    1. the keyspace and table are dropped and recreated;
    2. the same, but while the control connection is closed, followed by
       an explicit reconnect;
    3. a node holding a replica of the tablet (other than the control
       connection host) is decommissioned.

    NOTE: the decommission in scenario 3 is not undone by this test, which
    is why it runs last.
    """

    def recreate_schema():
        # Use a short-lived session and shut it down afterwards so that
        # each scenario does not leave an extra open session behind.
        session = self.cluster.connect()
        try:
            self.create_ks_and_cf(session)
        finally:
            session.shutdown()

    def recreate_and_wait(_):
        # Drop and recreate ks and table to trigger tablets invalidation
        recreate_schema()
        # Presumably gives the driver time to process the schema change.
        time.sleep(3)

    def recreate_while_reconnecting(_):
        control_connection = self.session.cluster.control_connection

        # Kill control connection
        conn = control_connection._connection
        control_connection._connection = None
        conn.close()

        # Drop and recreate ks and table while the control connection is down
        recreate_schema()

        # Start control connection
        control_connection._reconnect()

    def decommission_non_cc_node(rec):
        # Decommission one node that holds a replica of the tablet,
        # skipping the node the control connection is connected to.
        cc_address = self.cluster.control_connection._connection.endpoint.address
        replica_host_ids = {str(replica[0]) for replica in rec.replicas}
        for node in CCM_CLUSTER.nodes.values():
            if cc_address == node.network_interfaces["storage"][0]:
                # Ignore node that control connection is connected to
                continue
            if str(node.node_hostid) in replica_host_ids:
                node.decommission()
                break
        else:
            # self.fail is used instead of `assert False`, which would be
            # stripped when Python runs with -O.
            self.fail("failed to find node to decommission")
        # Presumably gives the driver time to notice the removed host.
        time.sleep(10)

    self.run_tablets_invalidation_test(recreate_and_wait)
    self.run_tablets_invalidation_test(recreate_while_reconnecting)
    self.run_tablets_invalidation_test(decommission_non_cc_node)
215+
216+
217+
def run_tablets_invalidation_test(self, invalidate):
    """Run one invalidation scenario and verify the tablet cache is purged.

    :param invalidate: callable that receives the tablet record found for
        the probe query and performs the action expected to invalidate it.
    """
    # Make sure driver holds tablet info
    prepared = self.session.prepare(
        """
        SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
        """)
    probe = prepared.bind([(2)])
    self.session.execute(probe)

    cached = self.get_tablet_record(probe)
    assert cached is not None

    invalidate(cached)

    # Check if tablets information was purged
    remaining = self.get_tablet_record(probe)
    assert remaining is None

0 commit comments

Comments
 (0)