Skip to content

Commit b1b3f86

Browse files
authored
Merge pull request datastax#978 from datastax/python-992_virtual-keyspaces
Python 992 virtual keyspaces
2 parents 8d9fb39 + a6d0626 commit b1b3f86

File tree

3 files changed

+259
-21
lines changed

3 files changed

+259
-21
lines changed

CHANGELOG.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
3.15.0
22
======
3+
4+
Features
5+
--------
6+
* Parse Virtual Keyspace Metadata (PYTHON-992)
7+
38
Bug Fixes
49
---------
510
* Tokenmap.get_replicas returns the wrong value if token coincides with the end of the range (PYTHON-978)

cassandra/metadata.py

Lines changed: 167 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,14 @@ class KeyspaceMetadata(object):
639639
A dict mapping view names to :class:`.MaterializedViewMetadata` instances.
640640
"""
641641

642+
virtual = False
643+
"""
644+
A boolean indicating if this is a virtual keyspace or not. Always ``False``
645+
for clusters running pre-4.0 versions of Cassandra.
646+
647+
.. versionadded:: 3.15
648+
"""
649+
642650
_exc_info = None
643651
""" set if metadata parsing failed """
644652

@@ -671,13 +679,20 @@ def export_as_string(self):
671679
ret += line
672680
ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % cql
673681
return ret
682+
if self.virtual:
683+
return ("/*\nWarning: Keyspace {ks} is a virtual keyspace and cannot be recreated with CQL.\n"
684+
"Structure, for reference:*/\n"
685+
"{cql}\n"
686+
"").format(ks=self.name, cql=cql)
674687
return cql
675688

676689
def as_cql_query(self):
677690
"""
678691
Returns a CQL query string that can be used to recreate just this keyspace,
679692
not including user-defined types and tables.
680693
"""
694+
if self.virtual:
695+
return "// VIRTUAL KEYSPACE {}".format(protect_name(self.name))
681696
ret = "CREATE KEYSPACE %s WITH replication = %s " % (
682697
protect_name(self.name),
683698
self.replication_strategy.export_for_schema())
@@ -1065,11 +1080,21 @@ def primary_key(self):
10651080
_exc_info = None
10661081
""" set if metadata parsing failed """
10671082

1083+
virtual = False
1084+
"""
1085+
A boolean indicating if this is a virtual table or not. Always ``False``
1086+
for clusters running pre-4.0 versions of Cassandra.
1087+
1088+
.. versionadded:: 3.15
1089+
"""
1090+
10681091
@property
10691092
def is_cql_compatible(self):
10701093
"""
10711094
A boolean indicating if this table can be represented as CQL in export
10721095
"""
1096+
if self.virtual:
1097+
return False
10731098
comparator = getattr(self, 'comparator', None)
10741099
if comparator:
10751100
# no compact storage with more than one column beyond PK if there
@@ -1086,7 +1111,7 @@ def is_cql_compatible(self):
10861111
Metadata describing configuration for table extensions
10871112
"""
10881113

1089-
def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None):
1114+
def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None, virtual=False):
10901115
self.keyspace_name = keyspace_name
10911116
self.name = name
10921117
self.partition_key = [] if partition_key is None else partition_key
@@ -1097,6 +1122,7 @@ def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None,
10971122
self.comparator = None
10981123
self.triggers = OrderedDict() if triggers is None else triggers
10991124
self.views = {}
1125+
self.virtual = virtual
11001126

11011127
def export_as_string(self):
11021128
"""
@@ -1116,6 +1142,11 @@ def export_as_string(self):
11161142
ret = "/*\nWarning: Table %s.%s omitted because it has constructs not compatible with CQL (was created via legacy API).\n" % \
11171143
(self.keyspace_name, self.name)
11181144
ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql()
1145+
elif self.virtual:
1146+
ret = ('/*\nWarning: Table {ks}.{tab} is a virtual table and cannot be recreated with CQL.\n'
1147+
'Structure, for reference:\n'
1148+
'{cql}\n*/').format(ks=self.keyspace_name, tab=self.name, cql=self._all_as_cql())
1149+
11191150
else:
11201151
ret = self._all_as_cql()
11211152

@@ -1150,7 +1181,8 @@ def as_cql_query(self, formatted=False):
11501181
creations are not included). If `formatted` is set to :const:`True`,
11511182
extra whitespace will be added to make the query human readable.
11521183
"""
1153-
ret = "CREATE TABLE %s.%s (%s" % (
1184+
ret = "%s TABLE %s.%s (%s" % (
1185+
('VIRTUAL' if self.virtual else 'CREATE'),
11541186
protect_name(self.keyspace_name),
11551187
protect_name(self.name),
11561188
"\n" if formatted else "")
@@ -2064,12 +2096,16 @@ def _query_all(self):
20642096
QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl)
20652097
]
20662098

2067-
responses = self.connection.wait_for_responses(*queries, timeout=self.timeout, fail_on_error=False)
2068-
((ks_success, ks_result), (table_success, table_result),
2069-
(col_success, col_result), (types_success, types_result),
2099+
((ks_success, ks_result),
2100+
(table_success, table_result),
2101+
(col_success, col_result),
2102+
(types_success, types_result),
20702103
(functions_success, functions_result),
20712104
(aggregates_success, aggregates_result),
2072-
(triggers_success, triggers_result)) = responses
2105+
(triggers_success, triggers_result)) = (
2106+
self.connection.wait_for_responses(*queries, timeout=self.timeout,
2107+
fail_on_error=False)
2108+
)
20732109

20742110
self.keyspaces_result = self._handle_results(ks_success, ks_result)
20752111
self.tables_result = self._handle_results(table_success, table_result)
@@ -2241,28 +2277,32 @@ def _build_aggregate(aggregate_row):
22412277
aggregate_row['argument_types'], aggregate_row['state_func'], aggregate_row['state_type'],
22422278
aggregate_row['final_func'], aggregate_row['initcond'], aggregate_row['return_type'])
22432279

2244-
def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_rows=None):
2280+
def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_rows=None, virtual=False):
22452281
keyspace_name = row["keyspace_name"]
22462282
table_name = row[self._table_name_col]
22472283

22482284
col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][table_name]
22492285
trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][table_name]
22502286
index_rows = index_rows or self.keyspace_table_index_rows[keyspace_name][table_name]
22512287

2252-
table_meta = TableMetadataV3(keyspace_name, table_name)
2288+
table_meta = TableMetadataV3(keyspace_name, table_name, virtual=virtual)
22532289
try:
22542290
table_meta.options = self._build_table_options(row)
22552291
flags = row.get('flags', set())
22562292
if flags:
22572293
compact_static = False
22582294
table_meta.is_compact_storage = 'dense' in flags or 'super' in flags or 'compound' not in flags
22592295
is_dense = 'dense' in flags
2296+
elif virtual:
2297+
compact_static = False
2298+
table_meta.is_compact_storage = False
2299+
is_dense = False
22602300
else:
22612301
compact_static = True
22622302
table_meta.is_compact_storage = True
22632303
is_dense = False
22642304

2265-
self._build_table_columns(table_meta, col_rows, compact_static, is_dense)
2305+
self._build_table_columns(table_meta, col_rows, compact_static, is_dense, virtual)
22662306

22672307
for trigger_row in trigger_rows:
22682308
trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
@@ -2284,7 +2324,7 @@ def _build_table_options(self, row):
22842324
""" Setup the mostly-non-schema table options, like caching settings """
22852325
return dict((o, row.get(o)) for o in self.recognized_table_options if o in row)
22862326

2287-
def _build_table_columns(self, meta, col_rows, compact_static=False, is_dense=False):
2327+
def _build_table_columns(self, meta, col_rows, compact_static=False, is_dense=False, virtual=False):
22882328
# partition key
22892329
partition_rows = [r for r in col_rows
22902330
if r.get('kind', None) == "partition_key"]
@@ -2375,14 +2415,17 @@ def _query_all(self):
23752415
QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl)
23762416
]
23772417

2378-
responses = self.connection.wait_for_responses(*queries, timeout=self.timeout, fail_on_error=False)
2379-
((ks_success, ks_result), (table_success, table_result),
2380-
(col_success, col_result), (types_success, types_result),
2418+
((ks_success, ks_result),
2419+
(table_success, table_result),
2420+
(col_success, col_result),
2421+
(types_success, types_result),
23812422
(functions_success, functions_result),
23822423
(aggregates_success, aggregates_result),
23832424
(triggers_success, triggers_result),
23842425
(indexes_success, indexes_result),
2385-
(views_success, views_result)) = responses
2426+
(views_success, views_result)) = self.connection.wait_for_responses(
2427+
*queries, timeout=self.timeout, fail_on_error=False
2428+
)
23862429

23872430
self.keyspaces_result = self._handle_results(ks_success, ks_result)
23882431
self.tables_result = self._handle_results(table_success, table_result)
@@ -2425,6 +2468,116 @@ class SchemaParserV4(SchemaParserV3):
24252468
)
24262469
)
24272470

2471+
_SELECT_VIRTUAL_KEYSPACES = 'SELECT * from system_virtual_schema.keyspaces'
2472+
_SELECT_VIRTUAL_TABLES = 'SELECT * from system_virtual_schema.tables'
2473+
_SELECT_VIRTUAL_COLUMNS = 'SELECT * from system_virtual_schema.columns'
2474+
2475+
def __init__(self, connection, timeout):
2476+
super(SchemaParserV4, self).__init__(connection, timeout)
2477+
self.virtual_keyspaces_rows = defaultdict(list)
2478+
self.virtual_tables_rows = defaultdict(list)
2479+
self.virtual_columns_rows = defaultdict(lambda: defaultdict(list))
2480+
2481+
def _query_all(self):
2482+
cl = ConsistencyLevel.ONE
2483+
# todo: this duplicates V3; we should find a way for _query_all methods
2484+
# to extend each other.
2485+
queries = [
2486+
# copied from V3
2487+
QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl),
2488+
QueryMessage(query=self._SELECT_TABLES, consistency_level=cl),
2489+
QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
2490+
QueryMessage(query=self._SELECT_TYPES, consistency_level=cl),
2491+
QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl),
2492+
QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl),
2493+
QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl),
2494+
QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl),
2495+
QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl),
2496+
# V4-only queries
2497+
QueryMessage(query=self._SELECT_VIRTUAL_KEYSPACES, consistency_level=cl),
2498+
QueryMessage(query=self._SELECT_VIRTUAL_TABLES, consistency_level=cl),
2499+
QueryMessage(query=self._SELECT_VIRTUAL_COLUMNS, consistency_level=cl)
2500+
]
2501+
2502+
responses = self.connection.wait_for_responses(
2503+
*queries, timeout=self.timeout, fail_on_error=False)
2504+
(
2505+
# copied from V3
2506+
(ks_success, ks_result),
2507+
(table_success, table_result),
2508+
(col_success, col_result),
2509+
(types_success, types_result),
2510+
(functions_success, functions_result),
2511+
(aggregates_success, aggregates_result),
2512+
(triggers_success, triggers_result),
2513+
(indexes_success, indexes_result),
2514+
(views_success, views_result),
2515+
# V4-only responses
2516+
(virtual_ks_success, virtual_ks_result),
2517+
(virtual_table_success, virtual_table_result),
2518+
(virtual_column_success, virtual_column_result)
2519+
) = responses
2520+
2521+
# copied from V3
2522+
self.keyspaces_result = self._handle_results(ks_success, ks_result)
2523+
self.tables_result = self._handle_results(table_success, table_result)
2524+
self.columns_result = self._handle_results(col_success, col_result)
2525+
self.triggers_result = self._handle_results(triggers_success, triggers_result)
2526+
self.types_result = self._handle_results(types_success, types_result)
2527+
self.functions_result = self._handle_results(functions_success, functions_result)
2528+
self.aggregates_result = self._handle_results(aggregates_success, aggregates_result)
2529+
self.indexes_result = self._handle_results(indexes_success, indexes_result)
2530+
self.views_result = self._handle_results(views_success, views_result)
2531+
# V4-only results
2532+
self.virtual_keyspaces_result = self._handle_results(virtual_ks_success,
2533+
virtual_ks_result)
2534+
self.virtual_tables_result = self._handle_results(virtual_table_success,
2535+
virtual_table_result)
2536+
self.virtual_columns_result = self._handle_results(virtual_column_success,
2537+
virtual_column_result)
2538+
self._aggregate_results()
2539+
2540+
def _aggregate_results(self):
2541+
super(SchemaParserV4, self)._aggregate_results()
2542+
2543+
m = self.virtual_tables_rows
2544+
for row in self.virtual_tables_result:
2545+
m[row["keyspace_name"]].append(row)
2546+
2547+
m = self.virtual_columns_rows
2548+
for row in self.virtual_columns_result:
2549+
ks_name = row['keyspace_name']
2550+
tab_name = row[self._table_name_col]
2551+
m[ks_name][tab_name].append(row)
2552+
2553+
def get_all_keyspaces(self):
2554+
for x in super(SchemaParserV4, self).get_all_keyspaces():
2555+
yield x
2556+
2557+
for row in self.virtual_keyspaces_result:
2558+
ks_name = row['keyspace_name']
2559+
keyspace_meta = self._build_keyspace_metadata(row)
2560+
keyspace_meta.virtual = True
2561+
2562+
for table_row in self.virtual_tables_rows.get(ks_name, []):
2563+
table_name = table_row[self._table_name_col]
2564+
2565+
col_rows = self.virtual_columns_rows[ks_name][table_name]
2566+
keyspace_meta._add_table_metadata(
2567+
self._build_table_metadata(table_row,
2568+
col_rows=col_rows,
2569+
virtual=True)
2570+
)
2571+
yield keyspace_meta
2572+
2573+
@staticmethod
2574+
def _build_keyspace_metadata_internal(row):
2575+
# necessary fields that aren't int virtual ks
2576+
row["durable_writes"] = row.get("durable_writes", None)
2577+
row["replication"] = row.get("replication", {})
2578+
row["replication"]["class"] = row["replication"].get("class", None)
2579+
return super(SchemaParserV4, SchemaParserV4)._build_keyspace_metadata_internal(row)
2580+
24282581

24292582
class TableMetadataV3(TableMetadata):
24302583
compaction_options = {}

0 commit comments

Comments
 (0)