Skip to content

Commit a6d0626

Browse files
committed
add virtual metadata to parser
1 parent 0a90113 commit a6d0626

File tree

3 files changed

+233
-8
lines changed

3 files changed

+233
-8
lines changed

CHANGELOG.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
3.15.0
22
======
3+
4+
Features
5+
--------
6+
* Parse Virtual Keyspace Metadata (PYTHON-992)
7+
38
Bug Fixes
49
---------
510
* Tokenmap.get_replicas returns the wrong value if token coincides with the end of the range (PYTHON-978)

cassandra/metadata.py

Lines changed: 152 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,14 @@ class KeyspaceMetadata(object):
639639
A dict mapping view names to :class:`.MaterializedViewMetadata` instances.
640640
"""
641641

642+
virtual = False
643+
"""
644+
A boolean indicating if this is a virtual keyspace or not. Always ``False``
645+
for clusters running pre-4.0 versions of Cassandra.
646+
647+
.. versionadded:: 3.15
648+
"""
649+
642650
_exc_info = None
643651
""" set if metadata parsing failed """
644652

@@ -671,13 +679,20 @@ def export_as_string(self):
671679
ret += line
672680
ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % cql
673681
return ret
682+
if self.virtual:
683+
return ("/*\nWarning: Keyspace {ks} is a virtual keyspace and cannot be recreated with CQL.\n"
684+
"Structure, for reference:*/\n"
685+
"{cql}\n"
686+
"").format(ks=self.name, cql=cql)
674687
return cql
675688

676689
def as_cql_query(self):
677690
"""
678691
Returns a CQL query string that can be used to recreate just this keyspace,
679692
not including user-defined types and tables.
680693
"""
694+
if self.virtual:
695+
return "// VIRTUAL KEYSPACE {}".format(protect_name(self.name))
681696
ret = "CREATE KEYSPACE %s WITH replication = %s " % (
682697
protect_name(self.name),
683698
self.replication_strategy.export_for_schema())
@@ -1065,11 +1080,21 @@ def primary_key(self):
10651080
_exc_info = None
10661081
""" set if metadata parsing failed """
10671082

1083+
virtual = False
1084+
"""
1085+
A boolean indicating if this is a virtual table or not. Always ``False``
1086+
for clusters running pre-4.0 versions of Cassandra.
1087+
1088+
.. versionadded:: 3.15
1089+
"""
1090+
10681091
@property
10691092
def is_cql_compatible(self):
10701093
"""
10711094
A boolean indicating if this table can be represented as CQL in export
10721095
"""
1096+
if self.virtual:
1097+
return False
10731098
comparator = getattr(self, 'comparator', None)
10741099
if comparator:
10751100
# no compact storage with more than one column beyond PK if there
@@ -1086,7 +1111,7 @@ def is_cql_compatible(self):
10861111
Metadata describing configuration for table extensions
10871112
"""
10881113

1089-
def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None):
1114+
def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None, virtual=False):
10901115
self.keyspace_name = keyspace_name
10911116
self.name = name
10921117
self.partition_key = [] if partition_key is None else partition_key
@@ -1097,6 +1122,7 @@ def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None,
10971122
self.comparator = None
10981123
self.triggers = OrderedDict() if triggers is None else triggers
10991124
self.views = {}
1125+
self.virtual = virtual
11001126

11011127
def export_as_string(self):
11021128
"""
@@ -1116,6 +1142,11 @@ def export_as_string(self):
11161142
ret = "/*\nWarning: Table %s.%s omitted because it has constructs not compatible with CQL (was created via legacy API).\n" % \
11171143
(self.keyspace_name, self.name)
11181144
ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql()
1145+
elif self.virtual:
1146+
ret = ('/*\nWarning: Table {ks}.{tab} is a virtual table and cannot be recreated with CQL.\n'
1147+
'Structure, for reference:\n'
1148+
'{cql}\n*/').format(ks=self.keyspace_name, tab=self.name, cql=self._all_as_cql())
1149+
11191150
else:
11201151
ret = self._all_as_cql()
11211152

@@ -1150,7 +1181,8 @@ def as_cql_query(self, formatted=False):
11501181
creations are not included). If `formatted` is set to :const:`True`,
11511182
extra whitespace will be added to make the query human readable.
11521183
"""
1153-
ret = "CREATE TABLE %s.%s (%s" % (
1184+
ret = "%s TABLE %s.%s (%s" % (
1185+
('VIRTUAL' if self.virtual else 'CREATE'),
11541186
protect_name(self.keyspace_name),
11551187
protect_name(self.name),
11561188
"\n" if formatted else "")
@@ -2245,28 +2277,32 @@ def _build_aggregate(aggregate_row):
22452277
aggregate_row['argument_types'], aggregate_row['state_func'], aggregate_row['state_type'],
22462278
aggregate_row['final_func'], aggregate_row['initcond'], aggregate_row['return_type'])
22472279

2248-
def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_rows=None):
2280+
def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_rows=None, virtual=False):
22492281
keyspace_name = row["keyspace_name"]
22502282
table_name = row[self._table_name_col]
22512283

22522284
col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][table_name]
22532285
trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][table_name]
22542286
index_rows = index_rows or self.keyspace_table_index_rows[keyspace_name][table_name]
22552287

2256-
table_meta = TableMetadataV3(keyspace_name, table_name)
2288+
table_meta = TableMetadataV3(keyspace_name, table_name, virtual=virtual)
22572289
try:
22582290
table_meta.options = self._build_table_options(row)
22592291
flags = row.get('flags', set())
22602292
if flags:
22612293
compact_static = False
22622294
table_meta.is_compact_storage = 'dense' in flags or 'super' in flags or 'compound' not in flags
22632295
is_dense = 'dense' in flags
2296+
elif virtual:
2297+
compact_static = False
2298+
table_meta.is_compact_storage = False
2299+
is_dense = False
22642300
else:
22652301
compact_static = True
22662302
table_meta.is_compact_storage = True
22672303
is_dense = False
22682304

2269-
self._build_table_columns(table_meta, col_rows, compact_static, is_dense)
2305+
self._build_table_columns(table_meta, col_rows, compact_static, is_dense, virtual)
22702306

22712307
for trigger_row in trigger_rows:
22722308
trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
@@ -2288,7 +2324,7 @@ def _build_table_options(self, row):
22882324
""" Setup the mostly-non-schema table options, like caching settings """
22892325
return dict((o, row.get(o)) for o in self.recognized_table_options if o in row)
22902326

2291-
def _build_table_columns(self, meta, col_rows, compact_static=False, is_dense=False):
2327+
def _build_table_columns(self, meta, col_rows, compact_static=False, is_dense=False, virtual=False):
22922328
# partition key
22932329
partition_rows = [r for r in col_rows
22942330
if r.get('kind', None) == "partition_key"]
@@ -2432,6 +2468,116 @@ class SchemaParserV4(SchemaParserV3):
24322468
)
24332469
)
24342470

2471+
_SELECT_VIRTUAL_KEYSPACES = 'SELECT * from system_virtual_schema.keyspaces'
2472+
_SELECT_VIRTUAL_TABLES = 'SELECT * from system_virtual_schema.tables'
2473+
_SELECT_VIRTUAL_COLUMNS = 'SELECT * from system_virtual_schema.columns'
2474+
2475+
def __init__(self, connection, timeout):
    """
    Initialize the parent parser, then create the empty containers that
    hold the grouped virtual-schema rows.
    """
    super(SchemaParserV4, self).__init__(connection, timeout)

    def _rows_by_table():
        # inner level of the column mapping: table name -> list of rows
        return defaultdict(list)

    self.virtual_keyspaces_rows = defaultdict(list)
    # keyspace name -> list of virtual table rows
    self.virtual_tables_rows = defaultdict(list)
    # keyspace name -> table name -> list of virtual column rows
    self.virtual_columns_rows = defaultdict(_rows_by_table)
2480+
2481+
def _query_all(self):
    """
    Send every schema query (the ones shared with V3 plus the three
    virtual-schema queries) in a single ``wait_for_responses`` call, store
    each handled result on ``self`` and finally aggregate the results.
    """
    consistency = ConsistencyLevel.ONE

    # todo: this duplicates V3; we should find a way for _query_all methods
    # to extend each other.
    statements = (
        # copied from V3
        self._SELECT_KEYSPACES,
        self._SELECT_TABLES,
        self._SELECT_COLUMNS,
        self._SELECT_TYPES,
        self._SELECT_FUNCTIONS,
        self._SELECT_AGGREGATES,
        self._SELECT_TRIGGERS,
        self._SELECT_INDEXES,
        self._SELECT_VIEWS,
        # V4-only queries
        self._SELECT_VIRTUAL_KEYSPACES,
        self._SELECT_VIRTUAL_TABLES,
        self._SELECT_VIRTUAL_COLUMNS,
    )
    messages = [QueryMessage(query=statement, consistency_level=consistency)
                for statement in statements]

    # fail_on_error=False: each response is unpacked below as a
    # (success, result) pair and passed to _handle_results
    responses = self.connection.wait_for_responses(
        *messages, timeout=self.timeout, fail_on_error=False)

    # responses are unpacked in the same order as ``statements``
    (
        # copied from V3
        (ks_ok, ks_rows),
        (tables_ok, tables_rows),
        (columns_ok, columns_rows),
        (types_ok, types_rows),
        (functions_ok, functions_rows),
        (aggregates_ok, aggregates_rows),
        (triggers_ok, triggers_rows),
        (indexes_ok, indexes_rows),
        (views_ok, views_rows),
        # V4-only responses
        (virtual_ks_ok, virtual_ks_rows),
        (virtual_tables_ok, virtual_tables_rows),
        (virtual_columns_ok, virtual_columns_rows)
    ) = responses

    handle = self._handle_results

    # copied from V3
    self.keyspaces_result = handle(ks_ok, ks_rows)
    self.tables_result = handle(tables_ok, tables_rows)
    self.columns_result = handle(columns_ok, columns_rows)
    self.triggers_result = handle(triggers_ok, triggers_rows)
    self.types_result = handle(types_ok, types_rows)
    self.functions_result = handle(functions_ok, functions_rows)
    self.aggregates_result = handle(aggregates_ok, aggregates_rows)
    self.indexes_result = handle(indexes_ok, indexes_rows)
    self.views_result = handle(views_ok, views_rows)

    # V4-only results
    self.virtual_keyspaces_result = handle(virtual_ks_ok, virtual_ks_rows)
    self.virtual_tables_result = handle(virtual_tables_ok, virtual_tables_rows)
    self.virtual_columns_result = handle(virtual_columns_ok, virtual_columns_rows)

    self._aggregate_results()
2539+
2540+
def _aggregate_results(self):
    """
    Run the parent aggregation, then group the virtual table rows by
    keyspace name and the virtual column rows by keyspace and table name.
    """
    super(SchemaParserV4, self)._aggregate_results()

    tables_by_keyspace = self.virtual_tables_rows
    for table_row in self.virtual_tables_result:
        tables_by_keyspace[table_row["keyspace_name"]].append(table_row)

    columns_by_table = self.virtual_columns_rows
    for column_row in self.virtual_columns_result:
        keyspace = column_row['keyspace_name']
        table = column_row[self._table_name_col]
        columns_by_table[keyspace][table].append(column_row)
2552+
2553+
def get_all_keyspaces(self):
    """
    Yield every keyspace produced by the parent parser, followed by one
    metadata object per virtual keyspace row, flagged ``virtual = True``
    and populated with its virtual tables.
    """
    for regular_keyspace in super(SchemaParserV4, self).get_all_keyspaces():
        yield regular_keyspace

    for ks_row in self.virtual_keyspaces_result:
        keyspace = ks_row['keyspace_name']
        virtual_ks_meta = self._build_keyspace_metadata(ks_row)
        virtual_ks_meta.virtual = True

        # .get() keeps keyspaces without tables out of the defaultdict
        table_rows = self.virtual_tables_rows.get(keyspace, [])
        for tab_row in table_rows:
            tab_name = tab_row[self._table_name_col]
            table_meta = self._build_table_metadata(
                tab_row,
                col_rows=self.virtual_columns_rows[keyspace][tab_name],
                virtual=True)
            virtual_ks_meta._add_table_metadata(table_meta)

        yield virtual_ks_meta
2572+
2573+
@staticmethod
def _build_keyspace_metadata_internal(row):
    """
    Build keyspace metadata from a row that may lack the ``durable_writes``
    and ``replication`` fields, then delegate to the parent implementation.

    Missing fields are filled with placeholders (``None`` for
    ``durable_writes`` and for the replication ``class``) on a copy, so the
    caller's ``row`` and its nested ``replication`` mapping are never
    modified.

    :param row: mapping of column name to value for one keyspace row
    :return: the result of the parent parser's
        ``_build_keyspace_metadata_internal`` for the normalized row
    """
    # necessary fields that aren't in virtual ks
    ks_row = dict(row)
    ks_row.setdefault("durable_writes", None)

    # Copy into a plain dict: the source mapping is left untouched and does
    # not need to support item assignment. A missing or None value is
    # treated as "no replication settings".
    replication = dict(ks_row.get("replication") or {})
    replication.setdefault("class", None)
    ks_row["replication"] = replication

    return super(SchemaParserV4, SchemaParserV4)._build_keyspace_metadata_internal(ks_row)
2580+
24352581

24362582
class TableMetadataV3(TableMetadata):
24372583
compaction_options = {}

tests/integration/standard/test_metadata.py

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
except ImportError:
1818
import unittest # noqa
1919

20+
from collections import defaultdict
2021
import difflib
22+
import logging
2123
import six
2224
import sys
2325
import time
@@ -36,11 +38,14 @@
3638
BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase,
3739
BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION,
3840
get_supported_protocol_versions, greaterthanorequalcass30, lessthancass30, local,
39-
greaterthancass20)
41+
greaterthancass20, greaterthanorequalcass40)
4042

4143
from tests.integration import greaterthancass21
4244

4345

46+
log = logging.getLogger(__name__)
47+
48+
4449
def setup_module():
4550
use_singledc()
4651

@@ -2334,7 +2339,7 @@ def test_group_keys_by_host(self):
23342339

23352340
stmt = """SELECT * FROM {}.{}
23362341
WHERE k = ? """.format(self.ks_name, self.ks_name)
2337-
keys = ((1, ), (2, ), (2, ), (3, ))
2342+
keys = ((1,), (2,), (2,), (3,))
23382343
self._assert_group_keys_by_host(keys, self.ks_name, stmt)
23392344

23402345
def _assert_group_keys_by_host(self, keys, table_name, stmt):
@@ -2347,3 +2352,72 @@ def _assert_group_keys_by_host(self, keys, table_name, stmt):
23472352
hosts = self.cluster.metadata.get_replicas(self.ks_name, routing_key)
23482353
self.assertEqual(1, len(hosts)) # RF is 1 for this keyspace
23492354
self.assertIn(key, keys_per_host[hosts[0]])
2355+
2356+
2357+
class VirtualKeypaceTest(BasicSharedKeyspaceUnitTestCase):
    """
    Integration checks for the ``virtual`` flag on keyspace metadata and
    for the table/column structure parsed for the virtual keyspaces.
    """

    virtual_ks_names = ('system_virtual_schema', 'system_views')

    # keyspace name -> table name -> set of column names; sets are used so
    # the comparison does not depend on column order
    virtual_ks_structure = {
        'system_views': {
            'caches': {
                'capacity_bytes', 'entry_count', 'hit_count', 'hit_ratio',
                'name', 'recent_hit_rate_per_second',
                'recent_request_rate_per_second', 'request_count',
                'size_bytes',
            },
            'clients': {
                'address', 'connection_stage', 'driver_name',
                'driver_version', 'hostname', 'port', 'protocol_version',
                'request_count', 'ssl_cipher_suite', 'ssl_enabled',
                'ssl_protocol', 'username',
            },
            'sstable_tasks': {
                'keyspace_name', 'kind', 'progress', 'table_name',
                'task_id', 'total', 'unit',
            },
            'thread_pools': {
                'active_tasks', 'active_tasks_limit', 'blocked_tasks',
                'blocked_tasks_all_time', 'completed_tasks', 'name',
                'pending_tasks',
            },
        },
        'system_virtual_schema': {
            'columns': {
                'clustering_order', 'column_name', 'column_name_bytes',
                'keyspace_name', 'kind', 'position', 'table_name', 'type',
            },
            'keyspaces': {'keyspace_name'},
            'tables': {'comment', 'keyspace_name', 'table_name'},
        },
    }

    def test_existing_keyspaces_have_correct_virtual_tags(self):
        """Every known keyspace is flagged virtual only if its name is listed."""
        for ks_name, ks_meta in self.cluster.metadata.keyspaces.items():
            if ks_name in self.virtual_ks_names:
                check = self.assertTrue
            else:
                check = self.assertFalse
            check(ks_meta.virtual,
                  'incorrect .virtual value for {}'.format(ks_name))

    @greaterthanorequalcass40
    def test_expected_keyspaces_exist_and_are_virtual(self):
        """Each listed virtual keyspace is present and flagged virtual."""
        for ks_name in self.virtual_ks_names:
            ks_meta = self.cluster.metadata.keyspaces[ks_name]
            self.assertTrue(
                ks_meta.virtual,
                'incorrect .virtual value for {}'.format(ks_name)
            )

    @greaterthanorequalcass40
    def test_virtual_keyspaces_have_expected_schema_structure(self):
        """The parsed virtual tables and columns match the expected structure."""
        self.maxDiff = None

        parsed_structure = defaultdict(dict)
        virtual_keyspaces = (
            (ks_name, ks_meta)
            for ks_name, ks_meta in self.cluster.metadata.keyspaces.items()
            if ks_meta.virtual
        )
        for ks_name, ks_meta in virtual_keyspaces:
            for table_name, table_meta in ks_meta.tables.items():
                column_names = set(table_meta.columns.keys())
                parsed_structure[ks_name][table_name] = column_names

        self.assertDictEqual(parsed_structure, self.virtual_ks_structure)

0 commit comments

Comments
 (0)