Skip to content

Commit 07bd1d8

Browse files
Lorak-mmkfruch
authored andcommitted
Add a test reproducing 'USE ks' race condition
1 parent 71001c3 commit 07bd1d8

File tree

3 files changed

+73
-2
lines changed

3 files changed

+73
-2
lines changed

.github/workflows/integration-tests-python2.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ jobs:
2121

2222
- name: Test with pytest
2323
run: |
24-
./ci/run_integration_test.sh tests/integration/standard/test_authentication.py tests/integration/standard/test_cluster.py tests/integration/standard/test_concurrent.py tests/integration/standard/test_connection.py tests/integration/standard/test_control_connection.py tests/integration/standard/test_custom_payload.py tests/integration/standard/test_custom_protocol_handler.py tests/integration/standard/test_cython_protocol_handlers.py tests/integration/standard/test_scylla_cloud.py
24+
./ci/run_integration_test.sh tests/integration/standard/test_authentication.py tests/integration/standard/test_cluster.py tests/integration/standard/test_concurrent.py tests/integration/standard/test_connection.py tests/integration/standard/test_control_connection.py tests/integration/standard/test_custom_payload.py tests/integration/standard/test_custom_protocol_handler.py tests/integration/standard/test_cython_protocol_handlers.py tests/integration/standard/test_scylla_cloud.py tests/integration/standard/test_use_keyspace.py
2525
# can't run this, cause only 2 cpus on github actions: tests/integration/standard/test_shard_aware.py

.github/workflows/integration-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ jobs:
2121

2222
- name: Test with pytest
2323
run: |
24-
./ci/run_integration_test.sh tests/integration/standard/test_authentication.py tests/integration/standard/test_cluster.py tests/integration/standard/test_concurrent.py tests/integration/standard/test_connection.py tests/integration/standard/test_control_connection.py tests/integration/standard/test_custom_payload.py tests/integration/standard/test_custom_protocol_handler.py tests/integration/standard/test_cython_protocol_handlers.py tests/integration/standard/test_scylla_cloud.py
24+
./ci/run_integration_test.sh tests/integration/standard/test_authentication.py tests/integration/standard/test_cluster.py tests/integration/standard/test_concurrent.py tests/integration/standard/test_connection.py tests/integration/standard/test_control_connection.py tests/integration/standard/test_custom_payload.py tests/integration/standard/test_custom_protocol_handler.py tests/integration/standard/test_cython_protocol_handlers.py tests/integration/standard/test_scylla_cloud.py tests/integration/standard/test_use_keyspace.py
2525
# can't run this, cause only 2 cpus on github actions: tests/integration/standard/test_shard_aware.py
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import os
2+
import time
3+
import random
4+
from subprocess import run
5+
import logging
6+
7+
try:
8+
from concurrent.futures import ThreadPoolExecutor, as_completed
9+
except ImportError:
10+
from futures import ThreadPoolExecutor, as_completed # noqa
11+
12+
try:
13+
import unittest2 as unittest
14+
except ImportError:
15+
import unittest # noqa
16+
17+
from mock import patch
18+
19+
from cassandra.connection import Connection
20+
from cassandra.cluster import Cluster
21+
from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, ConstantReconnectionPolicy
22+
from cassandra import OperationTimedOut, ConsistencyLevel
23+
24+
from tests.integration import use_cluster, get_node, PROTOCOL_VERSION
25+
26+
LOGGER = logging.getLogger(__name__)
27+
28+
def setup_module():
29+
os.environ['SCYLLA_EXT_OPTS'] = "--smp 2 --memory 2048M"
30+
use_cluster('shared_aware', [3], start=True)
31+
32+
33+
34+
class TestUseKeyspace(unittest.TestCase):
35+
@classmethod
36+
def setup_class(cls):
37+
cls.cluster = Cluster(contact_points=["127.0.0.1"], protocol_version=PROTOCOL_VERSION,
38+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
39+
reconnection_policy=ConstantReconnectionPolicy(1))
40+
cls.session = cls.cluster.connect()
41+
LOGGER.info(cls.cluster.is_shard_aware())
42+
LOGGER.info(cls.cluster.shard_aware_stats())
43+
@classmethod
44+
def teardown_class(cls):
45+
cls.cluster.shutdown()
46+
47+
def test_set_keyspace_slow_connection(self):
48+
# Test that "USE keyspace" gets propagated
49+
# to all connections.
50+
#
51+
# Reproduces an issue #187 where some pending
52+
# connections for shards would not
53+
# receive "USE keyspace".
54+
#
55+
# Simulate that scenario by adding an artifical
56+
# delay before sending "USE keyspace" on
57+
# connections.
58+
59+
original_set_keyspace_blocking = Connection.set_keyspace_blocking
60+
def patched_set_keyspace_blocking(*args, **kwargs):
61+
time.sleep(1)
62+
return original_set_keyspace_blocking(*args, **kwargs)
63+
64+
with patch.object(Connection, "set_keyspace_blocking", patched_set_keyspace_blocking):
65+
self.session.execute("CREATE KEYSPACE test_set_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
66+
self.session.execute("CREATE TABLE test_set_keyspace.set_keyspace_slow_connection(pk int, PRIMARY KEY(pk))")
67+
68+
session2 = self.cluster.connect()
69+
session2.execute("USE test_set_keyspace")
70+
for i in range(200):
71+
session2.execute(f"SELECT * FROM set_keyspace_slow_connection WHERE pk = 1")

0 commit comments

Comments
 (0)