diff --git a/src/action/run_command.rs b/src/action/run_command.rs index 3481c41b7..2466356f2 100644 --- a/src/action/run_command.rs +++ b/src/action/run_command.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use bson::{Bson, Document}; +use bson::{Bson, Document, RawDocumentBuf}; use crate::{ client::session::TransactionState, @@ -40,7 +40,27 @@ impl Database { pub fn run_command(&self, command: Document) -> RunCommand { RunCommand { db: self, - command, + command: RawDocumentBuf::from_document(&command), + options: None, + session: None, + } + } + + /// Runs a database-level command. + /// + /// Note that no inspection is done on `doc`, so the command will not use the database's default + /// read concern or write concern. If specific read concern or write concern is desired, it must + /// be specified manually. + /// Please note that run_raw_command doesn't validate WriteConcerns passed into the body of the + /// command document. + /// + /// `await` will return d[`Result`]. + #[deeplink] + #[options_doc(run_command)] + pub fn run_raw_command(&self, command: RawDocumentBuf) -> RunCommand { + RunCommand { + db: self, + command: Ok(command), options: None, session: None, } @@ -55,7 +75,22 @@ impl Database { pub fn run_cursor_command(&self, command: Document) -> RunCursorCommand { RunCursorCommand { db: self, - command, + command: RawDocumentBuf::from_document(&command), + options: None, + session: ImplicitSession, + } + } + + /// Runs a database-level command and returns a cursor to the response. + /// + /// `await` will return d[`Result>`] or a + /// d[`Result>`] if a [`ClientSession`] is provided. + #[deeplink] + #[options_doc(run_cursor_command)] + pub fn run_raw_cursor_command(&self, command: RawDocumentBuf) -> RunCursorCommand { + RunCursorCommand { + db: self, + command: Ok(command), options: None, session: ImplicitSession, } @@ -79,6 +114,21 @@ impl crate::sync::Database { self.async_database.run_command(command) } + /// Runs a database-level command. + /// + /// Note that no inspection is done on `doc`, so the command will not use the database's default + /// read concern or write concern. If specific read concern or write concern is desired, it must + /// be specified manually. + /// Please note that run_raw_command doesn't validate WriteConcerns passed into the body of the + /// command document. + /// + /// [`run`](RunCommand::run) will return d[`Result`]. + #[deeplink] + #[options_doc(run_command, sync)] + pub fn run_raw_command(&self, command: RawDocumentBuf) -> RunCommand { + self.async_database.run_raw_command(command) + } + /// Runs a database-level command and returns a cursor to the response. /// /// [`run`](RunCursorCommand::run) will return d[`Result>`] or a @@ -88,13 +138,23 @@ impl crate::sync::Database { pub fn run_cursor_command(&self, command: Document) -> RunCursorCommand { self.async_database.run_cursor_command(command) } + + /// Runs a database-level command and returns a cursor to the response. + /// + /// [`run`](RunCursorCommand::run) will return d[`Result>`] or a + /// d[`Result>`] if a [`ClientSession`] is provided. + #[deeplink] + #[options_doc(run_cursor_command, sync)] + pub fn run_raw_cursor_command(&self, command: RawDocumentBuf) -> RunCursorCommand { + self.async_database.run_raw_cursor_command(command) + } } /// Run a database-level command. Create with [`Database::run_command`]. #[must_use] pub struct RunCommand<'a> { db: &'a Database, - command: Document, + command: bson::raw::Result, options: Option, session: Option<&'a mut ClientSession>, } @@ -115,10 +175,11 @@ impl<'a> Action for RunCommand<'a> { async fn execute(self) -> Result { let mut selection_criteria = self.options.and_then(|o| o.selection_criteria); + let command = self.command?; if let Some(session) = &self.session { match session.transaction.state { TransactionState::Starting | TransactionState::InProgress => { - if self.command.contains_key("readConcern") { + if command.get("readConcern").is_ok_and(|rc| rc.is_some()) { return Err(ErrorKind::InvalidArgument { message: "Cannot set read concern after starting a transaction".into(), } @@ -139,12 +200,8 @@ impl<'a> Action for RunCommand<'a> { } } - let operation = run_command::RunCommand::new( - self.db.name().into(), - self.command, - selection_criteria, - None, - )?; + let operation = + run_command::RunCommand::new(self.db.name().into(), command, selection_criteria, None); self.db .client() .execute_operation(operation, self.session) @@ -157,7 +214,7 @@ impl<'a> Action for RunCommand<'a> { #[must_use] pub struct RunCursorCommand<'a, Session = ImplicitSession> { db: &'a Database, - command: Document, + command: bson::raw::Result, options: Option, session: Session, } @@ -192,10 +249,10 @@ impl<'a> Action for RunCursorCommand<'a, ImplicitSession> { .and_then(|options| options.selection_criteria.clone()); let rcc = run_command::RunCommand::new( self.db.name().to_string(), - self.command, + self.command?, selection_criteria, None, - )?; + ); let rc_command = run_cursor_command::RunCursorCommand::new(rcc, self.options)?; let client = self.db.client(); client.execute_cursor_operation(rc_command).await @@ -218,10 +275,10 @@ impl<'a> Action for RunCursorCommand<'a, ExplicitSession<'a>> { .and_then(|options| options.selection_criteria.clone()); let rcc = run_command::RunCommand::new( self.db.name().to_string(), - self.command, + self.command?, selection_criteria, None, - )?; + ); let rc_command = run_cursor_command::RunCursorCommand::new(rcc, self.options)?; let client = self.db.client(); client diff --git a/src/client/csfle/state_machine.rs b/src/client/csfle/state_machine.rs index bbfaed599..ecc81f149 100644 --- a/src/client/csfle/state_machine.rs +++ b/src/client/csfle/state_machine.rs @@ -126,7 +126,7 @@ impl CryptExecutor { let db = db.as_ref().ok_or_else(|| { Error::internal("db required for NeedMongoMarkings state") })?; - let op = RawOutput(RunCommand::new_raw(db.to_string(), command, None, None)?); + let op = RawOutput(RunCommand::new(db.to_string(), command, None, None)); let mongocryptd_client = self.mongocryptd_client.as_ref().ok_or_else(|| { Error::invalid_argument("this operation requires mongocryptd") })?; diff --git a/src/coll.rs b/src/coll.rs index 9b3a0ff43..362f48e48 100644 --- a/src/coll.rs +++ b/src/coll.rs @@ -3,11 +3,11 @@ pub mod options; use std::{fmt, fmt::Debug, str::FromStr, sync::Arc}; +use bson::rawdoc; use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize}; use self::options::*; use crate::{ - bson::doc, client::options::ServerAddress, cmap::conn::PinnedConnectionHandle, concern::{ReadConcern, WriteConcern}, @@ -199,13 +199,13 @@ where let op = crate::operation::run_command::RunCommand::new( ns.db, - doc! { + rawdoc! { "killCursors": ns.coll.as_str(), "cursors": [cursor_id] }, drop_address.map(SelectionCriteria::from_address), pinned_connection, - )?; + ); self.client().execute_operation(op, None).await?; Ok(()) } diff --git a/src/operation/run_command.rs b/src/operation/run_command.rs index 140c19d40..a2f5edf3f 100644 --- a/src/operation/run_command.rs +++ b/src/operation/run_command.rs @@ -20,32 +20,17 @@ pub(crate) struct RunCommand<'conn> { impl<'conn> RunCommand<'conn> { pub(crate) fn new( - db: String, - command: Document, - selection_criteria: Option, - pinned_connection: Option<&'conn PinnedConnectionHandle>, - ) -> Result { - Ok(Self { - db, - command: RawDocumentBuf::from_document(&command)?, - selection_criteria, - pinned_connection, - }) - } - - #[cfg(feature = "in-use-encryption")] - pub(crate) fn new_raw( db: String, command: RawDocumentBuf, selection_criteria: Option, pinned_connection: Option<&'conn PinnedConnectionHandle>, - ) -> Result { - Ok(Self { + ) -> Self { + Self { db, command, selection_criteria, pinned_connection, - }) + } } fn command_name(&self) -> Option<&str> { diff --git a/src/test/db.rs b/src/test/db.rs index ab70a09c8..b1d26ddab 100644 --- a/src/test/db.rs +++ b/src/test/db.rs @@ -1,6 +1,7 @@ use std::cmp::Ord; -use futures::stream::TryStreamExt; +use bson::RawDocumentBuf; +use futures::{stream::TryStreamExt, StreamExt}; use serde::Deserialize; use crate::{ @@ -413,3 +414,55 @@ async fn aggregate_with_generics() { .await .unwrap(); } + +#[tokio::test] +async fn test_run_command() { + let client = Client::for_test().await; + let database = client.database("db"); + + // Test run_command + { + let got = database.run_command(doc! {"ping": 1}).await.unwrap(); + assert_eq!(crate::bson_util::get_int(got.get("ok").unwrap()), Some(1)); + } + + // Test run_raw_command + { + let mut cmd = RawDocumentBuf::new(); + cmd.append("ping", 1); + let got = database.run_raw_command(cmd).await.unwrap(); + assert_eq!(crate::bson_util::get_int(got.get("ok").unwrap()), Some(1)); + } + + // Create a collection with a single document + { + let coll = database.collection("coll"); + coll.drop().await.expect("should drop"); + coll.insert_one(doc! {"foo": "bar"}) + .await + .expect("should insert"); + } + + // Test run_cursor_command + { + let cursor = database + .run_cursor_command(doc! {"find": "coll", "filter": {}}) + .await + .unwrap(); + let v: Vec> = cursor.collect().await; + assert_eq!(v.len(), 1); + assert_eq!(v[0].as_ref().unwrap().get_str("foo"), Ok("bar")); + } + + // Test run_raw_cursor_command + { + let mut cmd = RawDocumentBuf::new(); + cmd.append("find", "coll"); + cmd.append("filter", RawDocumentBuf::new()); + + let cursor = database.run_raw_cursor_command(cmd).await.unwrap(); + let v: Vec> = cursor.collect().await; + assert_eq!(v.len(), 1); + assert_eq!(v[0].as_ref().unwrap().get_str("foo"), Ok("bar")); + } +}