Skip to content

Commit be0bcf2

Browse files
committed
Invalidate tablets when table or keyspace is deleted
Delete tablets for table or keyspace when one is deleted. When host is removed from cluster delete all tablets that have this host in it. Ensure that if it happens when control connection is reconnection.
1 parent 69191ac commit be0bcf2

File tree

3 files changed

+140
-13
lines changed

3 files changed

+140
-13
lines changed

cassandra/metadata.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import struct
2727
import random
2828
import itertools
29+
from typing import Optional
2930

3031
murmur3 = None
3132
try:
@@ -168,10 +169,13 @@ def _rebuild_all(self, parser):
168169
current_keyspaces = set()
169170
for keyspace_meta in parser.get_all_keyspaces():
170171
current_keyspaces.add(keyspace_meta.name)
171-
old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
172+
old_keyspace_meta: Optional[KeyspaceMetadata] = self.keyspaces.get(keyspace_meta.name, None)
172173
self.keyspaces[keyspace_meta.name] = keyspace_meta
173174
if old_keyspace_meta:
174175
self._keyspace_updated(keyspace_meta.name)
176+
for table_name in old_keyspace_meta.tables.keys():
177+
if table_name not in keyspace_meta.tables:
178+
self._table_removed(keyspace_meta.name, table_name)
175179
else:
176180
self._keyspace_added(keyspace_meta.name)
177181

@@ -265,17 +269,22 @@ def _drop_aggregate(self, keyspace, aggregate):
265269
except KeyError:
266270
pass
267271

272+
def _table_removed(self, keyspace, table):
273+
self._tablets.drop_tablets(keyspace, table)
274+
268275
def _keyspace_added(self, ksname):
269276
if self.token_map:
270277
self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
271278

272279
def _keyspace_updated(self, ksname):
273280
if self.token_map:
274281
self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
282+
self._tablets.drop_tablets(ksname)
275283

276284
def _keyspace_removed(self, ksname):
277285
if self.token_map:
278286
self.token_map.remove_keyspace(ksname)
287+
self._tablets.drop_tablets(ksname)
279288

280289
def rebuild_token_map(self, partitioner, token_map):
281290
"""
@@ -340,11 +349,13 @@ def add_or_return_host(self, host):
340349
return host, True
341350

342351
def remove_host(self, host):
352+
self._tablets.drop_tablets_by_host_id(host.host_id)
343353
with self._hosts_lock:
344354
self._host_id_by_endpoint.pop(host.endpoint, False)
345355
return bool(self._hosts.pop(host.host_id, False))
346356

347357
def remove_host_by_host_id(self, host_id, endpoint=None):
358+
self._tablets.drop_tablets_by_host_id(host_id)
348359
with self._hosts_lock:
349360
if endpoint and self._host_id_by_endpoint[endpoint] == host_id:
350361
self._host_id_by_endpoint.pop(endpoint, False)

cassandra/tablets.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from threading import Lock
2+
from typing import Optional
3+
from uuid import UUID
24

35

46
class Tablet(object):
@@ -32,6 +34,12 @@ def from_row(first_token, last_token, replicas):
3234
return tablet
3335
return None
3436

37+
def replica_contains_host_id(self, uuid: UUID) -> bool:
38+
for replica in self.replicas:
39+
if replica[0] == uuid:
40+
return True
41+
return False
42+
3543

3644
class Tablets(object):
3745
_lock = None
@@ -51,6 +59,33 @@ def get_tablet_for_key(self, keyspace, table, t):
5159
return tablet[id]
5260
return None
5361

62+
def drop_tablets(self, keyspace: str, table: Optional[str] = None):
63+
with self._lock:
64+
if table is not None:
65+
self._tablets.pop((keyspace, table), None)
66+
return
67+
68+
to_be_deleted = []
69+
for key in self._tablets.keys():
70+
if key[0] == keyspace:
71+
to_be_deleted.append(key)
72+
73+
for key in to_be_deleted:
74+
del self._tablets[key]
75+
76+
def drop_tablets_by_host_id(self, host_id: Optional[UUID]):
77+
if host_id is None:
78+
return
79+
with self._lock:
80+
for key, tablets in self._tablets.items():
81+
to_be_deleted = []
82+
for tablet_id, tablet in enumerate(tablets):
83+
if tablet.replica_contains_host_id(host_id):
84+
to_be_deleted.append(tablet_id)
85+
86+
for tablet_id in reversed(to_be_deleted):
87+
tablets.pop(tablet_id)
88+
5489
def add_tablet(self, keyspace, table, tablet):
5590
with self._lock:
5691
tablets_for_table = self._tablets.setdefault((keyspace, table), [])

tests/integration/experiments/test_tablets.py

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1+
import time
2+
3+
import pytest
4+
15
from cassandra.cluster import Cluster
26
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
37

48
from tests.integration import PROTOCOL_VERSION, use_cluster
59
from tests.unit.test_host_connection_pool import LOGGER
610

11+
CCM_CLUSTER = None
12+
713
def setup_module():
8-
use_cluster('tablets', [3], start=True)
14+
global CCM_CLUSTER
15+
16+
CCM_CLUSTER = use_cluster('tablets', [3], start=True)
17+
918

1019
class TestTabletsIntegration:
1120
@classmethod
@@ -14,22 +23,22 @@ def setup_class(cls):
1423
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
1524
reconnection_policy=ConstantReconnectionPolicy(1))
1625
cls.session = cls.cluster.connect()
17-
cls.create_ks_and_cf(cls)
26+
cls.create_ks_and_cf(cls.session)
1827
cls.create_data(cls.session)
1928

2029
@classmethod
2130
def teardown_class(cls):
2231
cls.cluster.shutdown()
2332

24-
def verify_same_host_in_tracing(self, results):
33+
def verify_hosts_in_tracing(self, results, expected):
2534
traces = results.get_query_trace()
2635
events = traces.events
2736
host_set = set()
2837
for event in events:
2938
LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description)
3039
host_set.add(event.source)
3140

32-
assert len(host_set) == 1
41+
assert len(host_set) == expected
3342
assert 'locally' in "\n".join([event.description for event in events])
3443

3544
trace_id = results.response_future.get_query_trace_ids()[0]
@@ -40,9 +49,13 @@ def verify_same_host_in_tracing(self, results):
4049
LOGGER.info("TRACE EVENT: %s %s", event.source, event.activity)
4150
host_set.add(event.source)
4251

43-
assert len(host_set) == 1
52+
assert len(host_set) == expected
4453
assert 'locally' in "\n".join([event.activity for event in events])
4554

55+
def get_tablet_record(self, query):
56+
metadata = self.session.cluster.metadata
57+
return metadata._tablets.get_tablet_for_key(query.keyspace, query.table, metadata.token_map.token_class.from_key(query.routing_key))
58+
4659
def verify_same_shard_in_tracing(self, results):
4760
traces = results.get_query_trace()
4861
events = traces.events
@@ -65,24 +78,25 @@ def verify_same_shard_in_tracing(self, results):
6578
assert len(shard_set) == 1
6679
assert 'locally' in "\n".join([event.activity for event in events])
6780

68-
def create_ks_and_cf(self):
69-
self.session.execute(
81+
@classmethod
82+
def create_ks_and_cf(cls, session):
83+
session.execute(
7084
"""
7185
DROP KEYSPACE IF EXISTS test1
7286
"""
7387
)
74-
self.session.execute(
88+
session.execute(
7589
"""
7690
CREATE KEYSPACE test1
7791
WITH replication = {
7892
'class': 'NetworkTopologyStrategy',
79-
'replication_factor': 1
93+
'replication_factor': 2
8094
} AND tablets = {
8195
'initial': 8
8296
}
8397
""")
8498

85-
self.session.execute(
99+
session.execute(
86100
"""
87101
CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
88102
""")
@@ -120,7 +134,7 @@ def query_data_host_select(self, session, verify_in_tracing=True):
120134
results = session.execute(bound, trace=True)
121135
assert results == [(2, 2, 0)]
122136
if verify_in_tracing:
123-
self.verify_same_host_in_tracing(results)
137+
self.verify_hosts_in_tracing(results, 1)
124138

125139
def query_data_shard_insert(self, session, verify_in_tracing=True):
126140
prepared = session.prepare(
@@ -142,7 +156,7 @@ def query_data_host_insert(self, session, verify_in_tracing=True):
142156
bound = prepared.bind([(52), (1), (2)])
143157
results = session.execute(bound, trace=True)
144158
if verify_in_tracing:
145-
self.verify_same_host_in_tracing(results)
159+
self.verify_hosts_in_tracing(results, 2)
146160

147161
def test_tablets(self):
148162
self.query_data_host_select(self.session)
@@ -151,3 +165,70 @@ def test_tablets(self):
151165
def test_tablets_shard_awareness(self):
152166
self.query_data_shard_select(self.session)
153167
self.query_data_shard_insert(self.session)
168+
169+
def test_tablets_invalidation_drop_ks_while_reconnecting(self):
170+
def recreate_while_reconnecting(_):
171+
# Kill control connection
172+
conn = self.session.cluster.control_connection._connection
173+
self.session.cluster.control_connection._connection = None
174+
conn.close()
175+
176+
# Drop and recreate ks and table to trigger tablets invalidation
177+
self.create_ks_and_cf(self.cluster.connect())
178+
179+
# Start control connection
180+
self.session.cluster.control_connection._reconnect()
181+
182+
self.run_tablets_invalidation_test(recreate_while_reconnecting)
183+
184+
def test_tablets_invalidation_drop_ks(self):
185+
def drop_ks(_):
186+
# Drop and recreate ks and table to trigger tablets invalidation
187+
self.create_ks_and_cf(self.cluster.connect())
188+
time.sleep(3)
189+
190+
self.run_tablets_invalidation_test(drop_ks)
191+
192+
@pytest.mark.last
193+
def test_tablets_invalidation_decommission_non_cc_node(self):
194+
def decommission_non_cc_node(rec):
195+
# Drop and recreate ks and table to trigger tablets invalidation
196+
for node in CCM_CLUSTER.nodes.values():
197+
if self.cluster.control_connection._connection.endpoint.address == node.network_interfaces["storage"][0]:
198+
# Ignore node that control connection is connected to
199+
continue
200+
for replica in rec.replicas:
201+
if str(replica[0]) == str(node.node_hostid):
202+
node.decommission()
203+
break
204+
else:
205+
continue
206+
break
207+
else:
208+
assert False, "failed to find node to decommission"
209+
time.sleep(10)
210+
211+
self.run_tablets_invalidation_test(decommission_non_cc_node)
212+
213+
214+
def run_tablets_invalidation_test(self, invalidate):
215+
# Make sure driver holds tablet info
216+
# By landing query to the host that is not in replica set
217+
bound = self.session.prepare(
218+
"""
219+
SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
220+
""").bind([(2)])
221+
222+
rec = None
223+
for host in self.cluster.metadata.all_hosts():
224+
self.session.execute(bound, host=host)
225+
rec = self.get_tablet_record(bound)
226+
if rec is not None:
227+
break
228+
229+
assert rec is not None, "failed to find tablet record"
230+
231+
invalidate(rec)
232+
233+
# Check if tablets information was purged
234+
assert self.get_tablet_record(bound) is None, "tablet was not deleted, invalidation did not work"

0 commit comments

Comments
 (0)