Skip to content

Commit 56011b9

Browse files
authored
RUST-1588 Testing for RunCursorCommand (#931)
1 parent de8a21e commit 56011b9

File tree

7 files changed

+126
-21
lines changed

7 files changed

+126
-21
lines changed

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ impl Client {
620620
///
621621
/// Note that topology changes require rebuilding the connection pool, so this method cannot
622622
/// guarantee that the pool will always be filled for the lifetime of the `Client`.
623-
///
623+
///
624624
/// Does nothing if `min_pool_size` is unset or zero.
625625
pub async fn warm_connection_pool(&self) {
626626
if !self.inner.options.min_pool_size.map_or(false, |s| s > 0) {

src/client/executor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,6 @@ impl Client {
692692
}))
693693
}
694694
}
695-
696695
handle_response(self, op, session, is_sharded, response).await
697696
}
698697
Err(err) => Err(err),

src/coll/options.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ impl Hint {
7878
}
7979

8080
/// Specifies the type of cursor to return from a find operation.
81-
#[derive(Debug, Clone, Copy)]
81+
#[derive(Debug, Clone, Copy, Deserialize)]
82+
#[serde(rename_all = "camelCase")]
8283
#[non_exhaustive]
8384
pub enum CursorType {
8485
/// Default; close the cursor after the last document is received from the server.

src/db/options.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,10 @@ pub struct ChangeStreamPreAndPostImages {
315315

316316
/// Specifies the options to a
317317
/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation.
318-
#[derive(Clone, Debug, Default, TypedBuilder)]
318+
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
319319
#[builder(field_defaults(default, setter(into)))]
320+
#[serde(rename_all = "camelCase")]
321+
#[serde(default)]
320322
#[non_exhaustive]
321323
pub struct RunCursorCommandOptions {
322324
/// The default read preference for operations.
@@ -325,6 +327,8 @@ pub struct RunCursorCommandOptions {
325327
pub cursor_type: Option<CursorType>,
326328
/// Number of documents to return per batch.
327329
pub batch_size: Option<u32>,
330+
#[serde(rename = "maxtime", alias = "maxTimeMS")]
331+
#[serde(deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis")]
328332
/// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent
329333
/// on subsequent getMore commands.
330334
pub max_time: Option<Duration>,

src/operation/run_cursor_command.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
concern::WriteConcern,
66
cursor::CursorSpecification,
77
error::{Error, Result},
8-
operation::{Operation, RunCommand},
8+
operation::{CursorBody, Operation, RunCommand},
99
options::RunCursorCommandOptions,
1010
selection_criteria::SelectionCriteria,
1111
};
@@ -94,25 +94,18 @@ impl<'conn> Operation for RunCursorCommand<'conn> {
9494
response: RawCommandResponse,
9595
description: &StreamDescription,
9696
) -> Result<Self::O> {
97-
let doc = Operation::handle_response(&self.run_command, response, description)?;
98-
let cursor_info = bson::from_document(doc)?;
99-
let batch_size = match &self.options {
100-
Some(options) => options.batch_size,
101-
None => None,
102-
};
103-
let max_time = match &self.options {
104-
Some(options) => options.max_time,
105-
None => None,
106-
};
97+
let cursor_response: CursorBody = response.body()?;
98+
10799
let comment = match &self.options {
108100
Some(options) => options.comment.clone(),
109101
None => None,
110102
};
103+
111104
Ok(CursorSpecification::new(
112-
cursor_info,
105+
cursor_response.cursor,
113106
description.server_address.clone(),
114-
batch_size,
115-
max_time,
107+
self.options.as_ref().and_then(|opts| opts.batch_size),
108+
self.options.as_ref().and_then(|opts| opts.max_time),
116109
comment,
117110
))
118111
}

src/test/spec/run_command.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use crate::test::{spec::unified_runner::run_unified_tests, LOCK};
55
async fn run_unified() {
66
let _guard = LOCK.run_exclusively().await;
77
run_unified_tests(&["run-command", "unified"])
8-
// TODO RUST-1588: Unskip this file
9-
.skip_files(&["runCursorCommand.json"])
108
.skip_tests(&[
119
// TODO re: RUST-1649: fix withTransaction for new test runner
1210
"attaches transaction fields to given command",
11+
// TODO fix in a follow up PR because need to add IterateOnce
12+
"does not close the cursor when receiving an empty batch",
1313
])
1414
.await;
1515
}

src/test/spec/unified_runner/operation.rs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::{
4545
client::session::TransactionState,
4646
coll::options::Hint,
4747
collation::Collation,
48+
db::options::RunCursorCommandOptions,
4849
error::{ErrorKind, Result},
4950
gridfs::options::{GridFsDownloadByNameOptions, GridFsUploadOptions},
5051
options::{
@@ -288,6 +289,7 @@ impl<'de> Deserialize<'de> for Operation {
288289
"deleteOne" => deserialize_op::<DeleteOne>(definition.arguments),
289290
"find" => deserialize_op::<Find>(definition.arguments),
290291
"createFindCursor" => deserialize_op::<CreateFindCursor>(definition.arguments),
292+
"createCommandCursor" => deserialize_op::<CreateCommandCursor>(definition.arguments),
291293
"aggregate" => deserialize_op::<Aggregate>(definition.arguments),
292294
"distinct" => deserialize_op::<Distinct>(definition.arguments),
293295
"countDocuments" => deserialize_op::<CountDocuments>(definition.arguments),
@@ -314,6 +316,7 @@ impl<'de> Deserialize<'de> for Operation {
314316
"createCollection" => deserialize_op::<CreateCollection>(definition.arguments),
315317
"dropCollection" => deserialize_op::<DropCollection>(definition.arguments),
316318
"runCommand" => deserialize_op::<RunCommand>(definition.arguments),
319+
"runCursorCommand" => deserialize_op::<RunCursorCommand>(definition.arguments),
317320
"endSession" => deserialize_op::<EndSession>(definition.arguments),
318321
"assertSessionTransactionState" => {
319322
deserialize_op::<AssertSessionTransactionState>(definition.arguments)
@@ -1561,7 +1564,9 @@ impl TestOperation for CreateCollection {
15611564
.create_collection(&self.collection, self.options.clone())
15621565
.await?;
15631566
}
1564-
Ok(None)
1567+
Ok(Some(Entity::Collection(
1568+
database.collection(&self.collection),
1569+
)))
15651570
}
15661571
.boxed()
15671572
}
@@ -1644,6 +1649,109 @@ impl TestOperation for RunCommand {
16441649
}
16451650
}
16461651

1652+
#[derive(Debug, Deserialize)]
1653+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
1654+
pub(super) struct RunCursorCommand {
1655+
command: Document,
1656+
// We don't need to use this field, but it needs to be included during deserialization so that
1657+
// we can use the deny_unknown_fields tag.
1658+
#[serde(rename = "commandName")]
1659+
_command_name: String,
1660+
1661+
#[serde(flatten)]
1662+
options: RunCursorCommandOptions,
1663+
session: Option<String>,
1664+
}
1665+
1666+
impl TestOperation for RunCursorCommand {
1667+
fn execute_entity_operation<'a>(
1668+
&'a self,
1669+
id: &'a str,
1670+
test_runner: &'a TestRunner,
1671+
) -> BoxFuture<'a, Result<Option<Entity>>> {
1672+
async move {
1673+
let command = self.command.clone();
1674+
let db = test_runner.get_database(id).await;
1675+
let options = self.options.clone();
1676+
1677+
let result = match &self.session {
1678+
Some(session_id) => {
1679+
with_mut_session!(test_runner, session_id, |session| async {
1680+
let mut cursor = db
1681+
.run_cursor_command_with_session(command, options, session)
1682+
.await?;
1683+
cursor.stream(session).try_collect::<Vec<_>>().await
1684+
})
1685+
.await?
1686+
}
1687+
None => {
1688+
let cursor = db.run_cursor_command(command, options).await?;
1689+
cursor.try_collect::<Vec<_>>().await?
1690+
}
1691+
};
1692+
1693+
Ok(Some(bson::to_bson(&result)?.into()))
1694+
}
1695+
.boxed()
1696+
}
1697+
}
1698+
1699+
#[derive(Debug, Deserialize)]
1700+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
1701+
pub struct CreateCommandCursor {
1702+
command: Document,
1703+
// We don't need to use this field, but it needs to be included during deserialization so that
1704+
// we can use the deny_unknown_fields tag.
1705+
#[serde(rename = "commandName")]
1706+
_command_name: String,
1707+
1708+
#[serde(flatten)]
1709+
options: RunCursorCommandOptions,
1710+
session: Option<String>,
1711+
}
1712+
1713+
impl TestOperation for CreateCommandCursor {
1714+
fn execute_entity_operation<'a>(
1715+
&'a self,
1716+
id: &'a str,
1717+
test_runner: &'a TestRunner,
1718+
) -> BoxFuture<'a, Result<Option<Entity>>> {
1719+
async move {
1720+
let command = self.command.clone();
1721+
let db = test_runner.get_database(id).await;
1722+
let options = self.options.clone();
1723+
1724+
match &self.session {
1725+
Some(session_id) => {
1726+
let mut ses_cursor = None;
1727+
with_mut_session!(test_runner, session_id, |session| async {
1728+
ses_cursor = Some(
1729+
db.run_cursor_command_with_session(command, options, session)
1730+
.await,
1731+
);
1732+
})
1733+
.await;
1734+
let test_cursor = TestCursor::Session {
1735+
cursor: ses_cursor.unwrap().unwrap(),
1736+
session_id: session_id.clone(),
1737+
};
1738+
Ok(Some(Entity::Cursor(test_cursor)))
1739+
}
1740+
None => {
1741+
let doc_cursor = db.run_cursor_command(command, options).await?;
1742+
let test_cursor = TestCursor::Normal(Mutex::new(doc_cursor));
1743+
Ok(Some(Entity::Cursor(test_cursor)))
1744+
}
1745+
}
1746+
}
1747+
.boxed()
1748+
}
1749+
1750+
fn returns_root_documents(&self) -> bool {
1751+
false
1752+
}
1753+
}
1754+
16471755
#[derive(Debug, Deserialize)]
16481756
#[serde(rename_all = "camelCase", deny_unknown_fields)]
16491757
pub(super) struct EndSession {}

0 commit comments

Comments
 (0)