Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions cassandra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ class ProtocolVersion(object):
DSE private protocol v2, supported in DSE 6.0+
"""

SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
SUPPORTED_VERSIONS = (V5, V4, V3)
"""
A tuple of all supported protocol versions
A tuple of all supported protocol versions for ScyllaDB, including future v5 version.
"""

BETA_VERSIONS = (V6,)
Expand Down Expand Up @@ -223,14 +223,6 @@ def uses_error_code_map(cls, version):
def uses_keyspace_flag(cls, version):
return version >= cls.V5 and version != cls.DSE_V1

@classmethod
def has_continuous_paging_support(cls, version):
return version >= cls.DSE_V1

@classmethod
def has_continuous_paging_next_pages(cls, version):
return version >= cls.DSE_V2

@classmethod
def has_checksumming_support(cls, version):
return cls.V5 <= version < cls.DSE_V1
Expand Down
36 changes: 5 additions & 31 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from cassandra.connection import (ConnectionException, ConnectionShutdown,
ConnectionHeartbeat, ProtocolVersionUnsupported,
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
SniEndPointFactory, ConnectionBusy)
from cassandra.cqltypes import UserType
import cassandra.cqltypes as types
from cassandra.encoder import Encoder
Expand Down Expand Up @@ -672,15 +672,15 @@
server will be automatically used.
"""

protocol_version = ProtocolVersion.DSE_V2
protocol_version = ProtocolVersion.V5
"""
The maximum version of the native protocol to use.

See :class:`.ProtocolVersion` for more information about versions.

If not set in the constructor, the driver will automatically downgrade
version based on a negotiation with the server, but it is most efficient
to set this to the maximum supported by your version of Cassandra.
to set this to the maximum supported by your version of ScyllaDB.
Setting this will also prevent conflicting versions negotiated if your
cluster is upgraded.

Expand Down Expand Up @@ -2692,7 +2692,6 @@
raise NoHostAvailable(msg, [h.address for h in hosts])

self.session_id = uuid.uuid4()
self._graph_paging_available = self._check_graph_paging_available()

if self.cluster.column_encryption_policy is not None:
try:
Expand Down Expand Up @@ -2889,26 +2888,10 @@
def _maybe_set_graph_paging(self, execution_profile):
graph_paging = execution_profile.continuous_paging_options
if execution_profile.continuous_paging_options is _NOT_SET:
graph_paging = ContinuousPagingOptions() if self._graph_paging_available else None
graph_paging = None

execution_profile.continuous_paging_options = graph_paging

def _check_graph_paging_available(self):
"""Verify if we can enable graph paging. This executed only once when the session is created."""

if not ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version):
return False

for host in self.cluster.metadata.all_hosts():
if host.dse_version is None:
return False

version = Version(host.dse_version)
if version < _GRAPH_PAGING_MIN_DSE_VERSION:
return False

return True

def _resolve_execution_profile_options(self, execution_profile):
"""
Determine the GraphSON protocol and row factory for a graph query. This is useful
Expand Down Expand Up @@ -3053,14 +3036,6 @@
else:
timestamp = None

supports_continuous_paging_state = (
ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version)
)
if continuous_paging_options and supports_continuous_paging_state:
continuous_paging_state = ContinuousPagingState(continuous_paging_options.max_queue_size)
else:
continuous_paging_state = None

if isinstance(query, SimpleStatement):
query_string = query.query_string
statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
Expand Down Expand Up @@ -3104,7 +3079,7 @@
self, message, query, timeout, metrics=self._metrics,
prepared_statement=prepared_statement, retry_policy=retry_policy, row_factory=row_factory,
load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
continuous_paging_state=continuous_paging_state, host=host)
continuous_paging_state=None, host=host)

def get_execution_profile(self, name):
"""
Expand Down Expand Up @@ -4426,7 +4401,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4404 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncore (3.9)

cannot schedule new futures after shutdown

Check failure on line 4404 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncore (3.9)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down Expand Up @@ -4523,7 +4498,6 @@
_timer = None
_protocol_handler = ProtocolHandler
_spec_execution_plan = NoSpeculativeExecutionPlan()
_continuous_paging_options = None
_continuous_paging_session = None
_host = None

Expand Down
27 changes: 0 additions & 27 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,33 +445,6 @@ class ProtocolError(Exception):
class CrcMismatchException(ConnectionException):
pass


class ContinuousPagingState(object):
"""
A class for specifying continuous paging state, only supported starting with DSE_V2.
"""

num_pages_requested = None
"""
How many pages we have already requested
"""

num_pages_received = None
"""
How many pages we have already received
"""

max_queue_size = None
"""
The max queue size chosen by the user via the options
"""

def __init__(self, max_queue_size):
self.num_pages_requested = max_queue_size # the initial query requests max_queue_size
self.num_pages_received = 0
self.max_queue_size = max_queue_size


class ContinuousPagingSession(object):
def __init__(self, stream_id, decoder, row_factory, connection, state):
self.stream_id = stream_id
Expand Down
20 changes: 2 additions & 18 deletions cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,6 @@ def __init__(self, query_params, consistency_level,
self.paging_state = paging_state
self.timestamp = timestamp
self.skip_meta = skip_meta
self.continuous_paging_options = continuous_paging_options
self.keyspace = keyspace

def _write_query_params(self, f, protocol_version):
Expand All @@ -574,14 +573,6 @@ def _write_query_params(self, f, protocol_version):
if self.timestamp is not None:
flags |= _PROTOCOL_TIMESTAMP_FLAG

if self.continuous_paging_options:
if ProtocolVersion.has_continuous_paging_support(protocol_version):
flags |= _PAGING_OPTIONS_FLAG
else:
raise UnsupportedOperation(
"Continuous paging may only be used with protocol version "
"ProtocolVersion.DSE_V1 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V1.")

if self.keyspace is not None:
if ProtocolVersion.uses_keyspace_flag(protocol_version):
flags |= _WITH_KEYSPACE_FLAG
Expand Down Expand Up @@ -609,14 +600,10 @@ def _write_query_params(self, f, protocol_version):
write_long(f, self.timestamp)
if self.keyspace is not None:
write_string(f, self.keyspace)
if self.continuous_paging_options:
self._write_paging_options(f, self.continuous_paging_options, protocol_version)

def _write_paging_options(self, f, paging_options, protocol_version):
write_int(f, paging_options.max_pages)
write_int(f, paging_options.max_pages_per_second)
if ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
write_int(f, paging_options.max_queue_size)


class QueryMessage(_QueryMessage):
Expand Down Expand Up @@ -1050,12 +1037,9 @@ def send_body(self, f, protocol_version):
if self.op_type == ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE:
if self.next_pages <= 0:
raise UnsupportedOperation("Continuous paging backpressure requires next_pages > 0")
elif not ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
raise UnsupportedOperation(
"Continuous paging backpressure may only be used with protocol version "
"ProtocolVersion.DSE_V2 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V2.")
else:
write_int(f, self.next_pages)
raise UnsupportedOperation(
"Continuous paging backpressure is not supported.")


class _ProtocolHandler(object):
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def get_supported_protocol_versions():
4.0(C*) -> 6(beta),5,4,3
` """
if CASSANDRA_VERSION >= Version('4.0-beta5'):
return (3, 4, 5, 6)
return (3, 4, 5)
if CASSANDRA_VERSION >= Version('4.0-a'):
return (3, 4, 5)
elif CASSANDRA_VERSION >= Version('3.10'):
Expand Down Expand Up @@ -269,7 +269,6 @@ def xfail_scylla_version(filter: Callable[[Version], bool], reason: str, *args,
local = local_decorator_creator()
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported')

greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.1'), 'Cassandra version 2.1 or greater required')
greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.2'), 'Cassandra version 2.2 or greater required')
Expand Down
4 changes: 0 additions & 4 deletions tests/integration/simulacron/test_empty_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@
from tests.integration.simulacron.utils import PrimeQuery, prime_request


PROTOCOL_VERSION = 4 if PROTOCOL_VERSION in \
(ProtocolVersion.DSE_V1, ProtocolVersion.DSE_V2) else PROTOCOL_VERSION


@requiressimulacron
class EmptyColumnTests(SimulacronCluster):
"""
Expand Down
43 changes: 1 addition & 42 deletions tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from tests import notwindows, notasyncio
from tests.integration import use_cluster, get_server_versions, CASSANDRA_VERSION, \
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \
get_unsupported_upper_protocol, local, CASSANDRA_IP, greaterthanorequalcass30, \
lessthanorequalcass40, TestCluster, PROTOCOL_VERSION, xfail_scylla, incorrect_test
from tests.integration.util import assert_quiescent_pool_state
from tests.util import assertListEqual
Expand Down Expand Up @@ -1478,47 +1478,6 @@ def test_prepare_on_ignored_hosts(self):
cluster.shutdown()


@protocolv6
class BetaProtocolTest(unittest.TestCase):

@protocolv6
def test_invalid_protocol_version_beta_option(self):
"""
Test cluster connection with protocol v6 and beta flag not set

@since 3.7.0
@jira_ticket PYTHON-614, PYTHON-1232
@expected_result client shouldn't connect with V6 and no beta flag set

@test_category connection
"""


cluster = TestCluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=False)
try:
with pytest.raises(NoHostAvailable):
cluster.connect()
except Exception as e:
pytest.fail("Unexpected error encountered {0}".format(e.message))

@protocolv6
def test_valid_protocol_version_beta_options_connect(self):
"""
Test cluster connection with protocol version 5 and beta flag set

@since 3.7.0
@jira_ticket PYTHON-614, PYTHON-1232
@expected_result client should connect with protocol v6 and beta flag set.

@test_category connection
"""
cluster = Cluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=True)
session = cluster.connect()
assert cluster.protocol_version == cassandra.ProtocolVersion.V6
assert session.execute("select release_version from system.local").one()
cluster.shutdown()


class DeprecationWarningTest(unittest.TestCase):
def test_deprecation_warnings_legacy_parameters(self):
"""
Expand Down
31 changes: 0 additions & 31 deletions tests/integration/standard/test_custom_protocol_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,37 +121,6 @@ def test_custom_raw_row_results_all_types(self):
assert len(CustomResultMessageTracked.checked_rev_row_set) == len(PRIMITIVE_DATATYPES)-1
cluster.shutdown()

@unittest.expectedFailure
@greaterthanorequalcass40
def test_protocol_divergence_v5_fail_by_continuous_paging(self):
"""
Test to validate that V5 and DSE_V1 diverge. ContinuousPagingOptions is not supported by V5

@since DSE 2.0b3 GRAPH 1.0b1
@jira_ticket PYTHON-694
@expected_result NoHostAvailable will be risen when the continuous_paging_options parameter is set

@test_category connection
"""
cluster = TestCluster(protocol_version=ProtocolVersion.V5, allow_beta_protocol_version=True)
session = cluster.connect()

max_pages = 4
max_pages_per_second = 3
continuous_paging_options = ContinuousPagingOptions(max_pages=max_pages,
max_pages_per_second=max_pages_per_second)

future = self._send_query_message(session, timeout=session.default_timeout,
consistency_level=ConsistencyLevel.ONE,
continuous_paging_options=continuous_paging_options)

# This should raise NoHostAvailable because continuous paging is not supported under ProtocolVersion.DSE_V1
with pytest.raises(NoHostAvailable) as context:
future.result()
assert "Continuous paging may only be used with protocol version ProtocolVersion.DSE_V1 or higher" in str(context.value)

cluster.shutdown()

@greaterthanorequalcass30
def test_protocol_divergence_v4_fail_by_flag_uses_int(self):
"""
Expand Down
15 changes: 2 additions & 13 deletions tests/integration/standard/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2541,16 +2541,5 @@ def _assert_group_keys_by_host(self, keys, table_name, stmt):
for key in keys:
routing_key = prepared_stmt.bind(key).routing_key
hosts = self.cluster.metadata.get_replicas(self.ks_name, routing_key)
assert 1 == len(hosts) # RF is 1 for this keyspace
assert key in keys_per_host[hosts[0]]


class VirtualKeypaceTest(BasicSharedKeyspaceUnitTestCase):
virtual_ks_names = ('system_virtual_schema', 'system_views')

def test_existing_keyspaces_have_correct_virtual_tags(self):
for name, ks in self.cluster.metadata.keyspaces.items():
if name in self.virtual_ks_names:
assert ks.virtual, 'incorrect .virtual value for {}'.format(name)
else:
assert not ks.virtual, 'incorrect .virtual value for {}'.format(name)
self.assertEqual(1, len(hosts)) # RF is 1 for this keyspace
self.assertIn(key, keys_per_host[hosts[0]])
7 changes: 0 additions & 7 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,13 @@ def test_default_serial_consistency_level_legacy(self, *_):
class ProtocolVersionTests(unittest.TestCase):

def test_protocol_downgrade_test(self):
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V2)
assert ProtocolVersion.DSE_V1 == lower
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V1)
assert ProtocolVersion.V5 == lower
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V5)
assert ProtocolVersion.V4 == lower
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V4)
assert ProtocolVersion.V3 == lower
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V3)
assert 0 == lower

assert ProtocolVersion.uses_error_code_map(ProtocolVersion.DSE_V1)
assert ProtocolVersion.uses_int_query_flags(ProtocolVersion.DSE_V1)

assert not ProtocolVersion.uses_error_code_map(ProtocolVersion.V4)
assert not ProtocolVersion.uses_int_query_flags(ProtocolVersion.V4)

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

class ConnectionTest(unittest.TestCase):

def make_connection(self):
c = Connection(DefaultEndPoint('1.2.3.4'))
def make_connection(self, **kwargs):
c = Connection(DefaultEndPoint('1.2.3.4'), **kwargs)
c._socket = Mock()
c._socket.send.side_effect = lambda x: len(x)
return c
Expand Down Expand Up @@ -186,7 +186,7 @@ def test_requested_compression_not_available(self, *args):
assert isinstance(args[0], ProtocolError)

def test_use_requested_compression(self, *args):
c = self.make_connection()
c = self.make_connection(protocol_version=4)
c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])}
c.defunct = Mock()
# request snappy compression
Expand Down
Loading
Loading