Skip to content

Commit 6ad836c

Browse files
committed
Metadata/Schema paginated queries
New Cluster property `schema_metadata_page_size` that controls the page size of metadata queries, defaults to 1000. Works only on CQL protocol v3/v4 Fixes: #139
1 parent acc7263 commit 6ad836c

File tree

3 files changed

+108
-61
lines changed

3 files changed

+108
-61
lines changed

cassandra/cluster.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,17 @@ def schema_metadata_enabled(self):
10381038
def schema_metadata_enabled(self, enabled):
10391039
self.control_connection._schema_meta_enabled = bool(enabled)
10401040

1041+
@property
1042+
def schema_metadata_page_size(self):
1043+
"""
1044+
Number controling page size when schema metadata is fetched.
1045+
"""
1046+
return self.control_connection._schema_meta_page_size
1047+
1048+
@schema_metadata_page_size.setter
1049+
def schema_metadata_page_size(self, size):
1050+
self.control_connection._schema_meta_page_size = size
1051+
10411052
@property
10421053
def token_metadata_enabled(self):
10431054
"""
@@ -1108,6 +1119,7 @@ def __init__(self,
11081119
connect_timeout=5,
11091120
schema_metadata_enabled=True,
11101121
token_metadata_enabled=True,
1122+
schema_metadata_page_size=1000,
11111123
address_translator=None,
11121124
status_event_refresh_window=2,
11131125
prepare_on_all_hosts=True,
@@ -1373,7 +1385,8 @@ def __init__(self,
13731385
self, self.control_connection_timeout,
13741386
self.schema_event_refresh_window, self.topology_event_refresh_window,
13751387
self.status_event_refresh_window,
1376-
schema_metadata_enabled, token_metadata_enabled)
1388+
schema_metadata_enabled, token_metadata_enabled,
1389+
schema_meta_page_size=schema_metadata_page_size)
13771390

13781391
if client_id is None:
13791392
self.client_id = uuid.uuid4()
@@ -3485,6 +3498,7 @@ class PeersQueryType(object):
34853498

34863499
_schema_meta_enabled = True
34873500
_token_meta_enabled = True
3501+
_schema_meta_page_size = 1000
34883502

34893503
_uses_peers_v2 = True
34903504

@@ -3496,7 +3510,8 @@ def __init__(self, cluster, timeout,
34963510
topology_event_refresh_window,
34973511
status_event_refresh_window,
34983512
schema_meta_enabled=True,
3499-
token_meta_enabled=True):
3513+
token_meta_enabled=True,
3514+
schema_meta_page_size=1000):
35003515
# use a weak reference to allow the Cluster instance to be GC'ed (and
35013516
# shutdown) since implementing __del__ disables the cycle detector
35023517
self._cluster = weakref.proxy(cluster)
@@ -3508,6 +3523,7 @@ def __init__(self, cluster, timeout,
35083523
self._status_event_refresh_window = status_event_refresh_window
35093524
self._schema_meta_enabled = schema_meta_enabled
35103525
self._token_meta_enabled = token_meta_enabled
3526+
self._schema_meta_page_size = schema_meta_page_size
35113527

35123528
self._lock = RLock()
35133529
self._schema_agreement_lock = Lock()
@@ -3732,7 +3748,7 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
37323748
log.debug("Skipping schema refresh due to lack of schema agreement")
37333749
return False
37343750

3735-
self._cluster.metadata.refresh(connection, self._timeout, **kwargs)
3751+
self._cluster.metadata.refresh(connection, self._timeout, fetch_size=self._schema_meta_page_size, **kwargs)
37363752

37373753
return True
37383754

cassandra/metadata.py

Lines changed: 79 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from threading import RLock
2727
import struct
2828
import random
29+
import itertools
2930

3031
murmur3 = None
3132
try:
@@ -132,11 +133,11 @@ def export_schema_as_string(self):
132133
"""
133134
return "\n\n".join(ks.export_as_string() for ks in self.keyspaces.values())
134135

135-
def refresh(self, connection, timeout, target_type=None, change_type=None, **kwargs):
136+
def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None, **kwargs):
136137

137138
server_version = self.get_host(connection.original_endpoint).release_version
138139
dse_version = self.get_host(connection.original_endpoint).dse_version
139-
parser = get_schema_parser(connection, server_version, dse_version, timeout)
140+
parser = get_schema_parser(connection, server_version, dse_version, timeout, fetch_size)
140141

141142
if not target_type:
142143
self._rebuild_all(parser)
@@ -1924,7 +1925,7 @@ def __init__(self, connection, timeout):
19241925
self.connection = connection
19251926
self.timeout = timeout
19261927

1927-
def _handle_results(self, success, result, expected_failures=tuple()):
1928+
def _handle_results(self, success, result, expected_failures=tuple(), query_msg=None, timeout=None):
19281929
"""
19291930
Given a bool and a ResultSet (the form returned per result from
19301931
Connection.wait_for_responses), return a dictionary containing the
@@ -1945,9 +1946,26 @@ def _handle_results(self, success, result, expected_failures=tuple()):
19451946
query failed, but raised an instance of an expected failure class, this
19461947
will ignore the failure and return an empty list.
19471948
"""
1949+
timeout = timeout or self.timeout
19481950
if not success and isinstance(result, expected_failures):
19491951
return []
19501952
elif success:
1953+
if result.paging_state and query_msg:
1954+
def get_next_pages():
1955+
next_result = None
1956+
while True:
1957+
query_msg.paging_state = next_result.paging_state if next_result else result.paging_state
1958+
next_success, next_result = self.connection.wait_for_response(query_msg, timeout=timeout,
1959+
fail_on_error=False)
1960+
if not next_success and isinstance(next_result, expected_failures):
1961+
continue
1962+
elif not next_success:
1963+
raise next_result
1964+
if not next_result.paging_state:
1965+
break
1966+
yield next_result.parsed_rows
1967+
1968+
result.parsed_rows += itertools.chain(*get_next_pages())
19511969
return dict_factory(result.column_names, result.parsed_rows) if result else []
19521970
else:
19531971
raise result
@@ -2532,8 +2550,9 @@ class SchemaParserV3(SchemaParserV22):
25322550
'read_repair_chance',
25332551
'speculative_retry')
25342552

2535-
def __init__(self, connection, timeout):
2553+
def __init__(self, connection, timeout, fetch_size):
25362554
super(SchemaParserV3, self).__init__(connection, timeout)
2555+
self.fetch_size = fetch_size
25372556
self.indexes_result = []
25382557
self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list))
25392558
self.keyspace_view_rows = defaultdict(list)
@@ -2726,17 +2745,18 @@ def _build_trigger_metadata(table_metadata, row):
27262745

27272746
def _query_all(self):
27282747
cl = ConsistencyLevel.ONE
2748+
fetch_size = self.fetch_size
27292749
queries = [
2730-
QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl),
2731-
QueryMessage(query=self._SELECT_TABLES, consistency_level=cl),
2732-
QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
2733-
QueryMessage(query=self._SELECT_TYPES, consistency_level=cl),
2734-
QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl),
2735-
QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl),
2736-
QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl),
2737-
QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl),
2738-
QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl),
2739-
QueryMessage(query=self._SELECT_SCYLLA, consistency_level=cl)
2750+
QueryMessage(query=self._SELECT_KEYSPACES, fetch_size=fetch_size, consistency_level=cl),
2751+
QueryMessage(query=self._SELECT_TABLES, fetch_size=fetch_size, consistency_level=cl),
2752+
QueryMessage(query=self._SELECT_COLUMNS, fetch_size=fetch_size, consistency_level=cl),
2753+
QueryMessage(query=self._SELECT_TYPES, fetch_size=fetch_size, consistency_level=cl),
2754+
QueryMessage(query=self._SELECT_FUNCTIONS, fetch_size=fetch_size, consistency_level=cl),
2755+
QueryMessage(query=self._SELECT_AGGREGATES, fetch_size=fetch_size, consistency_level=cl),
2756+
QueryMessage(query=self._SELECT_TRIGGERS, fetch_size=fetch_size, consistency_level=cl),
2757+
QueryMessage(query=self._SELECT_INDEXES, fetch_size=fetch_size, consistency_level=cl),
2758+
QueryMessage(query=self._SELECT_VIEWS, fetch_size=fetch_size, consistency_level=cl),
2759+
QueryMessage(query=self._SELECT_SCYLLA, fetch_size=fetch_size, consistency_level=cl)
27402760
]
27412761

27422762
((ks_success, ks_result),
@@ -2752,16 +2772,16 @@ def _query_all(self):
27522772
*queries, timeout=self.timeout, fail_on_error=False
27532773
)
27542774

2755-
self.keyspaces_result = self._handle_results(ks_success, ks_result)
2756-
self.tables_result = self._handle_results(table_success, table_result)
2757-
self.columns_result = self._handle_results(col_success, col_result)
2758-
self.triggers_result = self._handle_results(triggers_success, triggers_result)
2759-
self.types_result = self._handle_results(types_success, types_result)
2760-
self.functions_result = self._handle_results(functions_success, functions_result)
2761-
self.aggregates_result = self._handle_results(aggregates_success, aggregates_result)
2762-
self.indexes_result = self._handle_results(indexes_success, indexes_result)
2763-
self.views_result = self._handle_results(views_success, views_result)
2764-
self.scylla_result = self._handle_results(scylla_success, scylla_result, expected_failures=(InvalidRequest,))
2775+
self.keyspaces_result = self._handle_results(ks_success, ks_result, query_msg=queries[0])
2776+
self.tables_result = self._handle_results(table_success, table_result, query_msg=queries[1])
2777+
self.columns_result = self._handle_results(col_success, col_result, query_msg=queries[2])
2778+
self.triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=queries[6])
2779+
self.types_result = self._handle_results(types_success, types_result, query_msg=queries[3])
2780+
self.functions_result = self._handle_results(functions_success, functions_result, query_msg=queries[4])
2781+
self.aggregates_result = self._handle_results(aggregates_success, aggregates_result, query_msg=queries[5])
2782+
self.indexes_result = self._handle_results(indexes_success, indexes_result, query_msg=queries[7])
2783+
self.views_result = self._handle_results(views_success, views_result, query_msg=queries[8])
2784+
self.scylla_result = self._handle_results(scylla_success, scylla_result, expected_failures=(InvalidRequest,), query_msg=queries[9])
27652785

27662786
self._aggregate_results()
27672787

@@ -2814,8 +2834,8 @@ class SchemaParserV4(SchemaParserV3):
28142834
_SELECT_VIRTUAL_TABLES = 'SELECT * from system_virtual_schema.tables'
28152835
_SELECT_VIRTUAL_COLUMNS = 'SELECT * from system_virtual_schema.columns'
28162836

2817-
def __init__(self, connection, timeout):
2818-
super(SchemaParserV4, self).__init__(connection, timeout)
2837+
def __init__(self, connection, timeout, fetch_size):
2838+
super(SchemaParserV4, self).__init__(connection, timeout, fetch_size)
28192839
self.virtual_keyspaces_rows = defaultdict(list)
28202840
self.virtual_tables_rows = defaultdict(list)
28212841
self.virtual_columns_rows = defaultdict(lambda: defaultdict(list))
@@ -2824,21 +2844,22 @@ def _query_all(self):
28242844
cl = ConsistencyLevel.ONE
28252845
# todo: this duplicates V3; we should find a way for _query_all methods
28262846
# to extend each other.
2847+
fetch_size = self.fetch_size
28272848
queries = [
28282849
# copied from V3
2829-
QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl),
2830-
QueryMessage(query=self._SELECT_TABLES, consistency_level=cl),
2831-
QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
2832-
QueryMessage(query=self._SELECT_TYPES, consistency_level=cl),
2833-
QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl),
2834-
QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl),
2835-
QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl),
2836-
QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl),
2837-
QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl),
2850+
QueryMessage(query=self._SELECT_KEYSPACES, fetch_size=fetch_size, consistency_level=cl),
2851+
QueryMessage(query=self._SELECT_TABLES, fetch_size=fetch_size, consistency_level=cl),
2852+
QueryMessage(query=self._SELECT_COLUMNS, fetch_size=fetch_size, consistency_level=cl),
2853+
QueryMessage(query=self._SELECT_TYPES, fetch_size=fetch_size, consistency_level=cl),
2854+
QueryMessage(query=self._SELECT_FUNCTIONS, fetch_size=fetch_size, consistency_level=cl),
2855+
QueryMessage(query=self._SELECT_AGGREGATES, fetch_size=fetch_size, consistency_level=cl),
2856+
QueryMessage(query=self._SELECT_TRIGGERS, fetch_size=fetch_size, consistency_level=cl),
2857+
QueryMessage(query=self._SELECT_INDEXES, fetch_size=fetch_size, consistency_level=cl),
2858+
QueryMessage(query=self._SELECT_VIEWS, fetch_size=fetch_size, consistency_level=cl),
28382859
# V4-only queries
2839-
QueryMessage(query=self._SELECT_VIRTUAL_KEYSPACES, consistency_level=cl),
2840-
QueryMessage(query=self._SELECT_VIRTUAL_TABLES, consistency_level=cl),
2841-
QueryMessage(query=self._SELECT_VIRTUAL_COLUMNS, consistency_level=cl)
2860+
QueryMessage(query=self._SELECT_VIRTUAL_KEYSPACES, fetch_size=fetch_size, consistency_level=cl),
2861+
QueryMessage(query=self._SELECT_VIRTUAL_TABLES, fetch_size=fetch_size, consistency_level=cl),
2862+
QueryMessage(query=self._SELECT_VIRTUAL_COLUMNS, fetch_size=fetch_size, consistency_level=cl)
28422863
]
28432864

28442865
responses = self.connection.wait_for_responses(
@@ -2861,29 +2882,29 @@ def _query_all(self):
28612882
) = responses
28622883

28632884
# copied from V3
2864-
self.keyspaces_result = self._handle_results(ks_success, ks_result)
2865-
self.tables_result = self._handle_results(table_success, table_result)
2866-
self.columns_result = self._handle_results(col_success, col_result)
2867-
self.triggers_result = self._handle_results(triggers_success, triggers_result)
2868-
self.types_result = self._handle_results(types_success, types_result)
2869-
self.functions_result = self._handle_results(functions_success, functions_result)
2870-
self.aggregates_result = self._handle_results(aggregates_success, aggregates_result)
2871-
self.indexes_result = self._handle_results(indexes_success, indexes_result)
2872-
self.views_result = self._handle_results(views_success, views_result)
2885+
self.keyspaces_result = self._handle_results(ks_success, ks_result, query_msg=queries[0])
2886+
self.tables_result = self._handle_results(table_success, table_result, query_msg=queries[1])
2887+
self.columns_result = self._handle_results(col_success, col_result, query_msg=queries[2])
2888+
self.triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=queries[6])
2889+
self.types_result = self._handle_results(types_success, types_result, query_msg=queries[3])
2890+
self.functions_result = self._handle_results(functions_success, functions_result, query_msg=queries[4])
2891+
self.aggregates_result = self._handle_results(aggregates_success, aggregates_result, query_msg=queries[5])
2892+
self.indexes_result = self._handle_results(indexes_success, indexes_result, query_msg=queries[7])
2893+
self.views_result = self._handle_results(views_success, views_result, query_msg=queries[8])
28732894
# V4-only results
28742895
# These tables don't exist in some DSE versions reporting 4.X so we can
28752896
# ignore them if we got an error
28762897
self.virtual_keyspaces_result = self._handle_results(
28772898
virtual_ks_success, virtual_ks_result,
2878-
expected_failures=(InvalidRequest,)
2899+
expected_failures=(InvalidRequest,), query_msg=queries[9]
28792900
)
28802901
self.virtual_tables_result = self._handle_results(
28812902
virtual_table_success, virtual_table_result,
2882-
expected_failures=(InvalidRequest,)
2903+
expected_failures=(InvalidRequest,), query_msg=queries[10]
28832904
)
28842905
self.virtual_columns_result = self._handle_results(
28852906
virtual_column_success, virtual_column_result,
2886-
expected_failures=(InvalidRequest,)
2907+
expected_failures=(InvalidRequest,), query_msg=queries[11]
28872908
)
28882909

28892910
self._aggregate_results()
@@ -2948,8 +2969,8 @@ class SchemaParserDSE68(SchemaParserDSE67):
29482969

29492970
_table_metadata_class = TableMetadataDSE68
29502971

2951-
def __init__(self, connection, timeout):
2952-
super(SchemaParserDSE68, self).__init__(connection, timeout)
2972+
def __init__(self, connection, timeout, fetch_size):
2973+
super(SchemaParserDSE68, self).__init__(connection, timeout, fetch_size)
29532974
self.keyspace_table_vertex_rows = defaultdict(lambda: defaultdict(list))
29542975
self.keyspace_table_edge_rows = defaultdict(lambda: defaultdict(list))
29552976

@@ -3314,21 +3335,21 @@ def __init__(
33143335
self.to_clustering_columns = to_clustering_columns
33153336

33163337

3317-
def get_schema_parser(connection, server_version, dse_version, timeout):
3338+
def get_schema_parser(connection, server_version, dse_version, timeout, fetch_size=None):
33183339
version = Version(server_version)
33193340
if dse_version:
33203341
v = Version(dse_version)
33213342
if v >= Version('6.8.0'):
3322-
return SchemaParserDSE68(connection, timeout)
3343+
return SchemaParserDSE68(connection, timeout, fetch_size)
33233344
elif v >= Version('6.7.0'):
3324-
return SchemaParserDSE67(connection, timeout)
3345+
return SchemaParserDSE67(connection, timeout, fetch_size)
33253346
elif v >= Version('6.0.0'):
3326-
return SchemaParserDSE60(connection, timeout)
3347+
return SchemaParserDSE60(connection, timeout, fetch_size)
33273348

33283349
if version >= Version('4-a'):
3329-
return SchemaParserV4(connection, timeout)
3350+
return SchemaParserV4(connection, timeout, fetch_size)
33303351
elif version >= Version('3.0.0'):
3331-
return SchemaParserV3(connection, timeout)
3352+
return SchemaParserV3(connection, timeout, fetch_size)
33323353
else:
33333354
# we could further specialize by version. Right now just refactoring the
33343355
# multi-version parser we have as of C* 2.2.0rc1.

tests/integration/standard/test_metadata.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,16 @@ class Ext1(Ext0):
10471047
self.assertIn(Ext0.after_table_cql(view_meta, Ext0.name, ext_map[Ext0.name]), new_cql)
10481048
self.assertIn(Ext1.after_table_cql(view_meta, Ext1.name, ext_map[Ext1.name]), new_cql)
10491049

1050+
def test_metadata_pagination(self):
1051+
self.cluster.refresh_schema_metadata()
1052+
for i in range(10):
1053+
self.session.execute("CREATE TABLE %s.%s_%d (a int PRIMARY KEY, b map<text, text>)"
1054+
% (self.keyspace_name, self.function_table_name, i))
1055+
1056+
self.cluster.schema_metadata_page_size = 5
1057+
self.cluster.refresh_schema_metadata()
1058+
self.assertEqual(len(self.cluster.metadata.keyspaces[self.keyspace_name].tables), 10)
1059+
10501060

10511061
class TestCodeCoverage(unittest.TestCase):
10521062

0 commit comments

Comments
 (0)