Skip to content

Commit 7848b90

Browse files
committed
RUST-608 Dispatch threaded CMAP test operations in order
1 parent d79bddf commit 7848b90

File tree

2 files changed

+140
-182
lines changed

2 files changed

+140
-182
lines changed

src/cmap/test/file.rs

Lines changed: 29 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
use std::collections::VecDeque;
1+
use std::sync::Arc;
22

33
use serde::Deserialize;
44

5-
use super::event::Event;
6-
use crate::{cmap::options::ConnectionPoolOptions, error::ErrorKind, test::RunOn};
5+
use super::{event::Event, State};
6+
use crate::{
7+
cmap::options::ConnectionPoolOptions,
8+
error::{ErrorKind, Result},
9+
test::RunOn,
10+
};
711
use bson::Document;
812

913
#[derive(Debug, Deserialize)]
@@ -13,7 +17,7 @@ pub struct TestFile {
1317
style: TestStyle,
1418
pub description: String,
1519
pub pool_options: Option<ConnectionPoolOptions>,
16-
operations: VecDeque<ThreadedOperation>,
20+
pub operations: Vec<ThreadedOperation>,
1721
pub error: Option<Error>,
1822
pub events: Vec<Event>,
1923
#[serde(default)]
@@ -30,48 +34,39 @@ enum TestStyle {
3034
}
3135

3236
#[derive(Debug, Deserialize)]
33-
struct ThreadedOperation {
37+
pub struct ThreadedOperation {
3438
#[serde(flatten)]
3539
type_: Operation,
3640

3741
thread: Option<String>,
3842
}
3943

44+
impl ThreadedOperation {
45+
pub(super) async fn execute(self, state: Arc<State>) -> Result<()> {
46+
match self.thread {
47+
Some(thread_name) => {
48+
let threads = state.threads.read().await;
49+
let thread = threads.get(&thread_name).unwrap();
50+
thread.dispatcher.send(self.type_).unwrap();
51+
Ok(())
52+
}
53+
None => self.type_.execute(state).await,
54+
}
55+
}
56+
}
57+
4058
#[derive(Debug, Deserialize)]
4159
#[serde(tag = "name")]
4260
#[serde(rename_all = "camelCase")]
4361
pub enum Operation {
44-
Start {
45-
target: String,
46-
},
47-
Wait {
48-
ms: u64,
49-
},
50-
WaitForThread {
51-
target: String,
52-
},
53-
WaitForEvent {
54-
event: String,
55-
count: usize,
56-
},
57-
CheckOut {
58-
label: Option<String>,
59-
},
60-
CheckIn {
61-
connection: String,
62-
},
62+
Start { target: String },
63+
Wait { ms: u64 },
64+
WaitForThread { target: String },
65+
WaitForEvent { event: String, count: usize },
66+
CheckOut { label: Option<String> },
67+
CheckIn { connection: String },
6368
Clear,
6469
Close,
65-
66-
// In order to execute a `Start` operation, we need to know all of the operations that should
67-
// execute in the context of that thread. To achieve this, we preprocess the operations
68-
// specified by the test file to replace each instance of `Start` operation with a
69-
// `StartHelper` with the corresponding operations. `StartHelper` won't ever actually occur in
70-
// the original set of operations specified.
71-
StartHelper {
72-
target: String,
73-
operations: Vec<Operation>,
74-
},
7570
}
7671

7772
#[derive(Debug, Deserialize)]
@@ -94,54 +89,3 @@ impl Error {
9489
}
9590
}
9691
}
97-
98-
impl TestFile {
99-
pub fn process_operations(&mut self) -> Vec<Operation> {
100-
let mut processed_ops = Vec::new();
101-
102-
while let Some(operation) = self.operations.pop_front() {
103-
match operation.type_ {
104-
// When a `Start` operation is encountered, search the rest of the operations for
105-
// any that occur in the context of the corresponding thread, remove them from the
106-
// original set of operations, and add them to the newly created `StartHelper`
107-
// operation.
108-
Operation::Start { target } => {
109-
let start_helper = Operation::StartHelper {
110-
operations: remove_by(&mut self.operations, |op| {
111-
op.thread.as_ref() == Some(&target)
112-
})
113-
.into_iter()
114-
.map(|op| op.type_)
115-
.collect(),
116-
target,
117-
};
118-
119-
processed_ops.push(start_helper);
120-
}
121-
other => processed_ops.push(other),
122-
}
123-
}
124-
125-
processed_ops
126-
}
127-
}
128-
129-
// Removes all items in the `VecDeque` that fulfill the predicate and return them in order as a new
130-
// `Vec`.
131-
fn remove_by<T, F>(vec: &mut VecDeque<T>, pred: F) -> Vec<T>
132-
where
133-
F: Fn(&T) -> bool,
134-
{
135-
let mut i = 0;
136-
let mut removed = Vec::new();
137-
138-
while i < vec.len() {
139-
if pred(&vec[i]) {
140-
removed.push(vec.remove(i).unwrap());
141-
} else {
142-
i += 1;
143-
}
144-
}
145-
146-
removed
147-
}

0 commit comments

Comments
 (0)