Commit 6950c0c

Integration test for CachingSession
Adds the integration test `caching_session::ensure_cache_is_used` for `CachingSession`. The test sends several queries through `scylla_proxy` and verifies that the number of prepare requests is exactly as expected. If the cache is working correctly, no extra prepare requests should appear; if it is not, the assertions will bring attention to that.
1 parent b678d0d commit 6950c0c
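
For context, the behavior the test pins down is that a `CachingSession` prepares a statement once (on every shard of every node, which is why the test expects `batch_size * nr_shards * cluster_size` prepare frames on the first run) and serves subsequent executions of the same statement text from its prepared-statement cache, so no further PREPARE requests are sent. Below is a minimal sketch of that usage outside the proxy/test harness; the contact point, keyspace and table names, and the helper function name are illustrative and not taken from the test:

use scylla::client::caching_session::CachingSession;
use scylla::client::session_builder::SessionBuilder;

// Sketch only: node address, keyspace and table are placeholders.
async fn caching_sketch() -> Result<(), Box<dyn std::error::Error>> {
    let session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .build()
        .await?;
    // Keep up to 100 prepared statements in the cache (same capacity as in the test).
    let caching_session = CachingSession::from(session, 100);

    // First execution: the statement gets prepared and the prepared statement is cached.
    caching_session
        .execute_unpaged("INSERT INTO ks.tab (a, b, c) VALUES (?, ?, ?)", (1, 1, 1))
        .await?;
    // Same statement text again: reused from the cache, no new PREPARE is sent.
    caching_session
        .execute_unpaged("INSERT INTO ks.tab (a, b, c) VALUES (?, ?, ?)", (2, 2, 2))
        .await?;
    Ok(())
}

The test below verifies exactly this, but instead of trusting the driver it counts the PREPARE frames on the wire using `scylla_proxy` request rules.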

File tree

2 files changed: +194 -0 lines
scylla/tests/integration/caching_session.rs

Lines changed: 193 additions & 0 deletions
@@ -0,0 +1,193 @@
use std::sync::Arc;

use crate::utils::test_with_3_node_cluster;
use crate::utils::{setup_tracing, unique_keyspace_name, PerformDDL};
use scylla::batch::Batch;
use scylla::batch::BatchType;
use scylla::client::caching_session::CachingSession;
use scylla_proxy::RequestOpcode;
use scylla_proxy::RequestReaction;
use scylla_proxy::RequestRule;
use scylla_proxy::ShardAwareness;
use scylla_proxy::{Condition, ProxyError, Reaction, RequestFrame, TargetShard, WorkerError};
use tokio::sync::mpsc;

fn consume_current_feedbacks(
    rx: &mut mpsc::UnboundedReceiver<(RequestFrame, Option<TargetShard>)>,
) -> usize {
    std::iter::from_fn(|| rx.try_recv().ok()).count()
}

#[tokio::test]
#[cfg(not(scylla_cloud_tests))]
async fn ensure_cache_is_used() {
    use scylla::client::execution_profile::ExecutionProfile;

    use crate::utils::SingleTargetLBP;

    setup_tracing();
    let res = test_with_3_node_cluster(
        ShardAwareness::QueryNode,
        |proxy_uris, translation_map, mut running_proxy| async move {
            let session = scylla::client::session_builder::SessionBuilder::new()
                .known_node(proxy_uris[0].as_str())
                .address_translator(Arc::new(translation_map))
                .build()
                .await
                .unwrap();

            let cluster_size: usize = 3;
            let (feedback_txs, mut feedback_rxs): (Vec<_>, Vec<_>) = (0..cluster_size)
                .map(|_| mpsc::unbounded_channel::<(RequestFrame, Option<TargetShard>)>())
                .unzip();
            for (i, tx) in feedback_txs.iter().cloned().enumerate() {
                running_proxy.running_nodes[i].change_request_rules(Some(vec![RequestRule(
                    Condition::and(
                        Condition::RequestOpcode(RequestOpcode::Prepare),
                        Condition::not(Condition::ConnectionRegisteredAnyEvent),
                    ),
                    RequestReaction::noop().with_feedback_when_performed(tx),
                )]));
            }

            let ks = unique_keyspace_name();
            let rs = "{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}";
            session
                .ddl(format!(
                    "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {}",
                    ks, rs
                ))
                .await
                .unwrap();
            session.use_keyspace(ks, false).await.unwrap();
            session
                .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))")
                .await
                .unwrap();
            // Assumption: all nodes have the same number of shards
            let nr_shards = session
                .get_cluster_state()
                .get_nodes_info()
                .first()
                .expect("No nodes information available")
                .sharder()
                .map(|sharder| sharder.nr_shards.get() as usize)
                .unwrap_or_else(|| 1); // If there is no sharder, assume 1 shard.

            // Consume all feedbacks so far to ensure we will not count something unrelated.
            let _feedbacks = feedback_rxs
                .iter_mut()
                .map(consume_current_feedbacks)
                .sum::<usize>();

            let caching_session: CachingSession = CachingSession::from(session, 100);

            let batch_size: usize = 4;
            let mut batch = Batch::new(BatchType::Logged);
            for i in 1..=batch_size {
                let insert_b_c = format!("INSERT INTO tab (a, b, c) VALUES ({}, ?, ?)", i);
                batch.append_statement(insert_b_c.as_str());
            }
            let batch_values: Vec<(i32, i32)> = (1..=batch_size as i32).map(|i| (i, i)).collect();

            // First batch, which should generate prepares for each shard.
            caching_session
                .batch(&batch, batch_values.clone())
                .await
                .unwrap();
            let feedbacks: usize = feedback_rxs.iter_mut().map(consume_current_feedbacks).sum();
            assert_eq!(feedbacks, batch_size * nr_shards * cluster_size);

            // A few extra runs. These batches should not result in any prepares being sent.
            for _ in 0..4 {
                caching_session
                    .batch(&batch, batch_values.clone())
                    .await
                    .unwrap();
                let feedbacks: usize = feedback_rxs.iter_mut().map(consume_current_feedbacks).sum();
                assert_eq!(feedbacks, 0);
            }

            let prepared_batch_res_rows: Vec<(i32, i32, i32)> = caching_session
                .execute_unpaged("SELECT * FROM tab", &[])
                .await
                .unwrap()
                .into_rows_result()
                .unwrap()
                .rows()
                .unwrap()
                .collect::<Result<_, _>>()
                .unwrap();

            // The SELECT should have been prepared on all shards.
            let feedbacks: usize = feedback_rxs.iter_mut().map(consume_current_feedbacks).sum();
            assert_eq!(feedbacks, nr_shards * cluster_size);

            // Verify the data from the inserts.
            let mut prepared_batch_res_rows = prepared_batch_res_rows;
            prepared_batch_res_rows.sort();
            let expected_rows: Vec<(i32, i32, i32)> =
                (1..=batch_size as i32).map(|i| (i, i, i)).collect();
            assert_eq!(prepared_batch_res_rows, expected_rows);

            // Run some alters to invalidate the server-side cache, similarly to scylla/src/session_test.rs
            caching_session
                .ddl("ALTER TABLE tab RENAME c to tmp")
                .await
                .unwrap();
            caching_session
                .ddl("ALTER TABLE tab RENAME b to c")
                .await
                .unwrap();
            caching_session
                .ddl("ALTER TABLE tab RENAME tmp to b")
                .await
                .unwrap();

            // The execute_unpaged calls caused by the alters likely resulted in some prepares being sent.
            // Consume those frames.
            feedback_rxs
                .iter_mut()
                .map(consume_current_feedbacks)
                .sum::<usize>();

            // Run the batch on each shard. The server cache should be updated on the first mismatch,
            // therefore only the first contacted shard will request a reprepare due to the mismatch.
            for node_info in caching_session
                .get_session()
                .get_cluster_state()
                .get_nodes_info()
                .iter()
            {
                for shard_id in 0..nr_shards {
                    let policy = SingleTargetLBP {
                        target: (node_info.clone(), Some(shard_id as u32)),
                    };
                    let execution_profile = ExecutionProfile::builder()
                        .load_balancing_policy(Arc::new(policy))
                        .build();
                    batch.set_execution_profile_handle(Some(execution_profile.into_handle()));
                    caching_session
                        .batch(&batch, batch_values.clone())
                        .await
                        .unwrap();
                    let feedbacks: usize =
                        feedback_rxs.iter_mut().map(consume_current_feedbacks).sum();
                    let expected_feedbacks = if shard_id == 0 { batch_size } else { 0 };
                    assert_eq!(
                        feedbacks, expected_feedbacks,
                        "Mismatch in feedbacks on execution for node: {:?}, shard: {}",
                        node_info, shard_id
                    );
                }
            }
            running_proxy
        },
    )
    .await;
    match res {
        Ok(()) => (),
        Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
        Err(err) => panic!("{}", err),
    }
}

scylla/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 mod authenticate;
 mod batch;
+mod caching_session;
 mod consistency;
 mod cql_collections;
 mod cql_types;
