Skip to content

Commit ce73513

Browse files
committed
Remove protocol versions >5 from MAX_SUPPORTED
Without removing associated code yet, this patch just ensure we begin negotiating protocol version 5, then fallback to 4, reducing the amount of protocol errors that the Scylla server sends when it sees the unsupported versions. Refs: #244 Signed-off-by: Yaniv Kaul <[email protected]>
1 parent c7ca1c6 commit ce73513

File tree

11 files changed

+18
-176
lines changed

11 files changed

+18
-176
lines changed

cassandra/__init__.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ class ProtocolVersion(object):
170170
DSE private protocol v2, supported in DSE 6.0+
171171
"""
172172

173-
SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
173+
SUPPORTED_VERSIONS = (V5, V4, V3)
174174
"""
175-
A tuple of all supported protocol versions
175+
A tuple of all supported protocol versions for ScyllaDB, including future v5 version.
176176
"""
177177

178178
BETA_VERSIONS = (V6,)
@@ -223,14 +223,6 @@ def uses_error_code_map(cls, version):
223223
def uses_keyspace_flag(cls, version):
224224
return version >= cls.V5 and version != cls.DSE_V1
225225

226-
@classmethod
227-
def has_continuous_paging_support(cls, version):
228-
return version >= cls.DSE_V1
229-
230-
@classmethod
231-
def has_continuous_paging_next_pages(cls, version):
232-
return version >= cls.DSE_V2
233-
234226
@classmethod
235227
def has_checksumming_support(cls, version):
236228
return cls.V5 <= version < cls.DSE_V1

cassandra/cluster.py

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
from cassandra.connection import (ConnectionException, ConnectionShutdown,
5252
ConnectionHeartbeat, ProtocolVersionUnsupported,
5353
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
54-
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
54+
SniEndPointFactory, ConnectionBusy)
5555
from cassandra.cqltypes import UserType
5656
import cassandra.cqltypes as types
5757
from cassandra.encoder import Encoder
@@ -672,15 +672,15 @@ class Cluster(object):
672672
server will be automatically used.
673673
"""
674674

675-
protocol_version = ProtocolVersion.DSE_V2
675+
protocol_version = ProtocolVersion.V5
676676
"""
677677
The maximum version of the native protocol to use.
678678
679679
See :class:`.ProtocolVersion` for more information about versions.
680680
681681
If not set in the constructor, the driver will automatically downgrade
682682
version based on a negotiation with the server, but it is most efficient
683-
to set this to the maximum supported by your version of Cassandra.
683+
to set this to the maximum supported by your version of ScyllaDB.
684684
Setting this will also prevent conflicting versions negotiated if your
685685
cluster is upgraded.
686686
@@ -2692,7 +2692,6 @@ def __init__(self, cluster, hosts, keyspace=None):
26922692
raise NoHostAvailable(msg, [h.address for h in hosts])
26932693

26942694
self.session_id = uuid.uuid4()
2695-
self._graph_paging_available = self._check_graph_paging_available()
26962695

26972696
if self.cluster.column_encryption_policy is not None:
26982697
try:
@@ -2889,26 +2888,10 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
28892888
def _maybe_set_graph_paging(self, execution_profile):
28902889
graph_paging = execution_profile.continuous_paging_options
28912890
if execution_profile.continuous_paging_options is _NOT_SET:
2892-
graph_paging = ContinuousPagingOptions() if self._graph_paging_available else None
2891+
graph_paging = None
28932892

28942893
execution_profile.continuous_paging_options = graph_paging
28952894

2896-
def _check_graph_paging_available(self):
2897-
"""Verify if we can enable graph paging. This executed only once when the session is created."""
2898-
2899-
if not ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version):
2900-
return False
2901-
2902-
for host in self.cluster.metadata.all_hosts():
2903-
if host.dse_version is None:
2904-
return False
2905-
2906-
version = Version(host.dse_version)
2907-
if version < _GRAPH_PAGING_MIN_DSE_VERSION:
2908-
return False
2909-
2910-
return True
2911-
29122895
def _resolve_execution_profile_options(self, execution_profile):
29132896
"""
29142897
Determine the GraphSON protocol and row factory for a graph query. This is useful
@@ -3053,14 +3036,6 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30533036
else:
30543037
timestamp = None
30553038

3056-
supports_continuous_paging_state = (
3057-
ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version)
3058-
)
3059-
if continuous_paging_options and supports_continuous_paging_state:
3060-
continuous_paging_state = ContinuousPagingState(continuous_paging_options.max_queue_size)
3061-
else:
3062-
continuous_paging_state = None
3063-
30643039
if isinstance(query, SimpleStatement):
30653040
query_string = query.query_string
30663041
statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
@@ -3104,7 +3079,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
31043079
self, message, query, timeout, metrics=self._metrics,
31053080
prepared_statement=prepared_statement, retry_policy=retry_policy, row_factory=row_factory,
31063081
load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
3107-
continuous_paging_state=continuous_paging_state, host=host)
3082+
continuous_paging_state=None, host=host)
31083083

31093084
def get_execution_profile(self, name):
31103085
"""

cassandra/connection.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -443,33 +443,6 @@ class ProtocolError(Exception):
443443
class CrcMismatchException(ConnectionException):
444444
pass
445445

446-
447-
class ContinuousPagingState(object):
448-
"""
449-
A class for specifying continuous paging state, only supported starting with DSE_V2.
450-
"""
451-
452-
num_pages_requested = None
453-
"""
454-
How many pages we have already requested
455-
"""
456-
457-
num_pages_received = None
458-
"""
459-
How many pages we have already received
460-
"""
461-
462-
max_queue_size = None
463-
"""
464-
The max queue size chosen by the user via the options
465-
"""
466-
467-
def __init__(self, max_queue_size):
468-
self.num_pages_requested = max_queue_size # the initial query requests max_queue_size
469-
self.num_pages_received = 0
470-
self.max_queue_size = max_queue_size
471-
472-
473446
class ContinuousPagingSession(object):
474447
def __init__(self, stream_id, decoder, row_factory, connection, state):
475448
self.stream_id = stream_id

cassandra/protocol.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -575,12 +575,8 @@ def _write_query_params(self, f, protocol_version):
575575
flags |= _PROTOCOL_TIMESTAMP_FLAG
576576

577577
if self.continuous_paging_options:
578-
if ProtocolVersion.has_continuous_paging_support(protocol_version):
579-
flags |= _PAGING_OPTIONS_FLAG
580-
else:
581-
raise UnsupportedOperation(
582-
"Continuous paging may only be used with protocol version "
583-
"ProtocolVersion.DSE_V1 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V1.")
578+
raise UnsupportedOperation(
579+
"Continuous paging may only be used with future protocol versions")
584580

585581
if self.keyspace is not None:
586582
if ProtocolVersion.uses_keyspace_flag(protocol_version):
@@ -615,8 +611,6 @@ def _write_query_params(self, f, protocol_version):
615611
def _write_paging_options(self, f, paging_options, protocol_version):
616612
write_int(f, paging_options.max_pages)
617613
write_int(f, paging_options.max_pages_per_second)
618-
if ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
619-
write_int(f, paging_options.max_queue_size)
620614

621615

622616
class QueryMessage(_QueryMessage):
@@ -1050,12 +1044,9 @@ def send_body(self, f, protocol_version):
10501044
if self.op_type == ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE:
10511045
if self.next_pages <= 0:
10521046
raise UnsupportedOperation("Continuous paging backpressure requires next_pages > 0")
1053-
elif not ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
1054-
raise UnsupportedOperation(
1055-
"Continuous paging backpressure may only be used with protocol version "
1056-
"ProtocolVersion.DSE_V2 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V2.")
10571047
else:
1058-
write_int(f, self.next_pages)
1048+
raise UnsupportedOperation(
1049+
"Continuous paging backpressure is not supported.")
10591050

10601051

10611052
class _ProtocolHandler(object):

tests/integration/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def get_supported_protocol_versions():
192192
4.0(C*) -> 6(beta),5,4,3
193193
` """
194194
if CASSANDRA_VERSION >= Version('4.0-beta5'):
195-
return (3, 4, 5, 6)
195+
return (3, 4, 5)
196196
if CASSANDRA_VERSION >= Version('4.0-a'):
197197
return (3, 4, 5)
198198
elif CASSANDRA_VERSION >= Version('3.10'):
@@ -261,7 +261,6 @@ def _id_and_mark(f):
261261
local = local_decorator_creator()
262262
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
263263
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
264-
protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported')
265264

266265
greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.1'), 'Cassandra version 2.1 or greater required')
267266
greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.2'), 'Cassandra version 2.2 or greater required')

tests/integration/simulacron/test_empty_column.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@
2929
from tests.integration.simulacron.utils import PrimeQuery, prime_request
3030

3131

32-
PROTOCOL_VERSION = 4 if PROTOCOL_VERSION in \
33-
(ProtocolVersion.DSE_V1, ProtocolVersion.DSE_V2) else PROTOCOL_VERSION
34-
35-
3632
@requiressimulacron
3733
class EmptyColumnTests(SimulacronCluster):
3834
"""

tests/integration/standard/test_cluster.py

Lines changed: 1 addition & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from tests import notwindows, notasyncio
4242
from tests.integration import use_cluster, get_server_versions, CASSANDRA_VERSION, \
4343
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
44-
get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \
44+
get_unsupported_upper_protocol, local, CASSANDRA_IP, greaterthanorequalcass30, \
4545
lessthanorequalcass40, TestCluster, PROTOCOL_VERSION, xfail_scylla, incorrect_test
4646
from tests.integration.util import assert_quiescent_pool_state
4747
import sys
@@ -1479,47 +1479,6 @@ def test_prepare_on_ignored_hosts(self):
14791479
cluster.shutdown()
14801480

14811481

1482-
@protocolv6
1483-
class BetaProtocolTest(unittest.TestCase):
1484-
1485-
@protocolv6
1486-
def test_invalid_protocol_version_beta_option(self):
1487-
"""
1488-
Test cluster connection with protocol v6 and beta flag not set
1489-
1490-
@since 3.7.0
1491-
@jira_ticket PYTHON-614, PYTHON-1232
1492-
@expected_result client shouldn't connect with V6 and no beta flag set
1493-
1494-
@test_category connection
1495-
"""
1496-
1497-
1498-
cluster = TestCluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=False)
1499-
try:
1500-
with self.assertRaises(NoHostAvailable):
1501-
cluster.connect()
1502-
except Exception as e:
1503-
self.fail("Unexpected error encountered {0}".format(e.message))
1504-
1505-
@protocolv6
1506-
def test_valid_protocol_version_beta_options_connect(self):
1507-
"""
1508-
Test cluster connection with protocol version 5 and beta flag set
1509-
1510-
@since 3.7.0
1511-
@jira_ticket PYTHON-614, PYTHON-1232
1512-
@expected_result client should connect with protocol v6 and beta flag set.
1513-
1514-
@test_category connection
1515-
"""
1516-
cluster = Cluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=True)
1517-
session = cluster.connect()
1518-
self.assertEqual(cluster.protocol_version, cassandra.ProtocolVersion.V6)
1519-
self.assertTrue(session.execute("select release_version from system.local").one())
1520-
cluster.shutdown()
1521-
1522-
15231482
class DeprecationWarningTest(unittest.TestCase):
15241483
def test_deprecation_warnings_legacy_parameters(self):
15251484
"""

tests/integration/standard/test_metadata.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2556,20 +2556,3 @@ def _assert_group_keys_by_host(self, keys, table_name, stmt):
25562556
hosts = self.cluster.metadata.get_replicas(self.ks_name, routing_key)
25572557
self.assertEqual(1, len(hosts)) # RF is 1 for this keyspace
25582558
self.assertIn(key, keys_per_host[hosts[0]])
2559-
2560-
2561-
class VirtualKeypaceTest(BasicSharedKeyspaceUnitTestCase):
2562-
virtual_ks_names = ('system_virtual_schema', 'system_views')
2563-
2564-
def test_existing_keyspaces_have_correct_virtual_tags(self):
2565-
for name, ks in self.cluster.metadata.keyspaces.items():
2566-
if name in self.virtual_ks_names:
2567-
self.assertTrue(
2568-
ks.virtual,
2569-
'incorrect .virtual value for {}'.format(name)
2570-
)
2571-
else:
2572-
self.assertFalse(
2573-
ks.virtual,
2574-
'incorrect .virtual value for {}'.format(name)
2575-
)

tests/unit/test_cluster.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,20 +207,13 @@ def test_default_serial_consistency_level_legacy(self, *_):
207207
class ProtocolVersionTests(unittest.TestCase):
208208

209209
def test_protocol_downgrade_test(self):
210-
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V2)
211-
self.assertEqual(ProtocolVersion.DSE_V1, lower)
212-
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V1)
213-
self.assertEqual(ProtocolVersion.V5,lower)
214210
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V5)
215211
self.assertEqual(ProtocolVersion.V4,lower)
216212
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V4)
217213
self.assertEqual(ProtocolVersion.V3,lower)
218214
lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V3)
219215
self.assertEqual(0, lower)
220216

221-
self.assertTrue(ProtocolVersion.uses_error_code_map(ProtocolVersion.DSE_V1))
222-
self.assertTrue(ProtocolVersion.uses_int_query_flags(ProtocolVersion.DSE_V1))
223-
224217
self.assertFalse(ProtocolVersion.uses_error_code_map(ProtocolVersion.V4))
225218
self.assertFalse(ProtocolVersion.uses_int_query_flags(ProtocolVersion.V4))
226219

tests/unit/test_connection.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,14 @@ def test_use_requested_compression(self, *args):
188188
c = self.make_connection()
189189
c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])}
190190
c.defunct = Mock()
191-
# request snappy compression
192-
c.compression = "snappy"
191+
# request LZ4 compression
192+
c.compression = "lz4"
193193

194194
locally_supported_compressions.pop('lz4', None)
195195
locally_supported_compressions.pop('snappy', None)
196196
locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
197197
locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')
198198

199-
# the server only supports snappy
200199
options_buf = BytesIO()
201200
write_stringmultimap(options_buf, {
202201
'CQL_VERSION': ['3.0.3'],
@@ -206,7 +205,7 @@ def test_use_requested_compression(self, *args):
206205

207206
c.process_msg(_Frame(version=4, flags=0, stream=0, opcode=SupportedMessage.opcode, body_offset=9, end_pos=9 + len(options)), options)
208207

209-
self.assertEqual(c.decompressor, locally_supported_compressions['snappy'][1])
208+
self.assertEqual(c.decompressor, locally_supported_compressions['lz4'][1])
210209

211210
def test_disable_compression(self, *args):
212211
c = self.make_connection()

0 commit comments

Comments
 (0)