Skip to content

Commit 4c7e189

Browse files
authored
Merge pull request #1351 from wprzytula/reorganise-integration-tests
Reorganise integration tests - part 2
2 parents 8766094 + 7b4dadb commit 4c7e189

34 files changed

+4197
-4126
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use scylla::client::execution_profile::ExecutionProfile;
2+
use scylla::policies::load_balancing::{DefaultPolicy, LatencyAwarenessBuilder};
3+
4+
use crate::utils::{create_new_session_builder, setup_tracing};
5+
6+
// This is a regression test for #696.
7+
#[tokio::test]
8+
#[ntest::timeout(1000)]
9+
async fn latency_aware_query_completes() {
10+
setup_tracing();
11+
let policy = DefaultPolicy::builder()
12+
.latency_awareness(LatencyAwarenessBuilder::default())
13+
.build();
14+
let handle = ExecutionProfile::builder()
15+
.load_balancing_policy(policy)
16+
.build()
17+
.into_handle();
18+
19+
let session = create_new_session_builder()
20+
.default_execution_profile_handle(handle)
21+
.build()
22+
.await
23+
.unwrap();
24+
25+
session.query_unpaged("whatever", ()).await.unwrap_err();
26+
}

scylla/tests/integration/load_balancing/load_balancing.rs

Lines changed: 0 additions & 83 deletions
This file was deleted.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
#[expect(clippy::module_inception)]
2-
mod load_balancing;
1+
mod latency_awareness;
32
mod lwt_optimisation;
43
mod shards;
4+
mod simple_strategy;
55
mod tablets;
6+
mod token_awareness;

scylla/tests/integration/load_balancing/shards.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ use std::collections::HashSet;
22
use std::sync::Arc;
33

44
use crate::utils::{
5-
scylla_supports_tablets, setup_tracing, test_with_3_node_cluster, unique_keyspace_name,
6-
PerformDDL,
5+
create_new_session_builder, scylla_supports_tablets, setup_tracing, test_with_3_node_cluster,
6+
unique_keyspace_name, PerformDDL,
77
};
8+
use scylla::client::execution_profile::ExecutionProfile;
89
use scylla::client::session_builder::SessionBuilder;
10+
use scylla::cluster::{ClusterState, NodeRef};
11+
use scylla::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
12+
use scylla::routing::Shard;
913
use tokio::sync::mpsc;
1014

1115
use scylla_proxy::TargetShard;
@@ -87,3 +91,54 @@ async fn test_consistent_shard_awareness() {
8791
Err(err) => panic!("{}", err),
8892
}
8993
}
94+
95+
/// Regression test for panic that appeared if LBP picked a
96+
/// shard that is out of range for the node.
97+
#[tokio::test]
98+
async fn test_shard_out_of_range() {
99+
setup_tracing();
100+
101+
#[derive(Debug)]
102+
struct ShardOutOfRangeLBP;
103+
impl LoadBalancingPolicy for ShardOutOfRangeLBP {
104+
fn pick<'a>(
105+
&'a self,
106+
_query: &'a RoutingInfo,
107+
cluster: &'a ClusterState,
108+
) -> Option<(NodeRef<'a>, Option<Shard>)> {
109+
let node = &cluster.get_nodes_info()[0];
110+
match node.sharder() {
111+
Some(sharder) => Some((node, Some(sharder.nr_shards.get() as u32))),
112+
// For Cassandra let's pick some crazy shard number - it should be ignored anyway.
113+
None => Some((node, Some(u16::MAX as u32))),
114+
}
115+
}
116+
117+
fn fallback<'a>(
118+
&'a self,
119+
_query: &'a RoutingInfo,
120+
_cluster: &'a ClusterState,
121+
) -> FallbackPlan<'a> {
122+
Box::new(std::iter::empty())
123+
}
124+
125+
fn name(&self) -> String {
126+
"ShardOutOfRangeLBP".into()
127+
}
128+
}
129+
130+
let handle = ExecutionProfile::builder()
131+
.load_balancing_policy(Arc::new(ShardOutOfRangeLBP))
132+
.build()
133+
.into_handle();
134+
let session = create_new_session_builder()
135+
.default_execution_profile_handle(handle)
136+
.build()
137+
.await
138+
.unwrap();
139+
140+
let _ = session
141+
.query_unpaged("SELECT * FROM system.local WHERE key='local'", ())
142+
.await
143+
.unwrap();
144+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use crate::utils::{create_new_session_builder, unique_keyspace_name, PerformDDL as _};
2+
3+
/// It's recommended to use NetworkTopologyStrategy everywhere, so most tests use only NetworkTopologyStrategy.
4+
/// We still support SimpleStrategy, so to make sure that SimpleStrategy works correctly this test runs
5+
/// a few queries in a SimpleStrategy keyspace.
6+
#[tokio::test]
7+
async fn simple_strategy_test() {
8+
let ks = unique_keyspace_name();
9+
let session = create_new_session_builder().build().await.unwrap();
10+
11+
session
12+
.ddl(format!(
13+
"CREATE KEYSPACE {} WITH REPLICATION = \
14+
{{'class': 'SimpleStrategy', 'replication_factor': 1}}",
15+
ks
16+
))
17+
.await
18+
.unwrap();
19+
20+
session
21+
.ddl(format!(
22+
"CREATE TABLE {}.tab (p int, c int, r int, PRIMARY KEY (p, c, r))",
23+
ks
24+
))
25+
.await
26+
.unwrap();
27+
28+
session
29+
.query_unpaged(
30+
format!("INSERT INTO {}.tab (p, c, r) VALUES (1, 2, 3)", ks),
31+
(),
32+
)
33+
.await
34+
.unwrap();
35+
36+
session
37+
.query_unpaged(
38+
format!("INSERT INTO {}.tab (p, c, r) VALUES (?, ?, ?)", ks),
39+
(4, 5, 6),
40+
)
41+
.await
42+
.unwrap();
43+
44+
let prepared = session
45+
.prepare(format!("INSERT INTO {}.tab (p, c, r) VALUES (?, ?, ?)", ks))
46+
.await
47+
.unwrap();
48+
49+
session.execute_unpaged(&prepared, (7, 8, 9)).await.unwrap();
50+
51+
let mut rows: Vec<(i32, i32, i32)> = session
52+
.query_unpaged(format!("SELECT p, c, r FROM {}.tab", ks), ())
53+
.await
54+
.unwrap()
55+
.into_rows_result()
56+
.unwrap()
57+
.rows::<(i32, i32, i32)>()
58+
.unwrap()
59+
.map(|r| r.unwrap())
60+
.collect::<Vec<(i32, i32, i32)>>();
61+
rows.sort();
62+
63+
assert_eq!(rows, vec![(1, 2, 3), (4, 5, 6), (7, 8, 9)]);
64+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use crate::utils::{
2+
create_new_session_builder, scylla_supports_tablets, setup_tracing, unique_keyspace_name,
3+
PerformDDL as _,
4+
};
5+
6+
#[tokio::test]
7+
async fn test_token_awareness() {
8+
setup_tracing();
9+
let session = create_new_session_builder().build().await.unwrap();
10+
let ks = unique_keyspace_name();
11+
12+
// Need to disable tablets in this test because they make token routing
13+
// work differently, and in this test we want to test the classic token ring
14+
// behavior.
15+
let mut create_ks = format!(
16+
"CREATE KEYSPACE IF NOT EXISTS {ks} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}"
17+
);
18+
if scylla_supports_tablets(&session).await {
19+
create_ks += " AND TABLETS = {'enabled': false}"
20+
}
21+
22+
session.ddl(create_ks).await.unwrap();
23+
session
24+
.ddl(format!(
25+
"CREATE TABLE IF NOT EXISTS {}.t (a text primary key)",
26+
ks
27+
))
28+
.await
29+
.unwrap();
30+
31+
let mut prepared_statement = session
32+
.prepare(format!("INSERT INTO {}.t (a) VALUES (?)", ks))
33+
.await
34+
.unwrap();
35+
prepared_statement.set_tracing(true);
36+
37+
// The default policy should be token aware
38+
for size in 1..50usize {
39+
let key = vec!['a'; size].into_iter().collect::<String>();
40+
let values = (&key,);
41+
42+
// Execute a query and observe tracing info
43+
let res = session
44+
.execute_unpaged(&prepared_statement, values)
45+
.await
46+
.unwrap();
47+
let tracing_info = session
48+
.get_tracing_info(res.tracing_id().as_ref().unwrap())
49+
.await
50+
.unwrap();
51+
52+
// Verify that only one node was involved
53+
assert_eq!(tracing_info.nodes().len(), 1);
54+
55+
// Do the same with execute_iter (it now works with writes)
56+
let iter = session
57+
.execute_iter(prepared_statement.clone(), values)
58+
.await
59+
.unwrap();
60+
let tracing_id = iter.tracing_ids()[0];
61+
let tracing_info = session.get_tracing_info(&tracing_id).await.unwrap();
62+
63+
// Again, verify that only one node was involved
64+
assert_eq!(tracing_info.nodes().len(), 1);
65+
}
66+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use crate::utils::{
2+
create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL as _,
3+
};
4+
5+
#[tokio::test]
6+
async fn test_macros_complex_pk() {
7+
setup_tracing();
8+
let session = create_new_session_builder().build().await.unwrap();
9+
let ks = unique_keyspace_name();
10+
11+
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
12+
session.use_keyspace(ks, true).await.unwrap();
13+
session
14+
.ddl("CREATE TABLE IF NOT EXISTS complex_pk (a int, b int, c text, d int, e int, primary key ((a,b,c),d))")
15+
.await
16+
.unwrap();
17+
18+
// Check that SerializeRow and DeserializeRow macros work
19+
{
20+
#[derive(scylla::SerializeRow, scylla::DeserializeRow, PartialEq, Debug, Clone)]
21+
struct ComplexPk {
22+
a: i32,
23+
b: i32,
24+
c: Option<String>,
25+
d: i32,
26+
e: i32,
27+
}
28+
let input: ComplexPk = ComplexPk {
29+
a: 9,
30+
b: 8,
31+
c: Some("seven".into()),
32+
d: 6,
33+
e: 5,
34+
};
35+
session
36+
.query_unpaged(
37+
"INSERT INTO complex_pk (a,b,c,d,e) VALUES (?,?,?,?,?)",
38+
input.clone(),
39+
)
40+
.await
41+
.unwrap();
42+
let output: ComplexPk = session
43+
.query_unpaged(
44+
"SELECT * FROM complex_pk WHERE a = 9 and b = 8 and c = 'seven'",
45+
&[],
46+
)
47+
.await
48+
.unwrap()
49+
.into_rows_result()
50+
.unwrap()
51+
.single_row()
52+
.unwrap();
53+
assert_eq!(input, output)
54+
}
55+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
mod complex_pk;
12
mod hygiene;

0 commit comments

Comments
 (0)