Skip to content

Commit ececd69

Browse files
committed
Remove protocol versions >5 from MAX_SUPPORTED
Without removing associated code yet, this patch just ensure we begin negotiating protocol version 5, then fallback to 4, reducing the amount of protocol errors that the Scylla server sends when it sees the unsupported versions. Refs: #244 Signed-off-by: Yaniv Kaul <[email protected]>
1 parent 52b7d25 commit ececd69

20 files changed

+27
-943
lines changed

cassandra/__init__.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ class ProtocolVersion(object):
170170
DSE private protocol v2, supported in DSE 6.0+
171171
"""
172172

173-
SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
173+
SUPPORTED_VERSIONS = (V5, V4, V3)
174174
"""
175-
A tuple of all supported protocol versions
175+
A tuple of all supported protocol versions for ScyllaDB, including future v5 support.
176176
"""
177177

178178
BETA_VERSIONS = (V6,)
@@ -227,10 +227,6 @@ def uses_keyspace_flag(cls, version):
227227
def has_continuous_paging_support(cls, version):
228228
return version >= cls.DSE_V1
229229

230-
@classmethod
231-
def has_continuous_paging_next_pages(cls, version):
232-
return version >= cls.DSE_V2
233-
234230
@classmethod
235231
def has_checksumming_support(cls, version):
236232
return cls.V5 <= version < cls.DSE_V1

cassandra/cluster.py

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
from cassandra.connection import (ConnectionException, ConnectionShutdown,
5252
ConnectionHeartbeat, ProtocolVersionUnsupported,
5353
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
54-
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
54+
SniEndPointFactory, ConnectionBusy)
5555
from cassandra.cqltypes import UserType
5656
import cassandra.cqltypes as types
5757
from cassandra.encoder import Encoder
@@ -672,7 +672,7 @@ class Cluster(object):
672672
server will be automatically used.
673673
"""
674674

675-
protocol_version = ProtocolVersion.DSE_V2
675+
protocol_version = ProtocolVersion.V5
676676
"""
677677
The maximum version of the native protocol to use.
678678
@@ -2692,7 +2692,6 @@ def __init__(self, cluster, hosts, keyspace=None):
26922692
raise NoHostAvailable(msg, [h.address for h in hosts])
26932693

26942694
self.session_id = uuid.uuid4()
2695-
self._graph_paging_available = self._check_graph_paging_available()
26962695

26972696
if self.cluster.column_encryption_policy is not None:
26982697
try:
@@ -2889,26 +2888,10 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
28892888
def _maybe_set_graph_paging(self, execution_profile):
28902889
graph_paging = execution_profile.continuous_paging_options
28912890
if execution_profile.continuous_paging_options is _NOT_SET:
2892-
graph_paging = ContinuousPagingOptions() if self._graph_paging_available else None
2891+
graph_paging = None
28932892

28942893
execution_profile.continuous_paging_options = graph_paging
28952894

2896-
def _check_graph_paging_available(self):
2897-
"""Verify if we can enable graph paging. This executed only once when the session is created."""
2898-
2899-
if not ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version):
2900-
return False
2901-
2902-
for host in self.cluster.metadata.all_hosts():
2903-
if host.dse_version is None:
2904-
return False
2905-
2906-
version = Version(host.dse_version)
2907-
if version < _GRAPH_PAGING_MIN_DSE_VERSION:
2908-
return False
2909-
2910-
return True
2911-
29122895
def _resolve_execution_profile_options(self, execution_profile):
29132896
"""
29142897
Determine the GraphSON protocol and row factory for a graph query. This is useful
@@ -3053,13 +3036,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30533036
else:
30543037
timestamp = None
30553038

3056-
supports_continuous_paging_state = (
3057-
ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version)
3058-
)
3059-
if continuous_paging_options and supports_continuous_paging_state:
3060-
continuous_paging_state = ContinuousPagingState(continuous_paging_options.max_queue_size)
3061-
else:
3062-
continuous_paging_state = None
3039+
continuous_paging_state = None
30633040

30643041
if isinstance(query, SimpleStatement):
30653042
query_string = query.query_string

cassandra/connection.py

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -444,32 +444,6 @@ class CrcMismatchException(ConnectionException):
444444
pass
445445

446446

447-
class ContinuousPagingState(object):
448-
"""
449-
A class for specifying continuous paging state, only supported starting with DSE_V2.
450-
"""
451-
452-
num_pages_requested = None
453-
"""
454-
How many pages we have already requested
455-
"""
456-
457-
num_pages_received = None
458-
"""
459-
How many pages we have already received
460-
"""
461-
462-
max_queue_size = None
463-
"""
464-
The max queue size chosen by the user via the options
465-
"""
466-
467-
def __init__(self, max_queue_size):
468-
self.num_pages_requested = max_queue_size # the initial query requests max_queue_size
469-
self.num_pages_received = 0
470-
self.max_queue_size = max_queue_size
471-
472-
473447
class ContinuousPagingSession(object):
474448
def __init__(self, stream_id, decoder, row_factory, connection, state):
475449
self.stream_id = stream_id

cassandra/protocol.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -615,8 +615,6 @@ def _write_query_params(self, f, protocol_version):
615615
def _write_paging_options(self, f, paging_options, protocol_version):
616616
write_int(f, paging_options.max_pages)
617617
write_int(f, paging_options.max_pages_per_second)
618-
if ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
619-
write_int(f, paging_options.max_queue_size)
620618

621619

622620
class QueryMessage(_QueryMessage):
@@ -1050,12 +1048,10 @@ def send_body(self, f, protocol_version):
10501048
if self.op_type == ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE:
10511049
if self.next_pages <= 0:
10521050
raise UnsupportedOperation("Continuous paging backpressure requires next_pages > 0")
1053-
elif not ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
1051+
else:
10541052
raise UnsupportedOperation(
10551053
"Continuous paging backpressure may only be used with protocol version "
10561054
"ProtocolVersion.DSE_V2 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V2.")
1057-
else:
1058-
write_int(f, self.next_pages)
10591055

10601056

10611057
class _ProtocolHandler(object):

tests/integration/__init__.py

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,9 @@ def _get_dse_version_from_cass(cass_version):
228228

229229
def get_default_protocol():
230230
if CASSANDRA_VERSION >= Version('4.0-a'):
231-
if DSE_VERSION:
232-
return ProtocolVersion.DSE_V2
233-
else:
234-
return ProtocolVersion.V5
231+
return ProtocolVersion.V5
235232
if CASSANDRA_VERSION >= Version('3.10'):
236-
if DSE_VERSION:
237-
return ProtocolVersion.DSE_V1
238-
else:
239-
return 4
233+
return 4
240234
if CASSANDRA_VERSION >= Version('2.2'):
241235
return 4
242236
elif CASSANDRA_VERSION >= Version('2.1'):
@@ -268,17 +262,11 @@ def get_supported_protocol_versions():
268262
` """
269263
if CASSANDRA_VERSION >= Version('4.0-beta5'):
270264
if not DSE_VERSION:
271-
return (3, 4, 5, 6)
272-
if CASSANDRA_VERSION >= Version('4.0-a'):
273-
if DSE_VERSION:
274-
return (3, 4, ProtocolVersion.DSE_V1, ProtocolVersion.DSE_V2)
275-
else:
276265
return (3, 4, 5)
266+
if CASSANDRA_VERSION >= Version('4.0-a'):
267+
return (3, 4, 5)
277268
elif CASSANDRA_VERSION >= Version('3.10'):
278-
if DSE_VERSION:
279-
return (3, 4, ProtocolVersion.DSE_V1)
280-
else:
281-
return (3, 4)
269+
return (3, 4)
282270
elif CASSANDRA_VERSION >= Version('3.0'):
283271
return (3, 4)
284272
elif CASSANDRA_VERSION >= Version('2.2'):
@@ -316,10 +304,7 @@ def get_unsupported_upper_protocol():
316304
else:
317305
return ProtocolVersion.DSE_V1
318306
if CASSANDRA_VERSION >= Version('3.10'):
319-
if DSE_VERSION:
320-
return ProtocolVersion.DSE_V2
321-
else:
322-
return 5
307+
return 5
323308
if CASSANDRA_VERSION >= Version('2.2'):
324309
return 5
325310
elif CASSANDRA_VERSION >= Version('2.1'):
@@ -349,7 +334,6 @@ def _id_and_mark(f):
349334
local = local_decorator_creator()
350335
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
351336
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
352-
protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported')
353337

354338
greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.1'), 'Cassandra version 2.1 or greater required')
355339
greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.2'), 'Cassandra version 2.2 or greater required')
@@ -364,9 +348,6 @@ def _id_and_mark(f):
364348
lessthancass40 = unittest.skipUnless(CASSANDRA_VERSION < Version('4.0'), 'Cassandra version less than 4.0 required')
365349
lessthancass30 = unittest.skipUnless(CASSANDRA_VERSION < Version('3.0'), 'Cassandra version less then 3.0 required')
366350

367-
greaterthanorequaldse68 = unittest.skipUnless(DSE_VERSION and DSE_VERSION >= Version('6.8'), "DSE 6.8 or greater required for this test")
368-
greaterthanorequaldse67 = unittest.skipUnless(DSE_VERSION and DSE_VERSION >= Version('6.7'), "DSE 6.7 or greater required for this test")
369-
greaterthanorequaldse60 = unittest.skipUnless(DSE_VERSION and DSE_VERSION >= Version('6.0'), "DSE 6.0 or greater required for this test")
370351
greaterthanorequaldse51 = unittest.skipUnless(DSE_VERSION and DSE_VERSION >= Version('5.1'), "DSE 5.1 or greater required for this test")
371352
greaterthanorequaldse50 = unittest.skipUnless(DSE_VERSION and DSE_VERSION >= Version('5.0'), "DSE 5.0 or greater required for this test")
372353
lessthandse51 = unittest.skipUnless(DSE_VERSION and DSE_VERSION < Version('5.1'), "DSE version less than 5.1 required")

0 commit comments

Comments
 (0)