Skip to content

Commit fe8af81

Browse files
committed
test shard-awareness following pattern usage
1 parent f81d87f commit fe8af81

File tree

2 files changed

+47
-18
lines changed

2 files changed

+47
-18
lines changed

scylla-cql/src/types/serialize/row.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ impl SerializeRow for SerializedValues {
286286
_ctx: &RowSerializationContext<'_>,
287287
writer: &mut RowWriter,
288288
) -> Result<(), SerializationError> {
289-
Ok(writer.append_serialize_row(self))
289+
writer.append_serialize_row(self);
290+
Ok(())
290291
}
291292

292293
fn is_empty(&self) -> bool {

scylla/tests/integration/shard_aware_batching.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::utils::test_with_3_node_cluster;
22
use futures::prelude::*;
33
use futures_batch::ChunksTimeoutStreamExt;
44
use scylla::retry_policy::FallthroughRetryPolicy;
5+
use scylla::routing::Shard;
56
use scylla::serialize::row::SerializedValues;
67
use scylla::test_utils::unique_keyspace_name;
78
use scylla::transport::session::Session;
@@ -13,8 +14,8 @@ use tokio::sync::mpsc;
1314
use tokio_stream::wrappers::ReceiverStream;
1415

1516
use scylla_proxy::{
16-
Condition, ProxyError, Reaction, RequestFrame, RequestOpcode, RequestReaction, RequestRule,
17-
RunningProxy, ShardAwareness, WorkerError,
17+
Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, RunningProxy,
18+
ShardAwareness, WorkerError,
1819
};
1920

2021
#[tokio::test]
@@ -55,6 +56,8 @@ async fn run_test(
5556
running_proxy.running_nodes[i].change_request_rules(Some(vec![prepared_rule(prepared_tx)]));
5657
prepared_rx
5758
});
59+
let shards_for_nodes_test_check: Arc<tokio::sync::Mutex<HashMap<uuid::Uuid, Vec<Shard>>>> =
60+
Default::default();
5861

5962
let handle = ExecutionProfile::builder()
6063
.retry_policy(Box::new(FallthroughRetryPolicy))
@@ -133,6 +136,7 @@ async fn run_test(
133136
scylla::batch::Batch::new(scylla::batch::BatchType::Unlogged);
134137
scylla_batch.enforce_target_node(&node, shard_id_on_node, &session);
135138

139+
let shards_for_nodes_test_check_clone = Arc::clone(&shards_for_nodes_test_check);
136140
batching_tasks.push(tokio::spawn(async move {
137141
let mut batches = ReceiverStream::new(receiver)
138142
.chunks_timeout(10, Duration::from_millis(100));
@@ -150,6 +154,13 @@ async fn run_test(
150154
.batch(&scylla_batch, &batch)
151155
.await
152156
.expect("Query to send batch failed");
157+
158+
shards_for_nodes_test_check_clone
159+
.lock()
160+
.await
161+
.entry(destination_shard.node_id)
162+
.or_default()
163+
.push(destination_shard.shard_id_on_node);
153164
}
154165
}));
155166
sender
@@ -172,23 +183,40 @@ async fn run_test(
172183

173184
// finally check that batching was indeed shard-aware.
174185

175-
// TODO
176-
177-
// wip: make sure we did capture the queries to each node
178-
fn clear_rxs(rxs: &mut [mpsc::UnboundedReceiver<(RequestFrame, Option<u16>)>; 3]) {
179-
for rx in rxs.iter_mut() {
180-
while rx.try_recv().is_ok() {}
186+
let mut expected: Vec<Vec<Shard>> = Arc::try_unwrap(shards_for_nodes_test_check)
187+
.expect("All batching tasks have finished")
188+
.into_inner()
189+
.into_values()
190+
.collect();
191+
192+
let mut nodes_shards_calls: Vec<Vec<Shard>> = Vec::new();
193+
for rx in prepared_rxs.iter_mut() {
194+
let mut shards_calls = Vec::new();
195+
shards_calls.push(
196+
rx.recv()
197+
.await
198+
.expect("Each node should have received at least one message")
199+
.1
200+
.expect("Calls should be shard-aware")
201+
.into(),
202+
);
203+
loop {
204+
match rx.try_recv() {
205+
Ok((_, call_shard)) => {
206+
shards_calls.push(call_shard.expect("Calls should all be shard-aware").into())
207+
}
208+
Err(_) => break,
209+
}
181210
}
211+
nodes_shards_calls.push(shards_calls);
182212
}
183-
async fn assert_all_replicas_queried(
184-
rxs: &mut [mpsc::UnboundedReceiver<(RequestFrame, Option<u16>)>; 3],
185-
) {
186-
for rx in rxs.iter_mut() {
187-
rx.recv().await.unwrap();
188-
}
189-
clear_rxs(rxs);
190-
}
191-
assert_all_replicas_queried(&mut prepared_rxs).await;
213+
214+
// Don't know which node is which
215+
// but at least once we don't care about which node is which they should agree about what was sent to what shard
216+
dbg!(&expected, &nodes_shards_calls);
217+
expected.sort_unstable();
218+
nodes_shards_calls.sort_unstable();
219+
assert_eq!(expected, nodes_shards_calls);
192220

193221
running_proxy
194222
}

0 commit comments

Comments
 (0)