Skip to content

Commit e91f79e

Browse files
Copilotdkropachev
authored andcommitted
Add paging for ControlConnection topology queries
1 parent d37e3e5 commit e91f79e

File tree

2 files changed

+152
-21
lines changed

2 files changed

+152
-21
lines changed

cassandra/cluster.py

Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3441,6 +3441,43 @@ def _clear_watcher(conn, expiring_weakref):
34413441
pass
34423442

34433443

3444+
def _fetch_all_pages(connection, query_msg, timeout, fail_on_error=True):
3445+
response = connection.wait_for_response(query_msg, timeout=timeout, fail_on_error=fail_on_error)
3446+
3447+
if not fail_on_error:
3448+
success, result = response
3449+
if not success:
3450+
return response
3451+
else:
3452+
result = response
3453+
3454+
if not result or not result.paging_state:
3455+
return response if not fail_on_error else result
3456+
3457+
all_rows = list(result.parsed_rows) if result.parsed_rows else []
3458+
3459+
# Fetch remaining pages
3460+
while result and result.paging_state:
3461+
query_msg.paging_state = result.paging_state
3462+
page_response = connection.wait_for_response(query_msg, timeout=timeout, fail_on_error=fail_on_error)
3463+
3464+
if not fail_on_error:
3465+
page_success, page_result = page_response
3466+
if not page_success:
3467+
return page_response
3468+
result = page_result
3469+
else:
3470+
result = page_response
3471+
3472+
if result and result.parsed_rows:
3473+
all_rows.extend(result.parsed_rows)
3474+
3475+
if result:
3476+
result.parsed_rows = all_rows
3477+
3478+
return (True, result) if not fail_on_error else result
3479+
3480+
34443481
class ControlConnection(object):
34453482
"""
34463483
Internal
@@ -3634,23 +3671,27 @@ def _try_connect(self, host):
36343671
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
36353672
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
36363673
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
3637-
consistency_level=ConsistencyLevel.ONE)
3674+
consistency_level=ConsistencyLevel.ONE,
3675+
fetch_size=self._schema_meta_page_size)
36383676
local_query = QueryMessage(query=maybe_add_timeout_to_query(sel_local, self._metadata_request_timeout),
3639-
consistency_level=ConsistencyLevel.ONE)
3640-
(peers_success, peers_result), (local_success, local_result) = connection.wait_for_responses(
3641-
peers_query, local_query, timeout=self._timeout, fail_on_error=False)
3677+
consistency_level=ConsistencyLevel.ONE,
3678+
fetch_size=self._schema_meta_page_size)
3679+
3680+
local_success, local_result = _fetch_all_pages(connection, local_query, self._timeout, fail_on_error=False)
36423681

36433682
if not local_success:
36443683
raise local_result
36453684

3685+
peers_success, peers_result = _fetch_all_pages(connection, peers_query, self._timeout, fail_on_error=False)
3686+
36463687
if not peers_success:
36473688
# error with the peers v2 query, fallback to peers v1
36483689
self._uses_peers_v2 = False
36493690
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
36503691
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
3651-
consistency_level=ConsistencyLevel.ONE)
3652-
peers_result = connection.wait_for_response(
3653-
peers_query, timeout=self._timeout)
3692+
consistency_level=ConsistencyLevel.ONE,
3693+
fetch_size=self._schema_meta_page_size)
3694+
peers_result = _fetch_all_pages(connection, peers_query, self._timeout)
36543695

36553696
shared_results = (peers_result, local_result)
36563697
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
@@ -3793,11 +3834,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37933834
log.debug("[control connection] Refreshing node list and token map")
37943835
sel_local = self._SELECT_LOCAL
37953836
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
3796-
consistency_level=cl)
3837+
consistency_level=cl,
3838+
fetch_size=self._schema_meta_page_size)
37973839
local_query = QueryMessage(query=maybe_add_timeout_to_query(sel_local, self._metadata_request_timeout),
3798-
consistency_level=cl)
3799-
peers_result, local_result = connection.wait_for_responses(
3800-
peers_query, local_query, timeout=self._timeout)
3840+
consistency_level=cl,
3841+
fetch_size=self._schema_meta_page_size)
3842+
3843+
# Fetch all pages for both queries
3844+
# Note: system.local always has exactly 1 row, so it will never have additional pages
3845+
# system.peers might have multiple pages for very large clusters (>1000 nodes)
3846+
peers_result = _fetch_all_pages(connection, peers_query, self._timeout)
3847+
local_result = _fetch_all_pages(connection, local_query, self._timeout)
38013848

38023849
peers_result = dict_factory(peers_result.column_names, peers_result.parsed_rows)
38033850

@@ -3852,9 +3899,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38523899
# in system.local. See CASSANDRA-9436.
38533900
local_rpc_address_query = QueryMessage(
38543901
query=maybe_add_timeout_to_query(self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS, self._metadata_request_timeout),
3855-
consistency_level=ConsistencyLevel.ONE)
3856-
success, local_rpc_address_result = connection.wait_for_response(
3857-
local_rpc_address_query, timeout=self._timeout, fail_on_error=False)
3902+
consistency_level=ConsistencyLevel.ONE,
3903+
fetch_size=self._schema_meta_page_size)
3904+
# Fetch all pages (system.local table always contains exactly one row, so this is effectively a no-op)
3905+
success, local_rpc_address_result = _fetch_all_pages(
3906+
connection, local_rpc_address_query, self._timeout, fail_on_error=False)
38583907
if success:
38593908
row = dict_factory(
38603909
local_rpc_address_result.column_names,
@@ -4088,13 +4137,19 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
40884137

40894138
while elapsed < total_timeout:
40904139
peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
4091-
consistency_level=cl)
4140+
consistency_level=cl,
4141+
fetch_size=self._schema_meta_page_size)
40924142
local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
4093-
consistency_level=cl)
4143+
consistency_level=cl,
4144+
fetch_size=self._schema_meta_page_size)
40944145
try:
40954146
timeout = min(self._timeout, total_timeout - elapsed)
4096-
peers_result, local_result = connection.wait_for_responses(
4097-
peers_query, local_query, timeout=timeout)
4147+
4148+
# Fetch all pages if there are more results
4149+
# Note: system.local always has exactly 1 row, so it will never have additional pages
4150+
# system.peers might have multiple pages for very large clusters (>1000 nodes)
4151+
peers_result = _fetch_all_pages(connection, peers_query, timeout)
4152+
local_result = _fetch_all_pages(connection, local_query, timeout)
40984153
except OperationTimedOut as timeout:
40994154
log.debug("[control connection] Timed out waiting for "
41004155
"response during schema agreement check: %s", timeout)

tests/unit/test_control_connection.py

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
import unittest
1616

1717
from concurrent.futures import ThreadPoolExecutor
18-
from unittest.mock import Mock, ANY, call
18+
from unittest.mock import Mock, ANY, call, MagicMock
1919

20-
from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType
21-
from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS
20+
from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType, ConsistencyLevel
21+
from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS, QueryMessage
2222
from cassandra.cluster import ControlConnection, _Scheduler, ProfileManager, EXEC_PROFILE_DEFAULT, ExecutionProfile
2323
from cassandra.pool import Host
2424
from cassandra.connection import EndPoint, DefaultEndPoint, DefaultEndPointFactory
@@ -167,6 +167,20 @@ def __init__(self):
167167
["192.168.1.2", 9042, "10.0.0.2", 7040, "a", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]
168168
]
169169
self.wait_for_responses = Mock(return_value=_node_meta_results(self.local_results, self.peer_results))
170+
# Set up wait_for_response to return the appropriate result based on the query
171+
def wait_for_response_side_effect(query_msg, timeout=None, fail_on_error=True):
172+
# Create a result that matches the expected format
173+
result = ResultMessage(kind=RESULT_KIND_ROWS)
174+
# Return peer or local results based on some simple heuristic
175+
if "peers" in query_msg.query.lower():
176+
result.column_names = self.peer_results[0]
177+
result.parsed_rows = self.peer_results[1]
178+
else:
179+
result.column_names = self.local_results[0]
180+
result.parsed_rows = self.local_results[1]
181+
result.paging_state = None
182+
return result
183+
self.wait_for_response = Mock(side_effect=wait_for_response_side_effect)
170184

171185

172186
class FakeTime(object):
@@ -305,6 +319,68 @@ def test_refresh_nodes_and_tokens(self):
305319

306320
assert self.connection.wait_for_responses.call_count == 1
307321

322+
def test_topology_queries_use_paging(self):
323+
"""
324+
Test that topology queries (system.peers and system.local) use fetch_size parameter
325+
"""
326+
# Test during refresh_node_list_and_token_map
327+
self.control_connection.refresh_node_list_and_token_map()
328+
329+
# Verify that wait_for_response was called (now used instead of wait_for_responses)
330+
assert self.connection.wait_for_response.called
331+
332+
# Get the QueryMessage arguments from the calls
333+
calls = self.connection.wait_for_response.call_args_list
334+
335+
# Verify QueryMessage instances have fetch_size set
336+
for call in calls:
337+
query_msg = call[0][0] # First positional argument
338+
assert isinstance(query_msg, QueryMessage)
339+
assert query_msg.fetch_size == self.control_connection._schema_meta_page_size
340+
341+
def test_topology_queries_fetch_all_pages(self):
342+
"""
343+
Test that topology queries fetch all pages when results are paged
344+
"""
345+
from cassandra.cluster import _fetch_all_pages
346+
347+
# Create mock connection
348+
mock_connection = MagicMock()
349+
mock_connection.endpoint = DefaultEndPoint("192.168.1.0")
350+
mock_connection.original_endpoint = mock_connection.endpoint
351+
352+
# Create first page of peers results with paging_state
353+
first_page = ResultMessage(kind=RESULT_KIND_ROWS)
354+
first_page.column_names = ["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"]
355+
first_page.parsed_rows = [["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"], "uuid2"]]
356+
first_page.paging_state = b"has_more_pages"
357+
358+
# Create second page of peers results without paging_state
359+
second_page = ResultMessage(kind=RESULT_KIND_ROWS)
360+
second_page.column_names = ["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens", "host_id"]
361+
second_page.parsed_rows = [["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"], "uuid3"]]
362+
second_page.paging_state = None
363+
364+
# Setup mock: first call returns first page, second call returns second page
365+
mock_connection.wait_for_response.side_effect = [first_page, second_page]
366+
367+
# Test _fetch_remaining_pages
368+
self.control_connection._connection = mock_connection
369+
query_msg = QueryMessage(query="SELECT * FROM system.peers",
370+
consistency_level=ConsistencyLevel.ONE,
371+
fetch_size=self.control_connection._schema_meta_page_size)
372+
373+
result = _fetch_all_pages(mock_connection, query_msg, timeout=5)
374+
375+
# Verify that both pages were fetched
376+
assert len(result.parsed_rows) == 2
377+
assert result.parsed_rows[0][0] == "192.168.1.1"
378+
assert result.parsed_rows[1][0] == "192.168.1.2"
379+
assert result.paging_state is None
380+
381+
# Verify wait_for_response was called twice (first page + second page)
382+
assert mock_connection.wait_for_response.call_count == 2
383+
308384
def test_refresh_nodes_and_tokens_with_invalid_peers(self):
309385
def refresh_and_validate_added_hosts():
310386
self.connection.wait_for_responses = Mock(return_value=_node_meta_results(

0 commit comments

Comments
 (0)