@@ -3441,6 +3441,57 @@ def _clear_watcher(conn, expiring_weakref):
34413441 pass
34423442
34433443
3444+ def _fetch_remaining_pages (connection , query_msg , timeout , fail_on_error = True ):
3445+ """
3446+ Fetch all pages for a paged query.
3447+ Executes the query and fetches all pages if the result is paged.
3448+
3449+ :param connection: The connection to use for querying
3450+ :param query_msg: The QueryMessage to execute (must have fetch_size set for paging)
3451+ :param timeout: Timeout for each query operation
3452+ :param fail_on_error: If True, raise exceptions on query failure. If False, return (success, result) tuple. Defaults to True (same as connection.wait_for_response)
3453+ :return: If fail_on_error=True, returns the result with all parsed_rows combined from all pages.
3454+ If fail_on_error=False, returns (success, result) tuple where result has all parsed_rows combined.
3455+ """
3456+ # Execute the query to get the first page
3457+ response = connection .wait_for_response (query_msg , timeout = timeout , fail_on_error = fail_on_error )
3458+
3459+ # Handle fail_on_error=False case where response is (success, result) tuple
3460+ if not fail_on_error :
3461+ success , result = response
3462+ if not success :
3463+ return response # Return (False, exception) tuple
3464+ else :
3465+ result = response
3466+
3467+ if not result or not result .paging_state :
3468+ return response if not fail_on_error else result
3469+
3470+ all_rows = list (result .parsed_rows ) if result .parsed_rows else []
3471+
3472+ # Fetch remaining pages
3473+ while result and result .paging_state :
3474+ query_msg .paging_state = result .paging_state
3475+ page_response = connection .wait_for_response (query_msg , timeout = timeout , fail_on_error = fail_on_error )
3476+
3477+ if not fail_on_error :
3478+ page_success , page_result = page_response
3479+ if not page_success :
3480+ return page_response # Return (False, exception) tuple
3481+ result = page_result
3482+ else :
3483+ result = page_response
3484+
3485+ if result and result .parsed_rows :
3486+ all_rows .extend (result .parsed_rows )
3487+
3488+ # Update the result with all rows
3489+ if result :
3490+ result .parsed_rows = all_rows
3491+
3492+ return (True , result ) if not fail_on_error else result
3493+
3494+
34443495class ControlConnection (object ):
34453496 """
34463497 Internal
@@ -3634,23 +3685,27 @@ def _try_connect(self, host):
36343685 sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
36353686 sel_local = self ._SELECT_LOCAL if self ._token_meta_enabled else self ._SELECT_LOCAL_NO_TOKENS
36363687 peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3637- consistency_level = ConsistencyLevel .ONE )
3688+ consistency_level = ConsistencyLevel .ONE ,
3689+ fetch_size = self ._schema_meta_page_size )
36383690 local_query = QueryMessage (query = maybe_add_timeout_to_query (sel_local , self ._metadata_request_timeout ),
3639- consistency_level = ConsistencyLevel .ONE )
3640- (peers_success , peers_result ), (local_success , local_result ) = connection .wait_for_responses (
3641- peers_query , local_query , timeout = self ._timeout , fail_on_error = False )
3691+ consistency_level = ConsistencyLevel .ONE ,
3692+ fetch_size = self ._schema_meta_page_size )
3693+
3694+ local_success , local_result = _fetch_remaining_pages (connection , local_query , self ._timeout , fail_on_error = False )
36423695
36433696 if not local_success :
36443697 raise local_result
36453698
3699+ peers_success , peers_result = _fetch_remaining_pages (connection , peers_query , self ._timeout , fail_on_error = False )
3700+
36463701 if not peers_success :
36473702 # error with the peers v2 query, fallback to peers v1
36483703 self ._uses_peers_v2 = False
36493704 sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
36503705 peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3651- consistency_level = ConsistencyLevel .ONE )
3652- peers_result = connection . wait_for_response (
3653- peers_query , timeout = self ._timeout )
3706+ consistency_level = ConsistencyLevel .ONE ,
3707+ fetch_size = self . _schema_meta_page_size )
3708+ peers_result = _fetch_remaining_pages ( connection , peers_query , self ._timeout )
36543709
36553710 shared_results = (peers_result , local_result )
36563711 self ._refresh_node_list_and_token_map (connection , preloaded_results = shared_results )
@@ -3793,11 +3848,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37933848 log .debug ("[control connection] Refreshing node list and token map" )
37943849 sel_local = self ._SELECT_LOCAL
37953850 peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3796- consistency_level = cl )
3851+ consistency_level = cl ,
3852+ fetch_size = self ._schema_meta_page_size )
37973853 local_query = QueryMessage (query = maybe_add_timeout_to_query (sel_local , self ._metadata_request_timeout ),
3798- consistency_level = cl )
3799- peers_result , local_result = connection .wait_for_responses (
3800- peers_query , local_query , timeout = self ._timeout )
3854+ consistency_level = cl ,
3855+ fetch_size = self ._schema_meta_page_size )
3856+
3857+ # Fetch all pages for both queries
3858+ # Note: system.local always has exactly 1 row, so it will never have additional pages
3859+ # system.peers might have multiple pages for very large clusters (>1000 nodes)
3860+ peers_result = _fetch_remaining_pages (connection , peers_query , self ._timeout )
3861+ local_result = _fetch_remaining_pages (connection , local_query , self ._timeout )
38013862
38023863 peers_result = dict_factory (peers_result .column_names , peers_result .parsed_rows )
38033864
@@ -3852,9 +3913,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38523913 # in system.local. See CASSANDRA-9436.
38533914 local_rpc_address_query = QueryMessage (
38543915 query = maybe_add_timeout_to_query (self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS , self ._metadata_request_timeout ),
3855- consistency_level = ConsistencyLevel .ONE )
3856- success , local_rpc_address_result = connection .wait_for_response (
3857- local_rpc_address_query , timeout = self ._timeout , fail_on_error = False )
3916+ consistency_level = ConsistencyLevel .ONE ,
3917+ fetch_size = self ._schema_meta_page_size )
3918+ # Fetch all pages (system.local table always contains exactly one row, so this is effectively a no-op)
3919+ success , local_rpc_address_result = _fetch_remaining_pages (
3920+ connection , local_rpc_address_query , self ._timeout , fail_on_error = False )
38583921 if success :
38593922 row = dict_factory (
38603923 local_rpc_address_result .column_names ,
@@ -4088,13 +4151,19 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
40884151
40894152 while elapsed < total_timeout :
40904153 peers_query = QueryMessage (query = maybe_add_timeout_to_query (select_peers_query , self ._metadata_request_timeout ),
4091- consistency_level = cl )
4154+ consistency_level = cl ,
4155+ fetch_size = self ._schema_meta_page_size )
40924156 local_query = QueryMessage (query = maybe_add_timeout_to_query (self ._SELECT_SCHEMA_LOCAL , self ._metadata_request_timeout ),
4093- consistency_level = cl )
4157+ consistency_level = cl ,
4158+ fetch_size = self ._schema_meta_page_size )
40944159 try :
40954160 timeout = min (self ._timeout , total_timeout - elapsed )
4096- peers_result , local_result = connection .wait_for_responses (
4097- peers_query , local_query , timeout = timeout )
4161+
4162+ # Fetch all pages if there are more results
4163+ # Note: system.local always has exactly 1 row, so it will never have additional pages
4164+ # system.peers might have multiple pages for very large clusters (>1000 nodes)
4165+ peers_result = _fetch_remaining_pages (connection , peers_query , timeout )
4166+ local_result = _fetch_remaining_pages (connection , local_query , timeout )
40984167 except OperationTimedOut as timeout :
40994168 log .debug ("[control connection] Timed out waiting for "
41004169 "response during schema agreement check: %s" , timeout )
0 commit comments