Skip to content

Commit 842c536

Browse files
authored
RUST-1800: Modularize unified_runner/operation.rs (#1232)
RUST-1800 Modularize unified_runner/operation.rs
1 parent ccd544b commit 842c536

File tree

19 files changed

+2595
-2353
lines changed

19 files changed

+2595
-2353
lines changed

src/test/spec/unified_runner/operation.rs

Lines changed: 181 additions & 2353 deletions
Large diffs are not rendered by default.
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use crate::{
2+
error::Result,
3+
options::{AggregateOptions, CreateCollectionOptions, DropCollectionOptions},
4+
test::spec::unified_runner::{
5+
operation::{with_mut_session, with_opt_session, TestOperation},
6+
Entity,
7+
TestRunner,
8+
},
9+
Collection,
10+
Database,
11+
};
12+
use bson::{doc, Bson, Document};
13+
use futures::{future::BoxFuture, TryStreamExt};
14+
use futures_util::FutureExt;
15+
use serde::Deserialize;
16+
17+
#[derive(Debug, Deserialize)]
18+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
19+
pub(super) struct AssertCollectionExists {
20+
collection_name: String,
21+
database_name: String,
22+
}
23+
24+
impl TestOperation for AssertCollectionExists {
25+
fn execute_test_runner_operation<'a>(
26+
&'a self,
27+
test_runner: &'a TestRunner,
28+
) -> BoxFuture<'a, ()> {
29+
async move {
30+
let db = test_runner.internal_client.database(&self.database_name);
31+
let names = db.list_collection_names().await.unwrap();
32+
assert!(names.contains(&self.collection_name));
33+
}
34+
.boxed()
35+
}
36+
}
37+
38+
#[derive(Debug, Deserialize)]
39+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
40+
pub(super) struct AssertCollectionNotExists {
41+
collection_name: String,
42+
database_name: String,
43+
}
44+
45+
impl TestOperation for AssertCollectionNotExists {
46+
fn execute_test_runner_operation<'a>(
47+
&'a self,
48+
test_runner: &'a TestRunner,
49+
) -> BoxFuture<'a, ()> {
50+
async move {
51+
let db = test_runner.internal_client.database(&self.database_name);
52+
let names = db.list_collection_names().await.unwrap();
53+
assert!(!names.contains(&self.collection_name));
54+
}
55+
.boxed()
56+
}
57+
}
58+
59+
#[derive(Debug, Deserialize)]
60+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
61+
pub(super) struct CreateCollection {
62+
collection: String,
63+
#[serde(flatten)]
64+
options: CreateCollectionOptions,
65+
session: Option<String>,
66+
}
67+
68+
impl TestOperation for CreateCollection {
69+
fn execute_entity_operation<'a>(
70+
&'a self,
71+
id: &'a str,
72+
test_runner: &'a TestRunner,
73+
) -> BoxFuture<'a, Result<Option<Entity>>> {
74+
async move {
75+
let database = test_runner.get_database(id).await;
76+
with_opt_session!(
77+
test_runner,
78+
&self.session,
79+
database
80+
.create_collection(&self.collection)
81+
.with_options(self.options.clone()),
82+
)
83+
.await?;
84+
Ok(Some(Entity::Collection(
85+
database.collection(&self.collection),
86+
)))
87+
}
88+
.boxed()
89+
}
90+
}
91+
92+
#[derive(Debug, Deserialize)]
93+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
94+
pub(super) struct DropCollection {
95+
collection: String,
96+
#[serde(flatten)]
97+
options: DropCollectionOptions,
98+
session: Option<String>,
99+
}
100+
101+
impl TestOperation for DropCollection {
102+
fn execute_entity_operation<'a>(
103+
&'a self,
104+
id: &'a str,
105+
test_runner: &'a TestRunner,
106+
) -> BoxFuture<'a, Result<Option<Entity>>> {
107+
async move {
108+
let database = test_runner.get_database(id).await;
109+
let collection = database.collection::<Document>(&self.collection).clone();
110+
with_opt_session!(
111+
test_runner,
112+
&self.session,
113+
collection.drop().with_options(self.options.clone()),
114+
)
115+
.await?;
116+
Ok(None)
117+
}
118+
.boxed()
119+
}
120+
}
121+
122+
#[derive(Debug, Deserialize)]
123+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
124+
pub(super) struct RenameCollection {
125+
to: String,
126+
}
127+
128+
impl TestOperation for RenameCollection {
129+
fn execute_entity_operation<'a>(
130+
&'a self,
131+
id: &'a str,
132+
test_runner: &'a TestRunner,
133+
) -> BoxFuture<'a, Result<Option<Entity>>> {
134+
async move {
135+
let target = test_runner.get_collection(id).await;
136+
let ns = target.namespace();
137+
let mut to_ns = ns.clone();
138+
to_ns.coll.clone_from(&self.to);
139+
let cmd = doc! {
140+
"renameCollection": crate::bson::to_bson(&ns)?,
141+
"to": crate::bson::to_bson(&to_ns)?,
142+
};
143+
let admin = test_runner.internal_client.database("admin");
144+
admin.run_command(cmd).await?;
145+
Ok(None)
146+
}
147+
.boxed()
148+
}
149+
}
150+
151+
#[derive(Debug, Deserialize)]
152+
pub(super) struct Aggregate {
153+
pipeline: Vec<Document>,
154+
session: Option<String>,
155+
#[serde(flatten)]
156+
options: AggregateOptions,
157+
}
158+
159+
impl TestOperation for Aggregate {
160+
fn execute_entity_operation<'a>(
161+
&'a self,
162+
id: &'a str,
163+
test_runner: &'a TestRunner,
164+
) -> BoxFuture<'a, Result<Option<Entity>>> {
165+
async move {
166+
let result = match &self.session {
167+
Some(session_id) => {
168+
enum AggregateEntity {
169+
Collection(Collection<Document>),
170+
Database(Database),
171+
Other(String),
172+
}
173+
let entity = match test_runner.entities.read().await.get(id).unwrap() {
174+
Entity::Collection(c) => AggregateEntity::Collection(c.clone()),
175+
Entity::Database(d) => AggregateEntity::Database(d.clone()),
176+
other => AggregateEntity::Other(format!("{:?}", other)),
177+
};
178+
with_mut_session!(test_runner, session_id, |session| async {
179+
let mut cursor = match entity {
180+
AggregateEntity::Collection(collection) => {
181+
collection
182+
.aggregate(self.pipeline.clone())
183+
.with_options(self.options.clone())
184+
.session(&mut *session)
185+
.await?
186+
}
187+
AggregateEntity::Database(db) => {
188+
db.aggregate(self.pipeline.clone())
189+
.with_options(self.options.clone())
190+
.session(&mut *session)
191+
.await?
192+
}
193+
AggregateEntity::Other(debug) => {
194+
panic!("Cannot execute aggregate on {}", &debug)
195+
}
196+
};
197+
cursor.stream(session).try_collect::<Vec<Document>>().await
198+
})
199+
.await?
200+
}
201+
None => {
202+
let entities = test_runner.entities.read().await;
203+
let cursor = match entities.get(id).unwrap() {
204+
Entity::Collection(collection) => {
205+
collection
206+
.aggregate(self.pipeline.clone())
207+
.with_options(self.options.clone())
208+
.await?
209+
}
210+
Entity::Database(db) => {
211+
db.aggregate(self.pipeline.clone())
212+
.with_options(self.options.clone())
213+
.await?
214+
}
215+
other => panic!("Cannot execute aggregate on {:?}", &other),
216+
};
217+
cursor.try_collect::<Vec<Document>>().await?
218+
}
219+
};
220+
Ok(Some(Bson::from(result).into()))
221+
}
222+
.boxed()
223+
}
224+
225+
fn returns_root_documents(&self) -> bool {
226+
true
227+
}
228+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
use crate::{
2+
action::Action,
3+
error::Result,
4+
options::{RunCursorCommandOptions, SelectionCriteria},
5+
test::spec::unified_runner::{
6+
operation::{with_mut_session, with_opt_session, TestOperation},
7+
Entity,
8+
TestCursor,
9+
TestRunner,
10+
},
11+
};
12+
use bson::{to_bson, Document};
13+
use futures::{future::BoxFuture, TryStreamExt};
14+
use futures_util::FutureExt;
15+
use serde::Deserialize;
16+
use tokio::sync::Mutex;
17+
18+
#[derive(Debug, Deserialize)]
19+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
20+
pub(super) struct RunCommand {
21+
command: Document,
22+
// We don't need to use this field, but it needs to be included during deserialization so that
23+
// we can use the deny_unknown_fields tag.
24+
#[serde(rename = "commandName")]
25+
_command_name: String,
26+
read_preference: Option<SelectionCriteria>,
27+
session: Option<String>,
28+
}
29+
30+
impl TestOperation for RunCommand {
31+
fn execute_entity_operation<'a>(
32+
&'a self,
33+
id: &'a str,
34+
test_runner: &'a TestRunner,
35+
) -> BoxFuture<'a, Result<Option<Entity>>> {
36+
async move {
37+
let command = self.command.clone();
38+
39+
let db = test_runner.get_database(id).await;
40+
let result = with_opt_session!(
41+
test_runner,
42+
&self.session,
43+
db.run_command(command)
44+
.optional(self.read_preference.clone(), |a, rp| {
45+
a.selection_criteria(rp)
46+
}),
47+
)
48+
.await?;
49+
let result = to_bson(&result)?;
50+
Ok(Some(result.into()))
51+
}
52+
.boxed()
53+
}
54+
}
55+
56+
#[derive(Debug, Deserialize)]
57+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
58+
pub(super) struct RunCursorCommand {
59+
command: Document,
60+
// We don't need to use this field, but it needs to be included during deserialization so that
61+
// we can use the deny_unknown_fields tag.
62+
#[serde(rename = "commandName")]
63+
_command_name: String,
64+
65+
#[serde(flatten)]
66+
options: RunCursorCommandOptions,
67+
session: Option<String>,
68+
}
69+
70+
impl TestOperation for RunCursorCommand {
71+
fn execute_entity_operation<'a>(
72+
&'a self,
73+
id: &'a str,
74+
test_runner: &'a TestRunner,
75+
) -> BoxFuture<'a, Result<Option<Entity>>> {
76+
async move {
77+
let command = self.command.clone();
78+
let db = test_runner.get_database(id).await;
79+
let options = self.options.clone();
80+
81+
let action = db.run_cursor_command(command).with_options(options);
82+
let result = match &self.session {
83+
Some(session_id) => {
84+
with_mut_session!(test_runner, session_id, |session| async {
85+
let mut cursor = action.session(&mut *session).await?;
86+
cursor.stream(session).try_collect::<Vec<_>>().await
87+
})
88+
.await?
89+
}
90+
None => {
91+
let cursor = action.await?;
92+
cursor.try_collect::<Vec<_>>().await?
93+
}
94+
};
95+
96+
Ok(Some(bson::to_bson(&result)?.into()))
97+
}
98+
.boxed()
99+
}
100+
}
101+
102+
#[derive(Debug, Deserialize)]
103+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
104+
pub struct CreateCommandCursor {
105+
command: Document,
106+
// We don't need to use this field, but it needs to be included during deserialization so that
107+
// we can use the deny_unknown_fields tag.
108+
#[serde(rename = "commandName")]
109+
_command_name: String,
110+
111+
#[serde(flatten)]
112+
options: RunCursorCommandOptions,
113+
session: Option<String>,
114+
}
115+
116+
impl TestOperation for CreateCommandCursor {
117+
fn execute_entity_operation<'a>(
118+
&'a self,
119+
id: &'a str,
120+
test_runner: &'a TestRunner,
121+
) -> BoxFuture<'a, Result<Option<Entity>>> {
122+
async move {
123+
let command = self.command.clone();
124+
let db = test_runner.get_database(id).await;
125+
let options = self.options.clone();
126+
127+
let action = db.run_cursor_command(command).with_options(options);
128+
match &self.session {
129+
Some(session_id) => {
130+
let mut ses_cursor = None;
131+
with_mut_session!(test_runner, session_id, |session| async {
132+
ses_cursor = Some(action.session(session).await);
133+
})
134+
.await;
135+
let test_cursor = TestCursor::Session {
136+
cursor: ses_cursor.unwrap().unwrap(),
137+
session_id: session_id.clone(),
138+
};
139+
Ok(Some(Entity::Cursor(test_cursor)))
140+
}
141+
None => {
142+
let doc_cursor = action.await?;
143+
let test_cursor = TestCursor::Normal(Mutex::new(doc_cursor));
144+
Ok(Some(Entity::Cursor(test_cursor)))
145+
}
146+
}
147+
}
148+
.boxed()
149+
}
150+
151+
fn returns_root_documents(&self) -> bool {
152+
false
153+
}
154+
}

0 commit comments

Comments
 (0)