Skip to content

Commit 160faa7

Browse files
authored
Merge pull request datastax#939 from datastax/python-941
PYTHON-941 Improve error for batch WriteTimeouts
2 parents 710b6f7 + 3a881b0 commit 160faa7

File tree

7 files changed

+175
-73
lines changed

7 files changed

+175
-73
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Other
1717
* Warn users when using the deprecated Session.default_consistency_level (PYTHON-953)
1818
* Add DSE smoke test to OSS driver tests (PYTHON-894)
1919
* Document long compilation times and workarounds (PYTHON-868)
20+
* Improve error for batch WriteTimeouts (PYTHON-941)
2021

2122
3.13.0
2223
======

cassandra/__init__.py

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import logging
16+
import six
1617

1718

1819
class NullHandler(logging.Handler):
@@ -213,6 +214,75 @@ def uses_keyspace_flag(cls, version):
213214
return version >= cls.V5
214215

215216

217+
class WriteType(object):
218+
"""
219+
For usage with :class:`.RetryPolicy`, this describe a type
220+
of write operation.
221+
"""
222+
223+
SIMPLE = 0
224+
"""
225+
A write to a single partition key. Such writes are guaranteed to be atomic
226+
and isolated.
227+
"""
228+
229+
BATCH = 1
230+
"""
231+
A write to multiple partition keys that used the distributed batch log to
232+
ensure atomicity.
233+
"""
234+
235+
UNLOGGED_BATCH = 2
236+
"""
237+
A write to multiple partition keys that did not use the distributed batch
238+
log. Atomicity for such writes is not guaranteed.
239+
"""
240+
241+
COUNTER = 3
242+
"""
243+
A counter write (for one or multiple partition keys). Such writes should
244+
not be replayed in order to avoid overcount.
245+
"""
246+
247+
BATCH_LOG = 4
248+
"""
249+
The initial write to the distributed batch log that Cassandra performs
250+
internally before a BATCH write.
251+
"""
252+
253+
CAS = 5
254+
"""
255+
A lighweight-transaction write, such as "DELETE ... IF EXISTS".
256+
"""
257+
258+
VIEW = 6
259+
"""
260+
This WriteType is only seen in results for requests that were unable to
261+
complete MV operations.
262+
"""
263+
264+
CDC = 7
265+
"""
266+
This WriteType is only seen in results for requests that were unable to
267+
complete CDC operations.
268+
"""
269+
270+
271+
WriteType.name_to_value = {
272+
'SIMPLE': WriteType.SIMPLE,
273+
'BATCH': WriteType.BATCH,
274+
'UNLOGGED_BATCH': WriteType.UNLOGGED_BATCH,
275+
'COUNTER': WriteType.COUNTER,
276+
'BATCH_LOG': WriteType.BATCH_LOG,
277+
'CAS': WriteType.CAS,
278+
'VIEW': WriteType.VIEW,
279+
'CDC': WriteType.CDC
280+
}
281+
282+
283+
WriteType.value_to_name = {v: k for k, v in six.iteritems(WriteType.name_to_value)}
284+
285+
216286
class SchemaChangeType(object):
217287
DROPPED = 'DROPPED'
218288
CREATED = 'CREATED'
@@ -339,14 +409,21 @@ class Timeout(RequestExecutionException):
339409
the operation
340410
"""
341411

342-
def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None):
412+
def __init__(self, summary_message, consistency=None, required_responses=None,
413+
received_responses=None, **kwargs):
343414
self.consistency = consistency
344415
self.required_responses = required_responses
345416
self.received_responses = received_responses
346-
Exception.__init__(self, summary_message + ' info=' +
347-
repr({'consistency': consistency_value_to_name(consistency),
348-
'required_responses': required_responses,
349-
'received_responses': received_responses}))
417+
418+
if "write_type" in kwargs:
419+
kwargs["write_type"] = WriteType.value_to_name[kwargs["write_type"]]
420+
421+
info = {'consistency': consistency_value_to_name(consistency),
422+
'required_responses': required_responses,
423+
'received_responses': received_responses}
424+
info.update(kwargs)
425+
426+
Exception.__init__(self, summary_message + ' info=' + repr(info))
350427

351428

352429
class ReadTimeout(Timeout):
@@ -387,6 +464,7 @@ class WriteTimeout(Timeout):
387464
"""
388465

389466
def __init__(self, message, write_type=None, **kwargs):
467+
kwargs["write_type"] = write_type
390468
Timeout.__init__(self, message, **kwargs)
391469
self.write_type = write_type
392470

cassandra/policies.py

Lines changed: 8 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717
from random import randint, shuffle
1818
from threading import Lock
1919
import socket
20+
from cassandra import WriteType as WT
21+
22+
23+
# This is done this way because WriteType was originally
24+
# defined here and in order not to break the API.
25+
# It may removed in the next mayor.
26+
WriteType = WT
27+
2028

2129
from cassandra import ConsistencyLevel, OperationTimedOut
2230

@@ -690,72 +698,6 @@ def new_schedule(self):
690698
i += 1
691699

692700

693-
class WriteType(object):
694-
"""
695-
For usage with :class:`.RetryPolicy`, this describe a type
696-
of write operation.
697-
"""
698-
699-
SIMPLE = 0
700-
"""
701-
A write to a single partition key. Such writes are guaranteed to be atomic
702-
and isolated.
703-
"""
704-
705-
BATCH = 1
706-
"""
707-
A write to multiple partition keys that used the distributed batch log to
708-
ensure atomicity.
709-
"""
710-
711-
UNLOGGED_BATCH = 2
712-
"""
713-
A write to multiple partition keys that did not use the distributed batch
714-
log. Atomicity for such writes is not guaranteed.
715-
"""
716-
717-
COUNTER = 3
718-
"""
719-
A counter write (for one or multiple partition keys). Such writes should
720-
not be replayed in order to avoid overcount.
721-
"""
722-
723-
BATCH_LOG = 4
724-
"""
725-
The initial write to the distributed batch log that Cassandra performs
726-
internally before a BATCH write.
727-
"""
728-
729-
CAS = 5
730-
"""
731-
A lighweight-transaction write, such as "DELETE ... IF EXISTS".
732-
"""
733-
734-
VIEW = 6
735-
"""
736-
This WriteType is only seen in results for requests that were unable to
737-
complete MV operations.
738-
"""
739-
740-
CDC = 7
741-
"""
742-
This WriteType is only seen in results for requests that were unable to
743-
complete CDC operations.
744-
"""
745-
746-
747-
WriteType.name_to_value = {
748-
'SIMPLE': WriteType.SIMPLE,
749-
'BATCH': WriteType.BATCH,
750-
'UNLOGGED_BATCH': WriteType.UNLOGGED_BATCH,
751-
'COUNTER': WriteType.COUNTER,
752-
'BATCH_LOG': WriteType.BATCH_LOG,
753-
'CAS': WriteType.CAS,
754-
'VIEW': WriteType.VIEW,
755-
'CDC': WriteType.CDC
756-
}
757-
758-
759701
class RetryPolicy(object):
760702
"""
761703
A policy that describes whether to retry, rethrow, or ignore coordinator

cassandra/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
UTF8Type, VarcharType, UUIDType, UserType,
4141
TupleType, lookup_casstype, SimpleDateType,
4242
TimeType, ByteType, ShortType, DurationType)
43-
from cassandra.policies import WriteType
43+
from cassandra import WriteType
4444
from cassandra.cython_deps import HAVE_CYTHON, HAVE_NUMPY
4545
from cassandra import util
4646

tests/integration/simulacron/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
import unittest # noqa
1818

1919
from tests.integration.simulacron.utils import stop_simulacron, clear_queries
20+
from tests.integration import PROTOCOL_VERSION, SIMULACRON_JAR, CASSANDRA_VERSION
21+
from tests.integration.simulacron.utils import start_and_prime_singledc
2022

23+
from cassandra.cluster import Cluster
24+
25+
from packaging.version import Version
2126

2227
def teardown_package():
2328
stop_simulacron()
@@ -27,3 +32,23 @@ class SimulacronBase(unittest.TestCase):
2732
def tearDown(self):
2833
clear_queries()
2934
stop_simulacron()
35+
36+
37+
class SimulacronCluster(SimulacronBase):
38+
@classmethod
39+
def setUpClass(cls):
40+
if SIMULACRON_JAR is None or CASSANDRA_VERSION < Version("2.1"):
41+
return
42+
43+
start_and_prime_singledc()
44+
cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION, compression=False)
45+
cls.session = cls.cluster.connect(wait_for_all_pools=True)
46+
47+
@classmethod
48+
def tearDownClass(cls):
49+
if SIMULACRON_JAR is None or CASSANDRA_VERSION < Version("2.1"):
50+
return
51+
52+
cls.cluster.shutdown()
53+
stop_simulacron()
54+
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
try:
15+
import unittest2 as unittest
16+
except ImportError:
17+
import unittest # noqa
18+
19+
from tests.integration.simulacron import SimulacronCluster
20+
from tests.integration import requiressimulacron
21+
from tests.integration.simulacron.utils import prime_query
22+
23+
from cassandra import WriteTimeout, WriteType, ConsistencyLevel
24+
25+
@requiressimulacron
26+
class ClusterTests(SimulacronCluster):
27+
def test_writetimeout(self):
28+
write_type = "UNLOGGED_BATCH"
29+
consistency = "LOCAL_QUORUM"
30+
received_responses = 1
31+
required_responses = 4
32+
33+
query_to_prime_simple = "SELECT * from simulacron_keyspace.simple"
34+
then = {
35+
"result": "write_timeout",
36+
"delay_in_ms": 0,
37+
"consistency_level": consistency,
38+
"received": received_responses,
39+
"block_for": required_responses,
40+
"write_type": write_type,
41+
"ignore_on_prepare": True
42+
}
43+
prime_query(query_to_prime_simple, then=then, rows=None, column_types=None)
44+
45+
with self.assertRaises(WriteTimeout) as assert_raised_context:
46+
self.session.execute(query_to_prime_simple)
47+
wt = assert_raised_context.exception
48+
self.assertEqual(wt.write_type, WriteType.name_to_value[write_type])
49+
self.assertEqual(wt.consistency, ConsistencyLevel.name_to_value[consistency])
50+
self.assertEqual(wt.received_responses, received_responses)
51+
self.assertEqual(wt.required_responses, required_responses)
52+
self.assertIn(write_type, str(wt))
53+
self.assertIn(consistency, str(wt))
54+
self.assertIn(str(received_responses), str(wt))
55+
self.assertIn(str(required_responses), str(wt))

tests/integration/simulacron/test_policies.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
from itertools import count
3030
from packaging.version import Version
3131

32-
class BadRoundRobinPolicy(RoundRobinPolicy):
3332

33+
class BadRoundRobinPolicy(RoundRobinPolicy):
3434
def make_query_plan(self, working_keyspace=None, query=None):
3535
pos = self._position
3636
self._position += 1
@@ -293,6 +293,7 @@ def test_retry_policy_ignores_and_rethrows(self):
293293

294294
with self.assertRaises(WriteTimeout):
295295
self.session.execute(query_to_prime_simple)
296+
296297
#CDC should be ignored
297298
self.session.execute(query_to_prime_cdc)
298299

0 commit comments

Comments
 (0)