diff --git a/cassandra/__init__.py b/cassandra/__init__.py index c1ee195aa2..6f5fd372fc 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -170,9 +170,9 @@ class ProtocolVersion(object): DSE private protocol v2, supported in DSE 6.0+ """ - SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3) + SUPPORTED_VERSIONS = (V5, V4, V3) """ - A tuple of all supported protocol versions + A tuple of all supported protocol versions for ScyllaDB, including future v5 version. """ BETA_VERSIONS = (V6,) @@ -223,14 +223,6 @@ def uses_error_code_map(cls, version): def uses_keyspace_flag(cls, version): return version >= cls.V5 and version != cls.DSE_V1 - @classmethod - def has_continuous_paging_support(cls, version): - return version >= cls.DSE_V1 - - @classmethod - def has_continuous_paging_next_pages(cls, version): - return version >= cls.DSE_V2 - @classmethod def has_checksumming_support(cls, version): return cls.V5 <= version < cls.DSE_V1 diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 68ab370f93..e38ed3306c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -51,7 +51,7 @@ from cassandra.connection import (ConnectionException, ConnectionShutdown, ConnectionHeartbeat, ProtocolVersionUnsupported, EndPoint, DefaultEndPoint, DefaultEndPointFactory, - ContinuousPagingState, SniEndPointFactory, ConnectionBusy) + SniEndPointFactory, ConnectionBusy) from cassandra.cqltypes import UserType import cassandra.cqltypes as types from cassandra.encoder import Encoder @@ -672,7 +672,7 @@ class Cluster(object): server will be automatically used. """ - protocol_version = ProtocolVersion.DSE_V2 + protocol_version = ProtocolVersion.V5 """ The maximum version of the native protocol to use. @@ -680,7 +680,7 @@ class Cluster(object): If not set in the constructor, the driver will automatically downgrade version based on a negotiation with the server, but it is most efficient - to set this to the maximum supported by your version of Cassandra. + to set this to the maximum supported by your version of ScyllaDB. Setting this will also prevent conflicting versions negotiated if your cluster is upgraded. @@ -2692,7 +2692,6 @@ def __init__(self, cluster, hosts, keyspace=None): raise NoHostAvailable(msg, [h.address for h in hosts]) self.session_id = uuid.uuid4() - self._graph_paging_available = self._check_graph_paging_available() if self.cluster.column_encryption_policy is not None: try: @@ -2889,26 +2888,10 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro def _maybe_set_graph_paging(self, execution_profile): graph_paging = execution_profile.continuous_paging_options if execution_profile.continuous_paging_options is _NOT_SET: - graph_paging = ContinuousPagingOptions() if self._graph_paging_available else None + graph_paging = None execution_profile.continuous_paging_options = graph_paging - def _check_graph_paging_available(self): - """Verify if we can enable graph paging. This executed only once when the session is created.""" - - if not ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version): - return False - - for host in self.cluster.metadata.all_hosts(): - if host.dse_version is None: - return False - - version = Version(host.dse_version) - if version < _GRAPH_PAGING_MIN_DSE_VERSION: - return False - - return True - def _resolve_execution_profile_options(self, execution_profile): """ Determine the GraphSON protocol and row factory for a graph query. This is useful @@ -3053,14 +3036,6 @@ def _create_response_future(self, query, parameters, trace, custom_payload, else: timestamp = None - supports_continuous_paging_state = ( - ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version) - ) - if continuous_paging_options and supports_continuous_paging_state: - continuous_paging_state = ContinuousPagingState(continuous_paging_options.max_queue_size) - else: - continuous_paging_state = None - if isinstance(query, SimpleStatement): query_string = query.query_string statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None @@ -3104,7 +3079,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload, self, message, query, timeout, metrics=self._metrics, prepared_statement=prepared_statement, retry_policy=retry_policy, row_factory=row_factory, load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan, - continuous_paging_state=continuous_paging_state, host=host) + continuous_paging_state=None, host=host) def get_execution_profile(self, name): """ @@ -4523,7 +4498,6 @@ class ResponseFuture(object): _timer = None _protocol_handler = ProtocolHandler _spec_execution_plan = NoSpeculativeExecutionPlan() - _continuous_paging_options = None _continuous_paging_session = None _host = None diff --git a/cassandra/connection.py b/cassandra/connection.py index 7cd104ab29..39baeea884 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -445,33 +445,6 @@ class ProtocolError(Exception): class CrcMismatchException(ConnectionException): pass - -class ContinuousPagingState(object): - """ - A class for specifying continuous paging state, only supported starting with DSE_V2. - """ - - num_pages_requested = None - """ - How many pages we have already requested - """ - - num_pages_received = None - """ - How many pages we have already received - """ - - max_queue_size = None - """ - The max queue size chosen by the user via the options - """ - - def __init__(self, max_queue_size): - self.num_pages_requested = max_queue_size # the initial query requests max_queue_size - self.num_pages_received = 0 - self.max_queue_size = max_queue_size - - class ContinuousPagingSession(object): def __init__(self, stream_id, decoder, row_factory, connection, state): self.stream_id = stream_id diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 5fe4ed2be4..d8716f4eeb 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -553,7 +553,6 @@ def __init__(self, query_params, consistency_level, self.paging_state = paging_state self.timestamp = timestamp self.skip_meta = skip_meta - self.continuous_paging_options = continuous_paging_options self.keyspace = keyspace def _write_query_params(self, f, protocol_version): @@ -574,14 +573,6 @@ def _write_query_params(self, f, protocol_version): if self.timestamp is not None: flags |= _PROTOCOL_TIMESTAMP_FLAG - if self.continuous_paging_options: - if ProtocolVersion.has_continuous_paging_support(protocol_version): - flags |= _PAGING_OPTIONS_FLAG - else: - raise UnsupportedOperation( - "Continuous paging may only be used with protocol version " - "ProtocolVersion.DSE_V1 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V1.") - if self.keyspace is not None: if ProtocolVersion.uses_keyspace_flag(protocol_version): flags |= _WITH_KEYSPACE_FLAG @@ -609,14 +600,10 @@ def _write_query_params(self, f, protocol_version): write_long(f, self.timestamp) if self.keyspace is not None: write_string(f, self.keyspace) - if self.continuous_paging_options: - self._write_paging_options(f, self.continuous_paging_options, protocol_version) def _write_paging_options(self, f, paging_options, protocol_version): write_int(f, paging_options.max_pages) write_int(f, paging_options.max_pages_per_second) - if ProtocolVersion.has_continuous_paging_next_pages(protocol_version): - write_int(f, paging_options.max_queue_size) class QueryMessage(_QueryMessage): @@ -1050,12 +1037,9 @@ def send_body(self, f, protocol_version): if self.op_type == ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE: if self.next_pages <= 0: raise UnsupportedOperation("Continuous paging backpressure requires next_pages > 0") - elif not ProtocolVersion.has_continuous_paging_next_pages(protocol_version): - raise UnsupportedOperation( - "Continuous paging backpressure may only be used with protocol version " - "ProtocolVersion.DSE_V2 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V2.") else: - write_int(f, self.next_pages) + raise UnsupportedOperation( + "Continuous paging backpressure is not supported.") class _ProtocolHandler(object): diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index e01029a7cd..7a8845750e 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -193,7 +193,7 @@ def get_supported_protocol_versions(): 4.0(C*) -> 6(beta),5,4,3 ` """ if CASSANDRA_VERSION >= Version('4.0-beta5'): - return (3, 4, 5, 6) + return (3, 4, 5) if CASSANDRA_VERSION >= Version('4.0-a'): return (3, 4, 5) elif CASSANDRA_VERSION >= Version('3.10'): @@ -269,7 +269,6 @@ def xfail_scylla_version(filter: Callable[[Version], bool], reason: str, *args, local = local_decorator_creator() notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported') greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported') -protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported') greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.1'), 'Cassandra version 2.1 or greater required') greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.2'), 'Cassandra version 2.2 or greater required') diff --git a/tests/integration/simulacron/test_empty_column.py b/tests/integration/simulacron/test_empty_column.py index 589f730f04..2dbf3985ad 100644 --- a/tests/integration/simulacron/test_empty_column.py +++ b/tests/integration/simulacron/test_empty_column.py @@ -29,10 +29,6 @@ from tests.integration.simulacron.utils import PrimeQuery, prime_request -PROTOCOL_VERSION = 4 if PROTOCOL_VERSION in \ - (ProtocolVersion.DSE_V1, ProtocolVersion.DSE_V2) else PROTOCOL_VERSION - - @requiressimulacron class EmptyColumnTests(SimulacronCluster): """ diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index c83f454a0f..35ad5312f3 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -42,7 +42,7 @@ from tests import notwindows, notasyncio from tests.integration import use_cluster, get_server_versions, CASSANDRA_VERSION, \ execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \ - get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \ + get_unsupported_upper_protocol, local, CASSANDRA_IP, greaterthanorequalcass30, \ lessthanorequalcass40, TestCluster, PROTOCOL_VERSION, xfail_scylla, incorrect_test from tests.integration.util import assert_quiescent_pool_state from tests.util import assertListEqual @@ -1478,47 +1478,6 @@ def test_prepare_on_ignored_hosts(self): cluster.shutdown() -@protocolv6 -class BetaProtocolTest(unittest.TestCase): - - @protocolv6 - def test_invalid_protocol_version_beta_option(self): - """ - Test cluster connection with protocol v6 and beta flag not set - - @since 3.7.0 - @jira_ticket PYTHON-614, PYTHON-1232 - @expected_result client shouldn't connect with V6 and no beta flag set - - @test_category connection - """ - - - cluster = TestCluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=False) - try: - with pytest.raises(NoHostAvailable): - cluster.connect() - except Exception as e: - pytest.fail("Unexpected error encountered {0}".format(e.message)) - - @protocolv6 - def test_valid_protocol_version_beta_options_connect(self): - """ - Test cluster connection with protocol version 5 and beta flag set - - @since 3.7.0 - @jira_ticket PYTHON-614, PYTHON-1232 - @expected_result client should connect with protocol v6 and beta flag set. - - @test_category connection - """ - cluster = Cluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=True) - session = cluster.connect() - assert cluster.protocol_version == cassandra.ProtocolVersion.V6 - assert session.execute("select release_version from system.local").one() - cluster.shutdown() - - class DeprecationWarningTest(unittest.TestCase): def test_deprecation_warnings_legacy_parameters(self): """ diff --git a/tests/integration/standard/test_custom_protocol_handler.py b/tests/integration/standard/test_custom_protocol_handler.py index a9025bba97..239f7e7336 100644 --- a/tests/integration/standard/test_custom_protocol_handler.py +++ b/tests/integration/standard/test_custom_protocol_handler.py @@ -121,37 +121,6 @@ def test_custom_raw_row_results_all_types(self): assert len(CustomResultMessageTracked.checked_rev_row_set) == len(PRIMITIVE_DATATYPES)-1 cluster.shutdown() - @unittest.expectedFailure - @greaterthanorequalcass40 - def test_protocol_divergence_v5_fail_by_continuous_paging(self): - """ - Test to validate that V5 and DSE_V1 diverge. ContinuousPagingOptions is not supported by V5 - - @since DSE 2.0b3 GRAPH 1.0b1 - @jira_ticket PYTHON-694 - @expected_result NoHostAvailable will be risen when the continuous_paging_options parameter is set - - @test_category connection - """ - cluster = TestCluster(protocol_version=ProtocolVersion.V5, allow_beta_protocol_version=True) - session = cluster.connect() - - max_pages = 4 - max_pages_per_second = 3 - continuous_paging_options = ContinuousPagingOptions(max_pages=max_pages, - max_pages_per_second=max_pages_per_second) - - future = self._send_query_message(session, timeout=session.default_timeout, - consistency_level=ConsistencyLevel.ONE, - continuous_paging_options=continuous_paging_options) - - # This should raise NoHostAvailable because continuous paging is not supported under ProtocolVersion.DSE_V1 - with pytest.raises(NoHostAvailable) as context: - future.result() - assert "Continuous paging may only be used with protocol version ProtocolVersion.DSE_V1 or higher" in str(context.value) - - cluster.shutdown() - @greaterthanorequalcass30 def test_protocol_divergence_v4_fail_by_flag_uses_int(self): """ diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index c27d18dccb..7b675fdf7b 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -2541,16 +2541,5 @@ def _assert_group_keys_by_host(self, keys, table_name, stmt): for key in keys: routing_key = prepared_stmt.bind(key).routing_key hosts = self.cluster.metadata.get_replicas(self.ks_name, routing_key) - assert 1 == len(hosts) # RF is 1 for this keyspace - assert key in keys_per_host[hosts[0]] - - -class VirtualKeypaceTest(BasicSharedKeyspaceUnitTestCase): - virtual_ks_names = ('system_virtual_schema', 'system_views') - - def test_existing_keyspaces_have_correct_virtual_tags(self): - for name, ks in self.cluster.metadata.keyspaces.items(): - if name in self.virtual_ks_names: - assert ks.virtual, 'incorrect .virtual value for {}'.format(name) - else: - assert not ks.virtual, 'incorrect .virtual value for {}'.format(name) + self.assertEqual(1, len(hosts)) # RF is 1 for this keyspace + self.assertIn(key, keys_per_host[hosts[0]]) diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 180f4b6a8a..383a4de7c8 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -208,10 +208,6 @@ def test_default_serial_consistency_level_legacy(self, *_): class ProtocolVersionTests(unittest.TestCase): def test_protocol_downgrade_test(self): - lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V2) - assert ProtocolVersion.DSE_V1 == lower - lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V1) - assert ProtocolVersion.V5 == lower lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V5) assert ProtocolVersion.V4 == lower lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V4) @@ -219,9 +215,6 @@ def test_protocol_downgrade_test(self): lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V3) assert 0 == lower - assert ProtocolVersion.uses_error_code_map(ProtocolVersion.DSE_V1) - assert ProtocolVersion.uses_int_query_flags(ProtocolVersion.DSE_V1) - assert not ProtocolVersion.uses_error_code_map(ProtocolVersion.V4) assert not ProtocolVersion.uses_int_query_flags(ProtocolVersion.V4) diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index c21f5c7212..97dbe39957 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -33,8 +33,8 @@ class ConnectionTest(unittest.TestCase): - def make_connection(self): - c = Connection(DefaultEndPoint('1.2.3.4')) + def make_connection(self, **kwargs): + c = Connection(DefaultEndPoint('1.2.3.4'), **kwargs) c._socket = Mock() c._socket.send.side_effect = lambda x: len(x) return c @@ -186,7 +186,7 @@ def test_requested_compression_not_available(self, *args): assert isinstance(args[0], ProtocolError) def test_use_requested_compression(self, *args): - c = self.make_connection() + c = self.make_connection(protocol_version=4) c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])} c.defunct = Mock() # request snappy compression diff --git a/tests/unit/test_protocol.py b/tests/unit/test_protocol.py index 57261654df..9704811239 100644 --- a/tests/unit/test_protocol.py +++ b/tests/unit/test_protocol.py @@ -91,43 +91,6 @@ def test_query_message(self): def _check_calls(self, io, expected): assert tuple(c[1] for c in io.write.mock_calls) == tuple(expected) - def test_continuous_paging(self): - """ - Test to check continuous paging throws an Exception if it's not supported and the correct valuesa - are written to the buffer if the option is enabled. - - @since DSE 2.0b3 GRAPH 1.0b1 - @jira_ticket PYTHON-694 - @expected_result the values are correctly written - - @test_category connection - """ - max_pages = 4 - max_pages_per_second = 3 - continuous_paging_options = ContinuousPagingOptions(max_pages=max_pages, - max_pages_per_second=max_pages_per_second) - message = QueryMessage("a", 3, continuous_paging_options=continuous_paging_options) - io = Mock() - for version in [version for version in ProtocolVersion.SUPPORTED_VERSIONS - if not ProtocolVersion.has_continuous_paging_support(version)]: - with pytest.raises(UnsupportedOperation): - message.send_body(io, version) - - io.reset_mock() - message.send_body(io, ProtocolVersion.DSE_V1) - - # continuous paging adds two write calls to the buffer - assert len(io.write.mock_calls) == 6 - # Check that the appropriate flag is set to True - assert uint32_unpack(io.write.mock_calls[3][1][0]) & _WITH_SERIAL_CONSISTENCY_FLAG == 0 - assert uint32_unpack(io.write.mock_calls[3][1][0]) & _PAGE_SIZE_FLAG == 0 - assert uint32_unpack(io.write.mock_calls[3][1][0]) & _WITH_PAGING_STATE_FLAG == 0 - assert uint32_unpack(io.write.mock_calls[3][1][0]) & _PAGING_OPTIONS_FLAG == _PAGING_OPTIONS_FLAG - - # Test max_pages and max_pages_per_second are correctly written - assert uint32_unpack(io.write.mock_calls[4][1][0]) == max_pages - assert uint32_unpack(io.write.mock_calls[5][1][0]) == max_pages_per_second - def test_prepare_flag(self): """ Test to check the prepare flag is properly set, This should only happen for V5 at the moment.