Skip to content

Commit dfbe101

Browse files
mykauldkropachev
authored andcommitted
Remove protocol versions >5 from MAX_SUPPORTED
The main goal of this patch is to have the driver start negotiating protocol versions at 5, and not begin by attempting DSE-specific protocols or protocol version 6, which Scylla doesn't support and cause protocol errors which only wastes time in the beginning of each connection. But then, since the driver won't even try DSE-specific protocols, we don't need to support them and can begin to remove DSE-specific code. This patch only removes some obvious things, but there's still a bunch of DSE-specific code that hasn't been removed yet. Refs: #244 Signed-off-by: Yaniv Kaul <[email protected]>
1 parent 84c854b commit dfbe101

File tree

11 files changed

+21
-172
lines changed

11 files changed

+21
-172
lines changed

cassandra/__init__.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ class ProtocolVersion(object):
170170
DSE private protocol v2, supported in DSE 6.0+
171171
"""
172172

173-
SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
173+
SUPPORTED_VERSIONS = (V5, V4, V3)
174174
"""
175-
A tuple of all supported protocol versions
175+
A tuple of all supported protocol versions for ScyllaDB, including future v5 version.
176176
"""
177177

178178
BETA_VERSIONS = (V6,)
@@ -223,14 +223,6 @@ def uses_error_code_map(cls, version):
223223
def uses_keyspace_flag(cls, version):
224224
return version >= cls.V5 and version != cls.DSE_V1
225225

226-
@classmethod
227-
def has_continuous_paging_support(cls, version):
228-
return version >= cls.DSE_V1
229-
230-
@classmethod
231-
def has_continuous_paging_next_pages(cls, version):
232-
return version >= cls.DSE_V2
233-
234226
@classmethod
235227
def has_checksumming_support(cls, version):
236228
return cls.V5 <= version < cls.DSE_V1

cassandra/cluster.py

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
from cassandra.connection import (ConnectionException, ConnectionShutdown,
5252
ConnectionHeartbeat, ProtocolVersionUnsupported,
5353
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
54-
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
54+
SniEndPointFactory, ConnectionBusy)
5555
from cassandra.cqltypes import UserType
5656
import cassandra.cqltypes as types
5757
from cassandra.encoder import Encoder
@@ -672,15 +672,15 @@ class Cluster(object):
672672
server will be automatically used.
673673
"""
674674

675-
protocol_version = ProtocolVersion.DSE_V2
675+
protocol_version = ProtocolVersion.V5
676676
"""
677677
The maximum version of the native protocol to use.
678678
679679
See :class:`.ProtocolVersion` for more information about versions.
680680
681681
If not set in the constructor, the driver will automatically downgrade
682682
version based on a negotiation with the server, but it is most efficient
683-
to set this to the maximum supported by your version of Cassandra.
683+
to set this to the maximum supported by your version of ScyllaDB.
684684
Setting this will also prevent conflicting versions negotiated if your
685685
cluster is upgraded.
686686
@@ -2692,7 +2692,6 @@ def __init__(self, cluster, hosts, keyspace=None):
26922692
raise NoHostAvailable(msg, [h.address for h in hosts])
26932693

26942694
self.session_id = uuid.uuid4()
2695-
self._graph_paging_available = self._check_graph_paging_available()
26962695

26972696
if self.cluster.column_encryption_policy is not None:
26982697
try:
@@ -2889,26 +2888,10 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
28892888
def _maybe_set_graph_paging(self, execution_profile):
28902889
graph_paging = execution_profile.continuous_paging_options
28912890
if execution_profile.continuous_paging_options is _NOT_SET:
2892-
graph_paging = ContinuousPagingOptions() if self._graph_paging_available else None
2891+
graph_paging = None
28932892

28942893
execution_profile.continuous_paging_options = graph_paging
28952894

2896-
def _check_graph_paging_available(self):
2897-
"""Verify if we can enable graph paging. This executed only once when the session is created."""
2898-
2899-
if not ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version):
2900-
return False
2901-
2902-
for host in self.cluster.metadata.all_hosts():
2903-
if host.dse_version is None:
2904-
return False
2905-
2906-
version = Version(host.dse_version)
2907-
if version < _GRAPH_PAGING_MIN_DSE_VERSION:
2908-
return False
2909-
2910-
return True
2911-
29122895
def _resolve_execution_profile_options(self, execution_profile):
29132896
"""
29142897
Determine the GraphSON protocol and row factory for a graph query. This is useful
@@ -3053,14 +3036,6 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30533036
else:
30543037
timestamp = None
30553038

3056-
supports_continuous_paging_state = (
3057-
ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version)
3058-
)
3059-
if continuous_paging_options and supports_continuous_paging_state:
3060-
continuous_paging_state = ContinuousPagingState(continuous_paging_options.max_queue_size)
3061-
else:
3062-
continuous_paging_state = None
3063-
30643039
if isinstance(query, SimpleStatement):
30653040
query_string = query.query_string
30663041
statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
@@ -3104,7 +3079,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
31043079
self, message, query, timeout, metrics=self._metrics,
31053080
prepared_statement=prepared_statement, retry_policy=retry_policy, row_factory=row_factory,
31063081
load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
3107-
continuous_paging_state=continuous_paging_state, host=host)
3082+
continuous_paging_state=None, host=host)
31083083

31093084
def get_execution_profile(self, name):
31103085
"""

cassandra/connection.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -445,33 +445,6 @@ class ProtocolError(Exception):
445445
class CrcMismatchException(ConnectionException):
446446
pass
447447

448-
449-
class ContinuousPagingState(object):
450-
"""
451-
A class for specifying continuous paging state, only supported starting with DSE_V2.
452-
"""
453-
454-
num_pages_requested = None
455-
"""
456-
How many pages we have already requested
457-
"""
458-
459-
num_pages_received = None
460-
"""
461-
How many pages we have already received
462-
"""
463-
464-
max_queue_size = None
465-
"""
466-
The max queue size chosen by the user via the options
467-
"""
468-
469-
def __init__(self, max_queue_size):
470-
self.num_pages_requested = max_queue_size # the initial query requests max_queue_size
471-
self.num_pages_received = 0
472-
self.max_queue_size = max_queue_size
473-
474-
475448
class ContinuousPagingSession(object):
476449
def __init__(self, stream_id, decoder, row_factory, connection, state):
477450
self.stream_id = stream_id

cassandra/protocol.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -575,12 +575,8 @@ def _write_query_params(self, f, protocol_version):
575575
flags |= _PROTOCOL_TIMESTAMP_FLAG
576576

577577
if self.continuous_paging_options:
578-
if ProtocolVersion.has_continuous_paging_support(protocol_version):
579-
flags |= _PAGING_OPTIONS_FLAG
580-
else:
581-
raise UnsupportedOperation(
582-
"Continuous paging may only be used with protocol version "
583-
"ProtocolVersion.DSE_V1 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V1.")
578+
raise UnsupportedOperation(
579+
"Continuous paging may only be used with future protocol versions")
584580

585581
if self.keyspace is not None:
586582
if ProtocolVersion.uses_keyspace_flag(protocol_version):
@@ -615,8 +611,6 @@ def _write_query_params(self, f, protocol_version):
615611
def _write_paging_options(self, f, paging_options, protocol_version):
616612
write_int(f, paging_options.max_pages)
617613
write_int(f, paging_options.max_pages_per_second)
618-
if ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
619-
write_int(f, paging_options.max_queue_size)
620614

621615

622616
class QueryMessage(_QueryMessage):
@@ -1050,12 +1044,9 @@ def send_body(self, f, protocol_version):
10501044
if self.op_type == ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE:
10511045
if self.next_pages <= 0:
10521046
raise UnsupportedOperation("Continuous paging backpressure requires next_pages > 0")
1053-
elif not ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
1054-
raise UnsupportedOperation(
1055-
"Continuous paging backpressure may only be used with protocol version "
1056-
"ProtocolVersion.DSE_V2 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V2.")
10571047
else:
1058-
write_int(f, self.next_pages)
1048+
raise UnsupportedOperation(
1049+
"Continuous paging backpressure is not supported.")
10591050

10601051

10611052
class _ProtocolHandler(object):

tests/integration/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def get_supported_protocol_versions():
193193
4.0(C*) -> 6(beta),5,4,3
194194
` """
195195
if CASSANDRA_VERSION >= Version('4.0-beta5'):
196-
return (3, 4, 5, 6)
196+
return (3, 4, 5)
197197
if CASSANDRA_VERSION >= Version('4.0-a'):
198198
return (3, 4, 5)
199199
elif CASSANDRA_VERSION >= Version('3.10'):
@@ -269,7 +269,6 @@ def xfail_scylla_version(filter: Callable[[Version], bool], reason: str, *args,
269269
local = local_decorator_creator()
270270
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
271271
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
272-
protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported')
273272

274273
greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.1'), 'Cassandra version 2.1 or greater required')
275274
greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.2'), 'Cassandra version 2.2 or greater required')

tests/integration/simulacron/test_empty_column.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@
2929
from tests.integration.simulacron.utils import PrimeQuery, prime_request
3030

3131

32-
PROTOCOL_VERSION = 4 if PROTOCOL_VERSION in \
33-
(ProtocolVersion.DSE_V1, ProtocolVersion.DSE_V2) else PROTOCOL_VERSION
34-
35-
3632
@requiressimulacron
3733
class EmptyColumnTests(SimulacronCluster):
3834
"""

tests/integration/standard/test_cluster.py

Lines changed: 1 addition & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
from tests import notwindows, notasyncio
4343
from tests.integration import use_cluster, get_server_versions, CASSANDRA_VERSION, \
4444
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
45-
get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \
45+
get_unsupported_upper_protocol, local, CASSANDRA_IP, greaterthanorequalcass30, \
4646
lessthanorequalcass40, TestCluster, PROTOCOL_VERSION, xfail_scylla, incorrect_test
4747
from tests.integration.util import assert_quiescent_pool_state
4848
from tests.util import assertListEqual
@@ -1478,47 +1478,6 @@ def test_prepare_on_ignored_hosts(self):
14781478
cluster.shutdown()
14791479

14801480

1481-
@protocolv6
1482-
class BetaProtocolTest(unittest.TestCase):
1483-
1484-
@protocolv6
1485-
def test_invalid_protocol_version_beta_option(self):
1486-
"""
1487-
Test cluster connection with protocol v6 and beta flag not set
1488-
1489-
@since 3.7.0
1490-
@jira_ticket PYTHON-614, PYTHON-1232
1491-
@expected_result client shouldn't connect with V6 and no beta flag set
1492-
1493-
@test_category connection
1494-
"""
1495-
1496-
1497-
cluster = TestCluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=False)
1498-
try:
1499-
with pytest.raises(NoHostAvailable):
1500-
cluster.connect()
1501-
except Exception as e:
1502-
pytest.fail("Unexpected error encountered {0}".format(e.message))
1503-
1504-
@protocolv6
1505-
def test_valid_protocol_version_beta_options_connect(self):
1506-
"""
1507-
Test cluster connection with protocol version 5 and beta flag set
1508-
1509-
@since 3.7.0
1510-
@jira_ticket PYTHON-614, PYTHON-1232
1511-
@expected_result client should connect with protocol v6 and beta flag set.
1512-
1513-
@test_category connection
1514-
"""
1515-
cluster = Cluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=True)
1516-
session = cluster.connect()
1517-
assert cluster.protocol_version == cassandra.ProtocolVersion.V6
1518-
assert session.execute("select release_version from system.local").one()
1519-
cluster.shutdown()
1520-
1521-
15221481
class DeprecationWarningTest(unittest.TestCase):
15231482
def test_deprecation_warnings_legacy_parameters(self):
15241483
"""

tests/integration/standard/test_metadata.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2541,16 +2541,5 @@ def _assert_group_keys_by_host(self, keys, table_name, stmt):
25412541
for key in keys:
25422542
routing_key = prepared_stmt.bind(key).routing_key
25432543
hosts = self.cluster.metadata.get_replicas(self.ks_name, routing_key)
2544-
assert 1 == len(hosts) # RF is 1 for this keyspace
2545-
assert key in keys_per_host[hosts[0]]
2546-
2547-
2548-
class VirtualKeypaceTest(BasicSharedKeyspaceUnitTestCase):
2549-
virtual_ks_names = ('system_virtual_schema', 'system_views')
2550-
2551-
def test_existing_keyspaces_have_correct_virtual_tags(self):
2552-
for name, ks in self.cluster.metadata.keyspaces.items():
2553-
if name in self.virtual_ks_names:
2554-
assert ks.virtual, 'incorrect .virtual value for {}'.format(name)
2555-
else:
2556-
assert not ks.virtual, 'incorrect .virtual value for {}'.format(name)
2544+
self.assertEqual(1, len(hosts)) # RF is 1 for this keyspace
2545+
self.assertIn(key, keys_per_host[hosts[0]])

tests/unit/test_cluster.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,20 +208,13 @@ def test_default_serial_consistency_level_legacy(self, *_):
208208
class ProtocolVersionTests(unittest.TestCase):
209209

210210
def test_protocol_downgrade_test(self):
211-
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V2)
212-
assert ProtocolVersion.DSE_V1 == lower
213-
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V1)
214-
assert ProtocolVersion.V5 == lower
215211
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V5)
216212
assert ProtocolVersion.V4 == lower
217213
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V4)
218214
assert ProtocolVersion.V3 == lower
219215
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V3)
220216
assert 0 == lower
221217

222-
assert ProtocolVersion.uses_error_code_map(ProtocolVersion.DSE_V1)
223-
assert ProtocolVersion.uses_int_query_flags(ProtocolVersion.DSE_V1)
224-
225218
assert not ProtocolVersion.uses_error_code_map(ProtocolVersion.V4)
226219
assert not ProtocolVersion.uses_int_query_flags(ProtocolVersion.V4)
227220

tests/unit/test_connection.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,14 @@ def test_use_requested_compression(self, *args):
189189
c = self.make_connection()
190190
c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])}
191191
c.defunct = Mock()
192-
# request snappy compression
193-
c.compression = "snappy"
192+
# request LZ4 compression
193+
c.compression = "lz4"
194194

195195
locally_supported_compressions.pop('lz4', None)
196196
locally_supported_compressions.pop('snappy', None)
197197
locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
198198
locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')
199199

200-
# the server only supports snappy
201200
options_buf = BytesIO()
202201
write_stringmultimap(options_buf, {
203202
'CQL_VERSION': ['3.0.3'],
@@ -207,7 +206,8 @@ def test_use_requested_compression(self, *args):
207206

208207
c.process_msg(_Frame(version=4, flags=0, stream=0, opcode=SupportedMessage.opcode, body_offset=9, end_pos=9 + len(options)), options)
209208

210-
assert c.decompressor == locally_supported_compressions['snappy'][1]
209+
210+
assert c.decompressor == locally_supported_compressions['lz4'][1]
211211

212212
def test_disable_compression(self, *args):
213213
c = self.make_connection()

0 commit comments

Comments
 (0)