From cb9c2d369c53428e98f871251d6f3efb93cec44c Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Tue, 11 Nov 2025 13:05:38 -0800 Subject: [PATCH 01/17] WIP --- crates/mox_client/python/tests/conftest.py | 18 + exe_name.txt | 1 + src/action/find.rs | 82 ++++ src/client/executor.rs | 55 +++ src/cursor.rs | 1 + src/cursor/raw_batch.rs | 508 +++++++++++++++++++++ src/lib.rs | 1 + src/operation.rs | 2 + src/operation/find_raw.rs | 164 +++++++ src/operation/get_more_raw.rs | 134 ++++++ src/test/coll.rs | 29 ++ 11 files changed, 995 insertions(+) create mode 100644 crates/mox_client/python/tests/conftest.py create mode 100644 exe_name.txt create mode 100644 src/cursor/raw_batch.rs create mode 100644 src/operation/find_raw.rs create mode 100644 src/operation/get_more_raw.rs diff --git a/crates/mox_client/python/tests/conftest.py b/crates/mox_client/python/tests/conftest.py new file mode 100644 index 000000000..5d871b105 --- /dev/null +++ b/crates/mox_client/python/tests/conftest.py @@ -0,0 +1,18 @@ +import logging +import sys + + +def pytest_configure(config): + # Configure root logger so Rust logs forwarded via pyo3-log are visible + root = logging.getLogger() + if not root.handlers: # avoid duplicate handlers under repeated runs + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter( + logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s") + ) + root.addHandler(handler) + # Default to INFO globally; turn up verbosity for our crate + root.setLevel(logging.INFO) + logging.getLogger("mox_client").setLevel(logging.DEBUG) + + diff --git a/exe_name.txt b/exe_name.txt new file mode 100644 index 000000000..930fc389c --- /dev/null +++ b/exe_name.txt @@ -0,0 +1 @@ +/Users/silasmarvin/github/mongo-rust-driver/target/debug/deps/mongodb-dfd7b57d04f5145a \ No newline at end of file diff --git a/src/action/find.rs b/src/action/find.rs index 121e9d00c..9dae77543 100644 --- a/src/action/find.rs +++ b/src/action/find.rs @@ -22,6 +22,7 @@ use super::{ export_doc, option_setters, options_doc, + CollRef, ExplicitSession, ImplicitSession, }; @@ -41,6 +42,21 @@ impl Collection { session: ImplicitSession, } } + + /// Finds the documents and returns raw server batches. + /// + /// `await` will return d[`Result`] (or + /// d[`Result`] if a session is provided). + #[deeplink] + #[options_doc(find)] + pub fn find_raw_batches(&self, filter: Document) -> FindRawBatches<'_> { + FindRawBatches { + cr: CollRef::new(self), + filter, + options: None, + session: ImplicitSession, + } + } } impl Collection { @@ -142,6 +158,72 @@ impl<'a, T: Send + Sync> Action for Find<'a, T, ExplicitSession<'a>> { } } +#[must_use] +pub struct FindRawBatches<'a, Session = ImplicitSession> { + cr: CollRef<'a>, + filter: Document, + options: Option, + session: Session, +} + +#[option_setters(crate::coll::options::FindOptions)] +#[export_doc(find_raw_batches)] +impl<'a, Session> FindRawBatches<'a, Session> { + pub fn session<'s>( + self, + value: impl Into<&'s mut ClientSession>, + ) -> FindRawBatches<'a, ExplicitSession<'s>> { + FindRawBatches { + cr: self.cr, + filter: self.filter, + options: self.options, + session: ExplicitSession(value.into()), + } + } +} + +#[action_impl] +impl<'a> Action for FindRawBatches<'a, ImplicitSession> { + type Future = FindRawBatchesFuture; + + async fn execute(mut self) -> Result { + resolve_options!(self.cr, self.options, [read_concern, selection_criteria]); + let op = crate::operation::find_raw::FindRaw::new( + self.cr.namespace(), + self.filter, + self.options, + ); + self.cr + .client() + .execute_raw_batch_cursor_operation(op) + .await + } +} + +#[action_impl] +impl<'a> Action for FindRawBatches<'a, ExplicitSession<'a>> { + type Future = FindRawBatchesSessionFuture; + + async fn execute(mut self) -> Result { + resolve_read_concern_with_session!(self.cr, self.options, Some(&mut *self.session.0))?; + resolve_selection_criteria_with_session!( + self.cr, + self.options, + Some(&mut *self.session.0) + )?; + + let op = crate::operation::find_raw::FindRaw::new( + self.cr.namespace(), + self.filter, + self.options, + ); + self.cr + .client() + .execute_session_raw_batch_cursor_operation(op, self.session.0) + .await + } +} + /// Finds a single document in a collection matching a filter. Construct with /// [`Collection::find_one`]. #[must_use] diff --git a/src/client/executor.rs b/src/client/executor.rs index 674eeca94..0469ee6b3 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -211,6 +211,61 @@ impl Client { .await } + pub(crate) async fn execute_raw_batch_cursor_operation( + &self, + mut op: impl BorrowMut, + ) -> Result + where + Op: Operation, + { + Box::pin(async { + let mut details = self + .execute_operation_with_details(op.borrow_mut(), None) + .await?; + // Mirror pinning logic without a CursorSpecification. + let pinned = if self.is_load_balanced() && details.output.info.id != 0 { + Some(details.connection.pin()?) + } else { + None + }; + Ok(crate::cursor::raw_batch::RawBatchCursor::new( + self.clone(), + details.output, + details.implicit_session, + pinned, + )) + }) + .await + } + + pub(crate) async fn execute_session_raw_batch_cursor_operation( + &self, + mut op: impl BorrowMut, + session: &mut ClientSession, + ) -> Result + where + Op: Operation, + { + let mut details = self + .execute_operation_with_details(op.borrow_mut(), &mut *session) + .await?; + + // Prefer the transaction's pinned connection if present; otherwise mirror load-balanced + // pinning. + let pinned = if let Some(handle) = session.transaction.pinned_connection() { + Some(handle.replicate()) + } else if self.is_load_balanced() && details.output.info.id != 0 { + Some(details.connection.pin()?) + } else { + None + }; + Ok(crate::cursor::raw_batch::SessionRawBatchCursor::new( + self.clone(), + details.output, + pinned, + )) + } + pub(crate) async fn execute_session_cursor_operation( &self, mut op: impl BorrowMut, diff --git a/src/cursor.rs b/src/cursor.rs index a1605ad54..2c06261eb 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,4 +1,5 @@ mod common; +pub mod raw_batch; pub(crate) mod session; #[cfg(test)] diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs new file mode 100644 index 000000000..d11f57ed8 --- /dev/null +++ b/src/cursor/raw_batch.rs @@ -0,0 +1,508 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use crate::bson::{RawArray, RawBsonRef}; +use futures_core::{future::BoxFuture, Future, Stream}; + +use crate::{ + bson::{RawDocument, RawDocumentBuf}, + change_stream::event::ResumeToken, + client::{options::ServerAddress, AsyncDropToken}, + cmap::conn::PinnedConnectionHandle, + cursor::common::{ + kill_cursor, ClientSessionHandle, ExplicitClientSessionHandle, ImplicitClientSessionHandle, + PinnedConnection, + }, + error::{Error, ErrorKind, Result}, + operation::get_more_raw::GetMoreRaw, + Client, ClientSession, +}; + +use super::common::CursorInformation; + +#[derive(Debug, Clone)] +pub(crate) struct RawBatchCursorSpecification { + pub(crate) info: CursorInformation, + pub(crate) initial_reply: RawDocumentBuf, + pub(crate) post_batch_resume_token: Option, +} + +#[derive(Debug)] +pub struct RawBatch { + reply: RawDocumentBuf, +} + +impl RawBatch { + pub(crate) fn new(reply: RawDocumentBuf) -> Self { + Self { reply } + } + + pub fn doc_slices<'a>(&'a self) -> Result<&'a RawArray> { + let root = self.reply.as_ref(); + let cursor = root + .get("cursor")? + .and_then(RawBsonRef::as_document) + .ok_or_else(|| Error::invalid_response("missing cursor subdocument"))?; + + let docs = cursor + .get("firstBatch")? + .or_else(|| cursor.get("nextBatch").ok().flatten()) + .ok_or_else(|| Error::invalid_response("missing firstBatch/nextBatch"))?; + + docs.as_array() + .ok_or_else(|| Error::invalid_response("missing firstBatch/nextBatch")) + } + + pub fn raw_reply(&self) -> &RawDocument { + self.reply.as_ref() + } + + pub fn into_raw_reply(self) -> RawDocumentBuf { + self.reply + } +} + +pub struct RawBatchCursor { + client: Client, + drop_token: AsyncDropToken, + info: CursorInformation, + state: RawBatchCursorState, + drop_address: Option, +} + +struct RawBatchCursorState { + exhausted: bool, + pinned_connection: PinnedConnection, + post_batch_resume_token: Option, + provider: GetMoreRawProvider<'static, ImplicitClientSessionHandle>, + initial_reply: Option, + pending_reply: Option, +} + +impl RawBatchCursor { + pub(crate) fn new( + client: Client, + spec: RawBatchCursorSpecification, + session: Option, + pin: Option, + ) -> Self { + let exhausted = spec.info.id == 0; + Self { + client: client.clone(), + drop_token: client.register_async_drop(), + info: spec.info, + drop_address: None, + state: RawBatchCursorState { + exhausted, + pinned_connection: PinnedConnection::new(pin), + post_batch_resume_token: spec.post_batch_resume_token, + provider: if exhausted { + GetMoreRawProvider::Done + } else { + GetMoreRawProvider::Idle(Box::new(ImplicitClientSessionHandle(session))) + }, + initial_reply: Some(spec.initial_reply), + pending_reply: None, + }, + } + } + + pub fn address(&self) -> &ServerAddress { + &self.info.address + } + + pub fn set_drop_address(&mut self, address: ServerAddress) { + self.drop_address = Some(address); + } + + pub fn is_exhausted(&self) -> bool { + self.state.exhausted + } + + fn mark_exhausted(&mut self) { + self.state.exhausted = true; + self.state.pinned_connection = PinnedConnection::Unpinned; + } +} + +impl Stream for RawBatchCursor { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Yield initial batch first, if present. + if let Some(initial) = self.state.initial_reply.take() { + return Poll::Ready(Some(Ok(RawBatch::new(initial)))); + } + + // If a getMore is in flight, poll it. + let mut ready = None; + { + let provider = &mut self.state.provider; + if let Some(f) = provider.executing_future() { + match Pin::new(f).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(g) => ready = Some(g), + } + } + } + if let Some(get_more_out) = ready { + match get_more_out.result { + Ok(out) => { + self.state.pending_reply = Some(out.raw_reply); + self.state.post_batch_resume_token = out.post_batch_resume_token; + if out.exhausted { + self.mark_exhausted(); + } + if out.id != 0 { + self.info.id = out.id; + } + self.info.ns = out.ns; + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.mark_exhausted(); + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + return Poll::Ready(Some(Err(e))); + } + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + } + + if let Some(reply) = self.state.pending_reply.take() { + return Poll::Ready(Some(Ok(RawBatch::new(reply)))); + } + + if !self.state.exhausted { + let info = self.info.clone(); + let client = self.client.clone(); + let state = &mut self.state; + state + .provider + .start_execution(info, client, state.pinned_connection.handle()); + // Immediately poll the newly-started getMore once to register the waker. + if let Some(f) = state.provider.executing_future() { + match Pin::new(f).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + state.pending_reply = Some(out.raw_reply); + state.post_batch_resume_token = out.post_batch_resume_token; + if out.exhausted { + self.mark_exhausted(); + } + if out.id != 0 { + self.info.id = out.id; + } + self.info.ns = out.ns; + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.mark_exhausted(); + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + return Poll::Ready(Some(Err(e))); + } + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + if let Some(reply) = self.state.pending_reply.take() { + return Poll::Ready(Some(Ok(RawBatch::new(reply)))); + } else if self.state.exhausted { + return Poll::Ready(None); + } else { + return Poll::Pending; + } + } + } + } else { + return Poll::Pending; + } + } + + Poll::Ready(None) + } +} + +impl Drop for RawBatchCursor { + fn drop(&mut self) { + if self.is_exhausted() { + return; + } + kill_cursor( + self.client.clone(), + &mut self.drop_token, + &self.info.ns, + self.info.id, + self.state.pinned_connection.replicate(), + self.drop_address.take(), + #[cfg(test)] + None, + ); + } +} + +#[derive(Debug)] +pub struct SessionRawBatchCursor { + client: Client, + drop_token: AsyncDropToken, + info: CursorInformation, + exhausted: bool, + pinned_connection: PinnedConnection, + post_batch_resume_token: Option, + initial_reply: Option, + drop_address: Option, +} + +impl SessionRawBatchCursor { + pub(crate) fn new( + client: Client, + spec: RawBatchCursorSpecification, + pinned: Option, + ) -> Self { + let exhausted = spec.info.id == 0; + Self { + drop_token: client.register_async_drop(), + client, + info: spec.info, + exhausted, + pinned_connection: PinnedConnection::new(pinned), + post_batch_resume_token: spec.post_batch_resume_token, + initial_reply: Some(spec.initial_reply), + drop_address: None, + } + } + + pub fn stream<'session>( + &mut self, + session: &'session mut ClientSession, + ) -> SessionRawBatchCursorStream<'_, 'session> { + SessionRawBatchCursorStream { + parent: self, + provider: GetMoreRawProvider::Idle(Box::new(ExplicitClientSessionHandle(session))), + } + } + + pub fn address(&self) -> &ServerAddress { + &self.info.address + } + + pub fn set_drop_address(&mut self, address: ServerAddress) { + self.drop_address = Some(address); + } + + pub fn is_exhausted(&self) -> bool { + self.exhausted + } +} + +impl Drop for SessionRawBatchCursor { + fn drop(&mut self) { + if self.is_exhausted() { + return; + } + kill_cursor( + self.client.clone(), + &mut self.drop_token, + &self.info.ns, + self.info.id, + self.pinned_connection.replicate(), + self.drop_address.take(), + #[cfg(test)] + None, + ); + } +} + +pub struct SessionRawBatchCursorStream<'cursor, 'session> { + parent: &'cursor mut SessionRawBatchCursor, + provider: GetMoreRawProvider<'session, ExplicitClientSessionHandle<'session>>, +} + +impl Stream for SessionRawBatchCursorStream<'_, '_> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // yield initial reply first + if let Some(initial) = self.parent.initial_reply.take() { + return Poll::Ready(Some(Ok(RawBatch::new(initial)))); + } + + if self.parent.exhausted { + return Poll::Ready(None); + } + + // If a getMore is in flight, poll it. + let mut ready = None; + { + let provider = &mut self.provider; + if let Some(f) = provider.executing_future() { + match Pin::new(f).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(g) => ready = Some(g), + } + } + } + if let Some(get_more_out) = ready { + match get_more_out.result { + Ok(out) => { + if out.exhausted { + self.parent.exhausted = true; + } + if out.id != 0 { + self.parent.info.id = out.id; + } + self.parent.info.ns = out.ns; + self.parent.post_batch_resume_token = out.post_batch_resume_token; + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + return Poll::Ready(Some(Ok(RawBatch::new(out.raw_reply)))); + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.parent.exhausted = true; + } + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + return Poll::Ready(Some(Err(e))); + } + } + } + + // Start a getMore if needed. + let info = self.parent.info.clone(); + let client = self.parent.client.clone(); + let pinned_owned = self + .parent + .pinned_connection + .handle() + .map(|c| c.replicate()); + let pinned_ref = pinned_owned.as_ref(); + self.provider.start_execution(info, client, pinned_ref); + Poll::Pending + } +} +pub struct GetMoreRawResultAndSession { + pub result: Result, + pub session: S, +} + +enum GetMoreRawProvider<'s, S> { + Executing(BoxFuture<'s, GetMoreRawResultAndSession>), + Idle(Box), + Done, +} + +impl<'s, S: ClientSessionHandle<'s>> GetMoreRawProvider<'s, S> { + fn executing_future(&mut self) -> Option<&mut BoxFuture<'s, GetMoreRawResultAndSession>> { + if let Self::Executing(future) = self { + Some(future) + } else { + None + } + } + + fn clear_execution(&mut self, session: S, exhausted: bool) { + if exhausted && session.is_implicit() { + *self = Self::Done + } else { + *self = Self::Idle(Box::new(session)) + } + } + + fn start_execution( + &mut self, + info: CursorInformation, + client: Client, + pinned_connection: Option<&PinnedConnectionHandle>, + ) { + take_mut::take(self, |this| { + if let Self::Idle(mut session) = this { + let pinned = pinned_connection.map(|c| c.replicate()); + let fut = Box::pin(async move { + let get_more = GetMoreRaw::new(info, pinned.as_ref()); + let res = client + .execute_operation(get_more, session.borrow_mut()) + .await; + GetMoreRawResultAndSession { + result: res, + session: *session, + } + }); + Self::Executing(fut) + } else { + this + } + }) + } +} + +pub struct RawDocumentStream { + inner: R, +} + +impl RawBatchCursor { + pub fn into_raw_documents(self) -> RawDocumentStream { + RawDocumentStream { inner: self } + } +} + +impl Stream for RawDocumentStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bson::{doc, Document}; + + #[test] + fn raw_batch_into_docs_works() { + let reply_doc: Document = doc! { + "ok": 1, + "cursor": { + "id": 0_i64, + "ns": "db.coll", + "firstBatch": [ + { "x": 1 }, + { "x": 2 } + ] + } + }; + let mut bytes = Vec::new(); + reply_doc.to_writer(&mut bytes).unwrap(); + let raw = RawDocumentBuf::from_bytes(bytes).unwrap(); + + let batch = RawBatch::new(raw); + let docs: Vec<_> = batch.doc_slices().unwrap().into_iter().collect(); + assert_eq!(docs.len(), 2); + } +} diff --git a/src/lib.rs b/src/lib.rs index 6e6c628aa..a10839c4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,7 @@ pub use crate::{ client::{session::ClientSession, Client}, coll::Collection, cursor::{ + raw_batch::RawBatch, session::{SessionCursor, SessionCursorStream}, Cursor, }, diff --git a/src/operation.rs b/src/operation.rs index b55fa6b69..06e7412db 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -13,7 +13,9 @@ pub(crate) mod drop_database; mod drop_indexes; mod find; pub(crate) mod find_and_modify; +pub(crate) mod find_raw; mod get_more; +pub(crate) mod get_more_raw; mod insert; pub(crate) mod list_collections; pub(crate) mod list_databases; diff --git a/src/operation/find_raw.rs b/src/operation/find_raw.rs new file mode 100644 index 000000000..c5d705ad4 --- /dev/null +++ b/src/operation/find_raw.rs @@ -0,0 +1,164 @@ +use crate::bson::RawDocumentBuf; + +use crate::{ + bson::{rawdoc, Document}, + bson_compat::{cstr, CStr}, + cmap::{Command, RawCommandResponse, StreamDescription}, + cursor::{raw_batch::RawBatchCursorSpecification, CursorInformation}, + error::{Error, Result}, + operation::{ + append_options_to_raw_document, + CursorBody, + OperationWithDefaults, + SERVER_4_4_0_WIRE_VERSION, + }, + options::{CursorType, FindOptions, SelectionCriteria}, + Namespace, +}; + +use super::ExecutionContext; + +#[derive(Debug)] +pub(crate) struct FindRaw { + ns: Namespace, + filter: Document, + options: Option>, +} + +impl FindRaw { + pub(crate) fn new(ns: Namespace, filter: Document, options: Option) -> Self { + Self { + ns, + filter, + options: options.map(Box::new), + } + } +} + +impl OperationWithDefaults for FindRaw { + type O = RawBatchCursorSpecification; + const NAME: &'static CStr = cstr!("find"); + + fn build(&mut self, _description: &StreamDescription) -> Result { + let mut body = rawdoc! { + Self::NAME: self.ns.coll.clone(), + }; + + if let Some(ref mut options) = self.options { + if options.limit.map(|limit| limit < 0) == Some(true) { + body.append(cstr!("singleBatch"), true); + } + + if let Some(ref mut batch_size) = options.batch_size { + if i32::try_from(*batch_size).is_err() { + return Err(Error::invalid_argument( + "the batch size must fit into a signed 32-bit integer", + )); + } + if let Some(limit) = options.limit.and_then(|limit| u32::try_from(limit).ok()) { + if *batch_size == limit { + *batch_size += 1; + } + } + } + + match options.cursor_type { + Some(CursorType::Tailable) => { + body.append(cstr!("tailable"), true); + } + Some(CursorType::TailableAwait) => { + body.append(cstr!("tailable"), true); + body.append(cstr!("awaitData"), true); + } + _ => {} + }; + } + + append_options_to_raw_document(&mut body, self.options.as_ref())?; + + let raw_filter: RawDocumentBuf = (&self.filter).try_into()?; + body.append(cstr!("filter"), raw_filter); + + Ok(Command::new_read( + Self::NAME, + &self.ns.db, + self.options.as_ref().and_then(|o| o.read_concern.clone()), + body, + )) + } + + fn extract_at_cluster_time( + &self, + response: &crate::bson::RawDocument, + ) -> Result> { + CursorBody::extract_at_cluster_time(response) + } + + fn handle_response<'a>( + &'a self, + response: &'a RawCommandResponse, + context: ExecutionContext<'a>, + ) -> Result { + // Build initial spec using minimal parsing and copy of raw reply. + let raw = RawDocumentBuf::from_bytes(response.as_bytes().to_vec())?; + + // Parse minimal fields via raw to avoid per-doc copies. + let raw_root = response.raw_body(); + let cursor_doc = raw_root + .get("cursor")? + .and_then(crate::bson::RawBsonRef::as_document) + .ok_or_else(|| Error::invalid_response("missing cursor in response"))?; + + let id = cursor_doc + .get("id")? + .and_then(crate::bson::RawBsonRef::as_i64) + .ok_or_else(|| Error::invalid_response("missing cursor id"))?; + + let ns_str = cursor_doc + .get("ns")? + .and_then(crate::bson::RawBsonRef::as_str) + .ok_or_else(|| Error::invalid_response("missing cursor ns"))?; + let ns = Namespace::from_str(ns_str) + .ok_or_else(|| Error::invalid_response("invalid cursor ns"))?; + + let post_token_raw = cursor_doc + .get("postBatchResumeToken")? + .and_then(crate::bson::RawBsonRef::as_document) + .map(|d| RawDocumentBuf::from_bytes(d.as_bytes().to_vec())) + .transpose()?; + let post_batch_resume_token = + crate::change_stream::event::ResumeToken::from_raw(post_token_raw); + + let description = context.connection.stream_description()?; + let comment = if description.max_wire_version.unwrap_or(0) < SERVER_4_4_0_WIRE_VERSION { + None + } else { + self.options.as_ref().and_then(|opts| opts.comment.clone()) + }; + + let info = CursorInformation { + ns, + id, + address: description.server_address.clone(), + batch_size: self.options.as_ref().and_then(|opts| opts.batch_size), + max_time: self.options.as_ref().and_then(|opts| opts.max_await_time), + comment, + }; + + Ok(RawBatchCursorSpecification { + info, + initial_reply: raw, + post_batch_resume_token, + }) + } + + fn supports_read_concern(&self, _description: &StreamDescription) -> bool { + true + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.options + .as_ref() + .and_then(|opts| opts.selection_criteria.as_ref()) + } +} diff --git a/src/operation/get_more_raw.rs b/src/operation/get_more_raw.rs new file mode 100644 index 000000000..2ffda838b --- /dev/null +++ b/src/operation/get_more_raw.rs @@ -0,0 +1,134 @@ +use crate::bson::RawDocumentBuf; +use std::time::Duration; + +use crate::{ + bson::{rawdoc, RawBson}, + bson_compat::{cstr, CStr}, + cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, + cursor::CursorInformation, + error::Result, + operation::OperationWithDefaults, + options::SelectionCriteria, + Namespace, +}; + +use super::ExecutionContext; + +#[derive(Debug)] +pub(crate) struct GetMoreRaw<'conn> { + ns: Namespace, + cursor_id: i64, + selection_criteria: SelectionCriteria, + batch_size: Option, + max_time: Option, + pinned_connection: Option<&'conn PinnedConnectionHandle>, + comment: Option, +} + +impl<'conn> GetMoreRaw<'conn> { + pub(crate) fn new( + info: CursorInformation, + pinned: Option<&'conn PinnedConnectionHandle>, + ) -> Self { + Self { + ns: info.ns, + cursor_id: info.id, + selection_criteria: SelectionCriteria::from_address(info.address), + batch_size: info.batch_size, + max_time: info.max_time, + pinned_connection: pinned, + comment: info.comment, + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct GetMoreRawResult { + pub(crate) raw_reply: RawDocumentBuf, + pub(crate) exhausted: bool, + pub(crate) post_batch_resume_token: Option, + pub(crate) ns: Namespace, + pub(crate) id: i64, +} + +impl OperationWithDefaults for GetMoreRaw<'_> { + type O = GetMoreRawResult; + + const NAME: &'static CStr = cstr!("getMore"); + + fn build(&mut self, _description: &StreamDescription) -> Result { + let mut body = rawdoc! { + Self::NAME: self.cursor_id, + "collection": self.ns.coll.clone(), + }; + + if let Some(batch_size) = self.batch_size { + let batch_size = crate::checked::Checked::from(batch_size).try_into::()?; + if batch_size != 0 { + body.append(cstr!("batchSize"), batch_size); + } + } + + if let Some(ref max_time) = self.max_time { + body.append( + cstr!("maxTimeMS"), + max_time.as_millis().try_into().unwrap_or(i32::MAX), + ); + } + + if let Some(comment) = &self.comment { + let raw_comment: RawBson = comment.clone().try_into()?; + body.append(cstr!("comment"), raw_comment); + } + + Ok(Command::new(Self::NAME, &self.ns.db, body)) + } + + fn handle_response<'a>( + &'a self, + response: &'a RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> Result { + // Copy raw reply for the batch. + let raw = RawDocumentBuf::from_bytes(response.as_bytes().to_vec())?; + + // Parse minimal cursor fields from the raw reply. + #[derive(serde::Deserialize)] + struct HelperNs { + cursor: HelperCursor, + } + #[derive(serde::Deserialize)] + struct HelperCursor { + id: i64, + ns: String, + #[serde(rename = "postBatchResumeToken")] + post_batch_resume_token: Option, + } + let helper: HelperNs = crate::bson_compat::deserialize_from_slice(response.as_bytes())?; + + let exhausted = helper.cursor.id == 0; + let ns = Namespace::from_str(helper.cursor.ns.as_str()).unwrap(); + let token = crate::change_stream::event::ResumeToken::from_raw( + helper.cursor.post_batch_resume_token, + ); + + Ok(GetMoreRawResult { + raw_reply: raw, + exhausted, + post_batch_resume_token: token, + ns, + id: helper.cursor.id, + }) + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + Some(&self.selection_criteria) + } + + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + self.pinned_connection + } + + #[cfg(feature = "opentelemetry")] + type Otel = crate::otel::Witness; +} diff --git a/src/test/coll.rs b/src/test/coll.rs index 3e8e41df1..8e08aaeae 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -143,6 +143,35 @@ async fn find() { } } +#[tokio::test] +#[function_name::named] +async fn find_raw_batches_one() { + let client = Client::for_test().await; + let coll = client + .init_db_and_coll(function_name!(), function_name!()) + .await; + + coll.insert_one(doc! { "x": 1 }).await.unwrap(); + + let mut batches = coll + .find_raw_batches(doc! { "x": 1 }) + .limit(-1) + .await + .unwrap(); + + // get the first (and only) server batch due to limit -1 + let mut found_one = false; + while let Some(batch_res) = batches.next().await { + let batch = batch_res.unwrap(); + let mut iter = batch.doc_slices().unwrap().into_iter(); + let first = iter.next().unwrap().unwrap(); + let doc = Document::try_from(first.as_document().unwrap()).unwrap(); + assert_eq!(doc.get_i32("x").unwrap(), 1); + found_one = true; + } + assert!(found_one); +} + #[tokio::test] #[function_name::named] async fn update() { From e334445a88fa69adb1af598c0cc1103d016a7297 Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Tue, 11 Nov 2025 13:46:29 -0800 Subject: [PATCH 02/17] WIP 2 --- benchmarks/Cargo.lock | 251 ++++++++++++----------- benchmarks/src/bench.rs | 1 + benchmarks/src/bench/find_raw_batches.rs | 67 ++++++ benchmarks/src/main.rs | 21 +- src/cursor/raw_batch.rs | 48 +++++ src/operation/get_more_raw.rs | 54 ++--- 6 files changed, 300 insertions(+), 142 deletions(-) create mode 100644 benchmarks/src/bench/find_raw_batches.rs diff --git a/benchmarks/Cargo.lock b/benchmarks/Cargo.lock index 4888a65a3..b299b7e0b 100644 --- a/benchmarks/Cargo.lock +++ b/benchmarks/Cargo.lock @@ -39,21 +39,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - -[[package]] -name = "android_system_properties" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" -dependencies = [ - "libc", -] - [[package]] name = "ansi_term" version = "0.12.1" @@ -112,12 +97,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.22.1" @@ -163,12 +142,12 @@ version = "2.15.0" source = "git+https://github.com/mongodb/bson-rust?branch=2.15.x#f6f163095b5159ce175424b0e02f9bd7acfaddf2" dependencies = [ "ahash", - "base64 0.22.1", + "base64", "bitvec", "getrandom 0.2.15", "getrandom 0.3.3", "hex", - "indexmap 2.7.1", + "indexmap", "js-sys", "once_cell", "rand 0.9.2", @@ -218,11 +197,7 @@ version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ - "android-tzdata", - "iana-time-zone", "num-traits", - "serde", - "windows-targets 0.52.6", ] [[package]] @@ -279,12 +254,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" -[[package]] -name = "core-foundation-sys" -version = "0.8.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" - [[package]] name = "cpufeatures" version = "0.2.17" @@ -358,7 +327,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", - "serde", ] [[package]] @@ -595,12 +563,6 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.15.2" @@ -645,7 +607,7 @@ dependencies = [ "ipnet", "once_cell", "rand 0.8.5", - "thiserror", + "thiserror 1.0.69", "tinyvec", "tokio", "tracing", @@ -668,7 +630,7 @@ dependencies = [ "rand 0.8.5", "resolv-conf", "smallvec", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -693,29 +655,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "iana-time-zone" -version = "0.1.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "windows-core", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" -dependencies = [ - "cc", -] - [[package]] name = "icu_collections" version = "1.5.0" @@ -861,17 +800,6 @@ dependencies = [ "icu_properties", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", - "serde", -] - [[package]] name = "indexmap" version = "2.7.1" @@ -879,8 +807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown 0.15.2", - "serde", + "hashbrown", ] [[package]] @@ -901,7 +828,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2", + "socket2 0.5.8", "widestring", "windows-sys 0.48.0", "winreg", @@ -937,9 +864,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" [[package]] name = "linked-hash-map" @@ -1088,8 +1015,8 @@ source = "git+https://github.com/mongodb/libmongocrypt-rust.git?branch=main#1327 name = "mongodb" version = "3.3.0" dependencies = [ - "base64 0.13.1", - "bitflags 1.3.2", + "base64", + "bitflags 2.8.0", "bson", "chrono", "derive-where", @@ -1107,24 +1034,26 @@ dependencies = [ "mongodb-internal-macros", "pbkdf2", "percent-encoding", - "rand 0.8.5", + "rand 0.9.2", "rustc_version_runtime", "rustls", "rustversion", "serde", + "serde_bytes", "serde_with", "sha1", "sha2", - "socket2", + "socket2 0.6.1", "stringprep", "strsim 0.11.1", "take_mut", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-rustls", + "tokio-util", "typed-builder", "uuid", - "webpki-roots 0.26.11", + "webpki-roots", ] [[package]] @@ -1565,7 +1494,7 @@ version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "930cfb6e6abf99298aaad7d29abbef7a9999a9a8806a40088f55f0dcec03146b" dependencies = [ - "indexmap 2.7.1", + "indexmap", "itoa", "memchr", "ryu", @@ -1578,16 +1507,9 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" dependencies = [ - "base64 0.22.1", - "chrono", - "hex", - "indexmap 1.9.3", - "indexmap 2.7.1", "serde", "serde_derive", - "serde_json", "serde_with_macros", - "time", ] [[package]] @@ -1664,6 +1586,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "spin" version = "0.9.8" @@ -1765,7 +1697,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl 2.0.17", ] [[package]] @@ -1779,6 +1720,17 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "time" version = "0.3.37" @@ -1856,7 +1808,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.8", "tokio-macros", "windows-sys 0.52.0", ] @@ -1901,6 +1853,7 @@ checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -1918,7 +1871,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.7.1", + "indexmap", "toml_datetime", "winnow", ] @@ -1956,18 +1909,18 @@ dependencies = [ [[package]] name = "typed-builder" -version = "0.20.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7" +checksum = "398a3a3c918c96de527dc11e6e846cd549d4508030b8a33e1da12789c856b81a" dependencies = [ "typed-builder-macro", ] [[package]] name = "typed-builder-macro" -version = "0.20.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28" +checksum = "0e48cea23f68d1f78eb7bc092881b6bb88d3d6b5b7e6234f6f9c911da1ffb221" dependencies = [ "proc-macro2", "quote", @@ -2153,15 +2106,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "webpki-roots" -version = "0.26.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" -dependencies = [ - "webpki-roots 1.0.2", -] - [[package]] name = "webpki-roots" version = "1.0.2" @@ -2200,13 +2144,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows-core" -version = "0.52.0" +name = "windows-link" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets 0.52.6", -] +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" [[package]] name = "windows-sys" @@ -2235,6 +2176,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -2259,13 +2209,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -2278,6 +2245,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -2290,6 +2263,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -2302,12 +2281,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -2320,6 +2311,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -2332,6 +2329,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -2344,6 +2347,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -2356,6 +2365,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" version = "0.5.40" diff --git a/benchmarks/src/bench.rs b/benchmarks/src/bench.rs index b448254eb..297b149cf 100644 --- a/benchmarks/src/bench.rs +++ b/benchmarks/src/bench.rs @@ -12,6 +12,7 @@ pub mod insert_one; pub mod json_multi_export; pub mod json_multi_import; pub mod run_command; +pub mod find_raw_batches; use std::{ convert::TryInto, diff --git a/benchmarks/src/bench/find_raw_batches.rs b/benchmarks/src/bench/find_raw_batches.rs new file mode 100644 index 000000000..e4c12df46 --- /dev/null +++ b/benchmarks/src/bench/find_raw_batches.rs @@ -0,0 +1,67 @@ +use anyhow::Result; +use futures::stream::StreamExt; +use mongodb::{ + bson::{doc, Document}, + Client, + Collection, + Database, +}; + +use crate::bench::{drop_database, Benchmark, COLL_NAME, DATABASE_NAME}; + +pub struct FindRawBatchesBenchmark { + db: Database, + coll: Collection, + uri: String, + num_iter: usize, +} + +/// Specifies the options to `FindRawBatchesBenchmark::setup` operation. +pub struct Options { + pub num_iter: usize, + pub doc: Document, + pub uri: String, +} + +#[async_trait::async_trait] +impl Benchmark for FindRawBatchesBenchmark { + type Options = Options; + type TaskState = (); + + async fn setup(options: Self::Options) -> Result { + let client = Client::with_uri_str(&options.uri).await?; + let db = client.database(&DATABASE_NAME); + drop_database(options.uri.as_str(), DATABASE_NAME.as_str()).await?; + + let coll = db.collection(&COLL_NAME); + let docs = vec![options.doc.clone(); options.num_iter]; + coll.insert_many(docs).await?; + + Ok(FindRawBatchesBenchmark { + db, + coll, + uri: options.uri, + num_iter: options.num_iter, + }) + } + + async fn do_task(&self, _state: Self::TaskState) -> Result<()> { + // Drain the cursor using raw server batches. + let mut batches = self.coll.find_raw_batches(doc! {}).await?; + while let Some(batch_res) = batches.next().await { + let batch = batch_res?; + // Touch each element minimally to avoid full materialization. + for elem in batch.doc_slices()?.into_iter() { + let _ = elem?; + } + } + Ok(()) + } + + async fn teardown(&self) -> Result<()> { + drop_database(self.uri.as_str(), self.db.name()).await?; + Ok(()) + } +} + + diff --git a/benchmarks/src/main.rs b/benchmarks/src/main.rs index a0089d7e3..47d5dfc29 100644 --- a/benchmarks/src/main.rs +++ b/benchmarks/src/main.rs @@ -24,6 +24,7 @@ use crate::{ bson_decode::BsonDecodeBenchmark, bson_encode::BsonEncodeBenchmark, bulk_write::{InsertBulkWriteBenchmark, MixedBulkWriteBenchmark}, + find_raw_batches::FindRawBatchesBenchmark, find_many::FindManyBenchmark, find_one::FindOneBenchmark, gridfs_download::GridFsDownloadBenchmark, @@ -62,6 +63,7 @@ const FIND_ONE_BENCH: &str = "Find one"; const FIND_MANY_BENCH: &str = "Find many and empty cursor"; const FIND_MANY_BENCH_RAW: &str = "Find many and empty cursor (raw BSON)"; const FIND_MANY_BENCH_SERDE: &str = "Find many and empty cursor (serde structs)"; +const FIND_MANY_BENCH_RAW_BATCHES: &str = "Find many and empty cursor (raw batches)"; const GRIDFS_DOWNLOAD_BENCH: &str = "GridFS download"; const LDJSON_MULTI_EXPORT_BENCH: &str = "LDJSON multi-file export"; const GRIDFS_MULTI_DOWNLOAD_BENCH: &str = "GridFS multi-file download"; @@ -104,6 +106,7 @@ enum BenchmarkId { SmallDocInsertBulkWrite, // 23 LargeDocInsertBulkWrite, // 24 MixedBulkWrite, // 25 + FindManyRawBatches, // 26 } impl BenchmarkId { @@ -127,6 +130,7 @@ impl BenchmarkId { BenchmarkId::BsonFullDocumentEncode => FULL_BSON_ENCODING, BenchmarkId::FindManyRawBson => FIND_MANY_BENCH_RAW, BenchmarkId::FindManySerde => FIND_MANY_BENCH_SERDE, + BenchmarkId::FindManyRawBatches => FIND_MANY_BENCH_RAW_BATCHES, BenchmarkId::GridFsDownload => GRIDFS_DOWNLOAD_BENCH, BenchmarkId::GridFsUpload => GRIDFS_UPLOAD_BENCH, BenchmarkId::GridFsMultiDownload => GRIDFS_MULTI_DOWNLOAD_BENCH, @@ -159,6 +163,7 @@ const SINGLE_BENCHES: &[&str] = &[ /// Benchmarks included in the "MultiBench" composite. const MULTI_BENCHES: &[&str] = &[ FIND_MANY_BENCH_RAW, + FIND_MANY_BENCH_RAW_BATCHES, SMALL_DOC_INSERT_MANY_BENCH, LARGE_DOC_INSERT_MANY_BENCH, GRIDFS_UPLOAD_BENCH, @@ -180,6 +185,7 @@ const PARALLEL_BENCHES: &[&str] = &[ const READ_BENCHES: &[&str] = &[ FIND_ONE_BENCH, FIND_MANY_BENCH_RAW, + FIND_MANY_BENCH_RAW_BATCHES, GRIDFS_DOWNLOAD_BENCH, LDJSON_MULTI_EXPORT_BENCH, GRIDFS_MULTI_DOWNLOAD_BENCH, @@ -199,7 +205,7 @@ const WRITE_BENCHES: &[&str] = &[ MIXED_BULK_WRITE_BENCH, ]; -const MAX_ID: u8 = BenchmarkId::MixedBulkWrite as u8; +const MAX_ID: u8 = BenchmarkId::FindManyRawBatches as u8; async fn run_benchmarks( uri: &str, @@ -465,6 +471,17 @@ async fn run_benchmarks( comp_score += score_test(find_many, id.name(), 16.22, more_info); } + // Find many using raw batches and empty the cursor + BenchmarkId::FindManyRawBatches => { + let options = bench::find_raw_batches::Options { + num_iter: 10000, + doc: get_tweet().await, + uri: uri.to_string(), + }; + let result = + bench::run_benchmark::(options).await?; + comp_score += score_test(result, FIND_MANY_BENCH_RAW_BATCHES, 16.22, more_info); + } // GridFS download BenchmarkId::GridFsDownload => { @@ -589,6 +606,7 @@ fn parse_ids(matches: ArgMatches) -> HashSet { } if matches.is_present("multi") { ids.insert(BenchmarkId::FindManyRawBson); + ids.insert(BenchmarkId::FindManyRawBatches); ids.insert(BenchmarkId::SmallDocInsertMany); ids.insert(BenchmarkId::LargeDocInsertMany); ids.insert(BenchmarkId::GridFsDownload); @@ -621,6 +639,7 @@ fn parse_ids(matches: ArgMatches) -> HashSet { ids.insert(BenchmarkId::FindMany); ids.insert(BenchmarkId::FindManyRawBson); ids.insert(BenchmarkId::FindManySerde); + ids.insert(BenchmarkId::FindManyRawBatches); ids.insert(BenchmarkId::SmallDocInsertMany); ids.insert(BenchmarkId::LargeDocInsertMany); ids.insert(BenchmarkId::LdJsonMultiFileImport); diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index d11f57ed8..eb5e6745e 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -133,6 +133,18 @@ impl Stream for RawBatchCursor { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Yield initial batch first, if present. if let Some(initial) = self.state.initial_reply.take() { + // Prefetch the next getMore in the background, if applicable. + if !self.state.exhausted { + let info = self.info.clone(); + let client = self.client.clone(); + let pinned_owned = self + .state + .pinned_connection + .handle() + .map(|c| c.replicate()); + let pinned_ref = pinned_owned.as_ref(); + self.state.provider.start_execution(info, client, pinned_ref); + } return Poll::Ready(Some(Ok(RawBatch::new(initial)))); } @@ -179,6 +191,18 @@ impl Stream for RawBatchCursor { } if let Some(reply) = self.state.pending_reply.take() { + // Prefetch the next getMore before returning this batch, if applicable. + if !self.state.exhausted { + let info = self.info.clone(); + let client = self.client.clone(); + let pinned_owned = self + .state + .pinned_connection + .handle() + .map(|c| c.replicate()); + let pinned_ref = pinned_owned.as_ref(); + self.state.provider.start_execution(info, client, pinned_ref); + } return Poll::Ready(Some(Ok(RawBatch::new(reply)))); } @@ -341,6 +365,18 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // yield initial reply first if let Some(initial) = self.parent.initial_reply.take() { + // Prefetch the next getMore in the background, if applicable. + if !self.parent.exhausted { + let info = self.parent.info.clone(); + let client = self.parent.client.clone(); + let pinned_owned = self + .parent + .pinned_connection + .handle() + .map(|c| c.replicate()); + let pinned_ref = pinned_owned.as_ref(); + self.provider.start_execution(info, client, pinned_ref); + } return Poll::Ready(Some(Ok(RawBatch::new(initial)))); } @@ -373,6 +409,18 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { let exhausted_now = self.parent.exhausted; self.provider .clear_execution(get_more_out.session, exhausted_now); + // Prefetch the next getMore before returning this batch, if applicable. + if !self.parent.exhausted { + let info = self.parent.info.clone(); + let client = self.parent.client.clone(); + let pinned_owned = self + .parent + .pinned_connection + .handle() + .map(|c| c.replicate()); + let pinned_ref = pinned_owned.as_ref(); + self.provider.start_execution(info, client, pinned_ref); + } return Poll::Ready(Some(Ok(RawBatch::new(out.raw_reply)))); } Err(e) => { diff --git a/src/operation/get_more_raw.rs b/src/operation/get_more_raw.rs index 2ffda838b..c5f602176 100644 --- a/src/operation/get_more_raw.rs +++ b/src/operation/get_more_raw.rs @@ -89,35 +89,43 @@ impl OperationWithDefaults for GetMoreRaw<'_> { response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { - // Copy raw reply for the batch. + // Own the raw reply (single copy). let raw = RawDocumentBuf::from_bytes(response.as_bytes().to_vec())?; - // Parse minimal cursor fields from the raw reply. - #[derive(serde::Deserialize)] - struct HelperNs { - cursor: HelperCursor, - } - #[derive(serde::Deserialize)] - struct HelperCursor { - id: i64, - ns: String, - #[serde(rename = "postBatchResumeToken")] - post_batch_resume_token: Option, - } - let helper: HelperNs = crate::bson_compat::deserialize_from_slice(response.as_bytes())?; - - let exhausted = helper.cursor.id == 0; - let ns = Namespace::from_str(helper.cursor.ns.as_str()).unwrap(); - let token = crate::change_stream::event::ResumeToken::from_raw( - helper.cursor.post_batch_resume_token, - ); + // Extract minimal cursor fields directly from the raw reply to avoid + // walking the batch array via serde. + let root = response.raw_body(); + let cursor = root + .get("cursor")? + .and_then(crate::bson::RawBsonRef::as_document) + .ok_or_else(|| crate::error::Error::invalid_response("missing cursor subdocument"))?; + + let id = cursor + .get("id")? + .and_then(crate::bson::RawBsonRef::as_i64) + .ok_or_else(|| crate::error::Error::invalid_response("missing cursor id"))?; + + let ns_str = cursor + .get("ns")? + .and_then(crate::bson::RawBsonRef::as_str) + .ok_or_else(|| crate::error::Error::invalid_response("missing cursor ns"))?; + let ns = Namespace::from_str(ns_str) + .ok_or_else(|| crate::error::Error::invalid_response("invalid cursor ns"))?; + + let token_raw = cursor + .get("postBatchResumeToken")? + .and_then(crate::bson::RawBsonRef::as_document) + .map(|d| RawDocumentBuf::from_bytes(d.as_bytes().to_vec())) + .transpose()?; + let post_batch_resume_token = + crate::change_stream::event::ResumeToken::from_raw(token_raw); Ok(GetMoreRawResult { raw_reply: raw, - exhausted, - post_batch_resume_token: token, + exhausted: id == 0, + post_batch_resume_token, ns, - id: helper.cursor.id, + id, }) } From 966769f0289a726ab72099dd64966b6746d3ac6c Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Tue, 11 Nov 2025 17:56:41 -0800 Subject: [PATCH 03/17] Benchmark --- benchmarks/src/bench/find_raw_batches.rs | 15 +-- src/cursor/raw_batch.rs | 149 +++++++++++++++++++++-- src/operation/find_raw.rs | 4 +- src/operation/get_more_raw.rs | 3 +- 4 files changed, 141 insertions(+), 30 deletions(-) diff --git a/benchmarks/src/bench/find_raw_batches.rs b/benchmarks/src/bench/find_raw_batches.rs index e4c12df46..bc254026d 100644 --- a/benchmarks/src/bench/find_raw_batches.rs +++ b/benchmarks/src/bench/find_raw_batches.rs @@ -2,9 +2,7 @@ use anyhow::Result; use futures::stream::StreamExt; use mongodb::{ bson::{doc, Document}, - Client, - Collection, - Database, + Client, Collection, Database, }; use crate::bench::{drop_database, Benchmark, COLL_NAME, DATABASE_NAME}; @@ -13,7 +11,6 @@ pub struct FindRawBatchesBenchmark { db: Database, coll: Collection, uri: String, - num_iter: usize, } /// Specifies the options to `FindRawBatchesBenchmark::setup` operation. @@ -41,19 +38,13 @@ impl Benchmark for FindRawBatchesBenchmark { db, coll, uri: options.uri, - num_iter: options.num_iter, }) } async fn do_task(&self, _state: Self::TaskState) -> Result<()> { - // Drain the cursor using raw server batches. let mut batches = self.coll.find_raw_batches(doc! {}).await?; while let Some(batch_res) = batches.next().await { - let batch = batch_res?; - // Touch each element minimally to avoid full materialization. - for elem in batch.doc_slices()?.into_iter() { - let _ = elem?; - } + batch_res?; } Ok(()) } @@ -63,5 +54,3 @@ impl Benchmark for FindRawBatchesBenchmark { Ok(()) } } - - diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index eb5e6745e..a9776a51c 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -137,13 +137,43 @@ impl Stream for RawBatchCursor { if !self.state.exhausted { let info = self.info.clone(); let client = self.client.clone(); - let pinned_owned = self - .state - .pinned_connection - .handle() - .map(|c| c.replicate()); + let pinned_owned = self.state.pinned_connection.handle().map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); - self.state.provider.start_execution(info, client, pinned_ref); + self.state + .provider + .start_execution(info, client, pinned_ref); + // Immediately poll once to register waker; if already ready, buffer the result. + if let Some(f) = self.state.provider.executing_future() { + match Pin::new(f).poll(cx) { + Poll::Pending => {} + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + self.state.pending_reply = Some(out.raw_reply); + self.state.post_batch_resume_token = + out.post_batch_resume_token; + if out.exhausted { + self.mark_exhausted(); + } + if out.id != 0 { + self.info.id = out.id; + } + self.info.ns = out.ns; + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.mark_exhausted(); + } + } + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } } return Poll::Ready(Some(Ok(RawBatch::new(initial)))); } @@ -195,13 +225,43 @@ impl Stream for RawBatchCursor { if !self.state.exhausted { let info = self.info.clone(); let client = self.client.clone(); - let pinned_owned = self - .state - .pinned_connection - .handle() - .map(|c| c.replicate()); + let pinned_owned = self.state.pinned_connection.handle().map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); - self.state.provider.start_execution(info, client, pinned_ref); + self.state + .provider + .start_execution(info, client, pinned_ref); + // Immediately poll once to register waker; if already ready, buffer the result. + if let Some(f) = self.state.provider.executing_future() { + match Pin::new(f).poll(cx) { + Poll::Pending => {} + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + self.state.pending_reply = Some(out.raw_reply); + self.state.post_batch_resume_token = + out.post_batch_resume_token; + if out.exhausted { + self.mark_exhausted(); + } + if out.id != 0 { + self.info.id = out.id; + } + self.info.ns = out.ns; + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.mark_exhausted(); + } + } + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } } return Poll::Ready(Some(Ok(RawBatch::new(reply)))); } @@ -376,6 +436,39 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { .map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); self.provider.start_execution(info, client, pinned_ref); + // Immediately poll once to register waker; if already ready, buffer the result + // into initial_reply for the next poll. + if let Some(f) = self.provider.executing_future() { + match Pin::new(f).poll(cx) { + Poll::Pending => {} + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + if out.exhausted { + self.parent.exhausted = true; + } + if out.id != 0 { + self.parent.info.id = out.id; + } + self.parent.info.ns = out.ns; + self.parent.post_batch_resume_token = + out.post_batch_resume_token; + // Buffer next reply to yield on the following poll. + self.parent.initial_reply = Some(out.raw_reply); + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.parent.exhausted = true; + } + } + } + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } } return Poll::Ready(Some(Ok(RawBatch::new(initial)))); } @@ -420,6 +513,38 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { .map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); self.provider.start_execution(info, client, pinned_ref); + // Immediately poll once to register waker; if already ready, buffer the + // result into initial_reply for the next poll. + if let Some(f) = self.provider.executing_future() { + match Pin::new(f).poll(cx) { + Poll::Pending => {} + Poll::Ready(get_more_out2) => { + match get_more_out2.result { + Ok(out2) => { + if out2.exhausted { + self.parent.exhausted = true; + } + if out2.id != 0 { + self.parent.info.id = out2.id; + } + self.parent.info.ns = out2.ns; + self.parent.post_batch_resume_token = + out2.post_batch_resume_token; + self.parent.initial_reply = Some(out2.raw_reply); + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.parent.exhausted = true; + } + } + } + let exhausted_now2 = self.parent.exhausted; + self.provider + .clear_execution(get_more_out2.session, exhausted_now2); + } + } + } } return Poll::Ready(Some(Ok(RawBatch::new(out.raw_reply)))); } diff --git a/src/operation/find_raw.rs b/src/operation/find_raw.rs index c5d705ad4..19e13612a 100644 --- a/src/operation/find_raw.rs +++ b/src/operation/find_raw.rs @@ -7,9 +7,7 @@ use crate::{ cursor::{raw_batch::RawBatchCursorSpecification, CursorInformation}, error::{Error, Result}, operation::{ - append_options_to_raw_document, - CursorBody, - OperationWithDefaults, + append_options_to_raw_document, CursorBody, OperationWithDefaults, SERVER_4_4_0_WIRE_VERSION, }, options::{CursorType, FindOptions, SelectionCriteria}, diff --git a/src/operation/get_more_raw.rs b/src/operation/get_more_raw.rs index c5f602176..353360902 100644 --- a/src/operation/get_more_raw.rs +++ b/src/operation/get_more_raw.rs @@ -117,8 +117,7 @@ impl OperationWithDefaults for GetMoreRaw<'_> { .and_then(crate::bson::RawBsonRef::as_document) .map(|d| RawDocumentBuf::from_bytes(d.as_bytes().to_vec())) .transpose()?; - let post_batch_resume_token = - crate::change_stream::event::ResumeToken::from_raw(token_raw); + let post_batch_resume_token = crate::change_stream::event::ResumeToken::from_raw(token_raw); Ok(GetMoreRawResult { raw_reply: raw, From 95f3dbe3ef5c98d35cb2207b9eb212eba040e02a Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Tue, 11 Nov 2025 18:56:48 -0800 Subject: [PATCH 04/17] Cleanup --- src/cursor/raw_batch.rs | 376 +++++++++------------------------------- 1 file changed, 80 insertions(+), 296 deletions(-) diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index a9776a51c..c4905988a 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -131,157 +131,16 @@ impl Stream for RawBatchCursor { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Yield initial batch first, if present. - if let Some(initial) = self.state.initial_reply.take() { - // Prefetch the next getMore in the background, if applicable. - if !self.state.exhausted { - let info = self.info.clone(); - let client = self.client.clone(); - let pinned_owned = self.state.pinned_connection.handle().map(|c| c.replicate()); - let pinned_ref = pinned_owned.as_ref(); - self.state - .provider - .start_execution(info, client, pinned_ref); - // Immediately poll once to register waker; if already ready, buffer the result. - if let Some(f) = self.state.provider.executing_future() { - match Pin::new(f).poll(cx) { - Poll::Pending => {} - Poll::Ready(get_more_out) => { - match get_more_out.result { - Ok(out) => { - self.state.pending_reply = Some(out.raw_reply); - self.state.post_batch_resume_token = - out.post_batch_resume_token; - if out.exhausted { - self.mark_exhausted(); - } - if out.id != 0 { - self.info.id = out.id; - } - self.info.ns = out.ns; - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.mark_exhausted(); - } - } - } - let exhausted_now = self.state.exhausted; - self.state - .provider - .clear_execution(get_more_out.session, exhausted_now); - } - } - } - } - return Poll::Ready(Some(Ok(RawBatch::new(initial)))); - } - - // If a getMore is in flight, poll it. - let mut ready = None; - { - let provider = &mut self.state.provider; - if let Some(f) = provider.executing_future() { - match Pin::new(f).poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(g) => ready = Some(g), - } - } - } - if let Some(get_more_out) = ready { - match get_more_out.result { - Ok(out) => { - self.state.pending_reply = Some(out.raw_reply); - self.state.post_batch_resume_token = out.post_batch_resume_token; - if out.exhausted { - self.mark_exhausted(); - } - if out.id != 0 { - self.info.id = out.id; - } - self.info.ns = out.ns; - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.mark_exhausted(); - } - let exhausted_now = self.state.exhausted; - self.state - .provider - .clear_execution(get_more_out.session, exhausted_now); - return Poll::Ready(Some(Err(e))); - } - } - let exhausted_now = self.state.exhausted; - self.state - .provider - .clear_execution(get_more_out.session, exhausted_now); - } - - if let Some(reply) = self.state.pending_reply.take() { - // Prefetch the next getMore before returning this batch, if applicable. - if !self.state.exhausted { - let info = self.info.clone(); - let client = self.client.clone(); - let pinned_owned = self.state.pinned_connection.handle().map(|c| c.replicate()); - let pinned_ref = pinned_owned.as_ref(); - self.state - .provider - .start_execution(info, client, pinned_ref); - // Immediately poll once to register waker; if already ready, buffer the result. - if let Some(f) = self.state.provider.executing_future() { - match Pin::new(f).poll(cx) { - Poll::Pending => {} - Poll::Ready(get_more_out) => { - match get_more_out.result { - Ok(out) => { - self.state.pending_reply = Some(out.raw_reply); - self.state.post_batch_resume_token = - out.post_batch_resume_token; - if out.exhausted { - self.mark_exhausted(); - } - if out.id != 0 { - self.info.id = out.id; - } - self.info.ns = out.ns; - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.mark_exhausted(); - } - } - } - let exhausted_now = self.state.exhausted; - self.state - .provider - .clear_execution(get_more_out.session, exhausted_now); - } - } - } - } - return Poll::Ready(Some(Ok(RawBatch::new(reply)))); - } - - if !self.state.exhausted { - let info = self.info.clone(); - let client = self.client.clone(); - let state = &mut self.state; - state - .provider - .start_execution(info, client, state.pinned_connection.handle()); - // Immediately poll the newly-started getMore once to register the waker. - if let Some(f) = state.provider.executing_future() { - match Pin::new(f).poll(cx) { + loop { + // If a getMore is in flight, poll it and update state. + if let Some(future) = self.state.provider.executing_future() { + match Pin::new(future).poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(get_more_out) => { match get_more_out.result { Ok(out) => { - state.pending_reply = Some(out.raw_reply); - state.post_batch_resume_token = out.post_batch_resume_token; + self.state.pending_reply = Some(out.raw_reply); + self.state.post_batch_resume_token = out.post_batch_resume_token; if out.exhausted { self.mark_exhausted(); } @@ -291,8 +150,7 @@ impl Stream for RawBatchCursor { self.info.ns = out.ns; } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { self.mark_exhausted(); } let exhausted_now = self.state.exhausted; @@ -306,21 +164,36 @@ impl Stream for RawBatchCursor { self.state .provider .clear_execution(get_more_out.session, exhausted_now); - if let Some(reply) = self.state.pending_reply.take() { - return Poll::Ready(Some(Ok(RawBatch::new(reply)))); - } else if self.state.exhausted { - return Poll::Ready(None); - } else { - return Poll::Pending; - } } } - } else { - return Poll::Pending; } - } - Poll::Ready(None) + // Yield any buffered reply (initial or pending). + if let Some(reply) = self + .state + .initial_reply + .take() + .or_else(|| self.state.pending_reply.take()) + { + return Poll::Ready(Some(Ok(RawBatch::new(reply)))); + } + + // If not exhausted and the connection is valid, start a getMore and iterate. + if !self.state.exhausted + && !matches!(self.state.pinned_connection, PinnedConnection::Invalid(_)) + { + let info = self.info.clone(); + let client = self.client.clone(); + let state = &mut self.state; + state + .provider + .start_execution(info, client, state.pinned_connection.handle()); + continue; + } + + // Otherwise, we're done. + return Poll::Ready(None); + } } } @@ -423,10 +296,51 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // yield initial reply first - if let Some(initial) = self.parent.initial_reply.take() { - // Prefetch the next getMore in the background, if applicable. - if !self.parent.exhausted { + loop { + // If a getMore is in flight, poll it and update state. + if let Some(future) = self.provider.executing_future() { + match Pin::new(future).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + if out.exhausted { + self.parent.exhausted = true; + } + if out.id != 0 { + self.parent.info.id = out.id; + } + self.parent.info.ns = out.ns; + self.parent.post_batch_resume_token = out.post_batch_resume_token; + // Buffer next reply to yield on following polls. + self.parent.initial_reply = Some(out.raw_reply); + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { + self.parent.exhausted = true; + } + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + return Poll::Ready(Some(Err(e))); + } + } + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } + + // Yield any buffered reply (initial). + if let Some(reply) = self.parent.initial_reply.take() { + return Poll::Ready(Some(Ok(RawBatch::new(reply)))); + } + + // If not exhausted and the connection is valid, start a getMore and iterate. + if !self.parent.exhausted + && !matches!(self.parent.pinned_connection, PinnedConnection::Invalid(_)) + { let info = self.parent.info.clone(); let client = self.parent.client.clone(); let pinned_owned = self @@ -436,142 +350,12 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { .map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); self.provider.start_execution(info, client, pinned_ref); - // Immediately poll once to register waker; if already ready, buffer the result - // into initial_reply for the next poll. - if let Some(f) = self.provider.executing_future() { - match Pin::new(f).poll(cx) { - Poll::Pending => {} - Poll::Ready(get_more_out) => { - match get_more_out.result { - Ok(out) => { - if out.exhausted { - self.parent.exhausted = true; - } - if out.id != 0 { - self.parent.info.id = out.id; - } - self.parent.info.ns = out.ns; - self.parent.post_batch_resume_token = - out.post_batch_resume_token; - // Buffer next reply to yield on the following poll. - self.parent.initial_reply = Some(out.raw_reply); - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.parent.exhausted = true; - } - } - } - let exhausted_now = self.parent.exhausted; - self.provider - .clear_execution(get_more_out.session, exhausted_now); - } - } - } + continue; } - return Poll::Ready(Some(Ok(RawBatch::new(initial)))); - } - if self.parent.exhausted { + // Otherwise, we're done. return Poll::Ready(None); } - - // If a getMore is in flight, poll it. - let mut ready = None; - { - let provider = &mut self.provider; - if let Some(f) = provider.executing_future() { - match Pin::new(f).poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(g) => ready = Some(g), - } - } - } - if let Some(get_more_out) = ready { - match get_more_out.result { - Ok(out) => { - if out.exhausted { - self.parent.exhausted = true; - } - if out.id != 0 { - self.parent.info.id = out.id; - } - self.parent.info.ns = out.ns; - self.parent.post_batch_resume_token = out.post_batch_resume_token; - let exhausted_now = self.parent.exhausted; - self.provider - .clear_execution(get_more_out.session, exhausted_now); - // Prefetch the next getMore before returning this batch, if applicable. - if !self.parent.exhausted { - let info = self.parent.info.clone(); - let client = self.parent.client.clone(); - let pinned_owned = self - .parent - .pinned_connection - .handle() - .map(|c| c.replicate()); - let pinned_ref = pinned_owned.as_ref(); - self.provider.start_execution(info, client, pinned_ref); - // Immediately poll once to register waker; if already ready, buffer the - // result into initial_reply for the next poll. - if let Some(f) = self.provider.executing_future() { - match Pin::new(f).poll(cx) { - Poll::Pending => {} - Poll::Ready(get_more_out2) => { - match get_more_out2.result { - Ok(out2) => { - if out2.exhausted { - self.parent.exhausted = true; - } - if out2.id != 0 { - self.parent.info.id = out2.id; - } - self.parent.info.ns = out2.ns; - self.parent.post_batch_resume_token = - out2.post_batch_resume_token; - self.parent.initial_reply = Some(out2.raw_reply); - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.parent.exhausted = true; - } - } - } - let exhausted_now2 = self.parent.exhausted; - self.provider - .clear_execution(get_more_out2.session, exhausted_now2); - } - } - } - } - return Poll::Ready(Some(Ok(RawBatch::new(out.raw_reply)))); - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.parent.exhausted = true; - } - let exhausted_now = self.parent.exhausted; - self.provider - .clear_execution(get_more_out.session, exhausted_now); - return Poll::Ready(Some(Err(e))); - } - } - } - - // Start a getMore if needed. - let info = self.parent.info.clone(); - let client = self.parent.client.clone(); - let pinned_owned = self - .parent - .pinned_connection - .handle() - .map(|c| c.replicate()); - let pinned_ref = pinned_owned.as_ref(); - self.provider.start_execution(info, client, pinned_ref); - Poll::Pending } } pub struct GetMoreRawResultAndSession { From 8b5434206be2e2e9dfbd7db56aafaff0db030f88 Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 10:29:29 -0800 Subject: [PATCH 05/17] Some cleanups --- src/cursor/raw_batch.rs | 29 ++++------------------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index c4905988a..11a51b05f 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -150,7 +150,8 @@ impl Stream for RawBatchCursor { self.info.ns = out.ns; } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { self.mark_exhausted(); } let exhausted_now = self.state.exhausted; @@ -316,7 +317,8 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { self.parent.initial_reply = Some(out.raw_reply); } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { self.parent.exhausted = true; } let exhausted_now = self.parent.exhausted; @@ -413,29 +415,6 @@ impl<'s, S: ClientSessionHandle<'s>> GetMoreRawProvider<'s, S> { } } -pub struct RawDocumentStream { - inner: R, -} - -impl RawBatchCursor { - pub fn into_raw_documents(self) -> RawDocumentStream { - RawDocumentStream { inner: self } - } -} - -impl Stream for RawDocumentStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))), - Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), - Poll::Ready(None) => Poll::Ready(None), - } - } -} - #[cfg(test)] mod tests { use super::*; From 245d3497a650fc7156a4d8c21e575e1a6f7b3af8 Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 11:14:27 -0800 Subject: [PATCH 06/17] Add owned path --- src/client/executor.rs | 51 +++++++++------ src/cursor/raw_batch.rs | 116 ++++++++++++++++++++++++++++++++-- src/operation.rs | 59 +++++++++++++++++ src/operation/find_raw.rs | 62 ++++++++++++++++++ src/operation/get_more_raw.rs | 77 ++++++++++++---------- 5 files changed, 310 insertions(+), 55 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 0469ee6b3..7e32d69b7 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -95,7 +95,7 @@ impl Client { /// Server selection will performed using the criteria specified on the operation, if any, and /// an implicit session will be created if the operation and write concern are compatible with /// sessions and an explicit session is not provided. - pub(crate) async fn execute_operation( + pub(crate) async fn execute_operation( &self, mut op: impl BorrowMut, session: impl Into>, @@ -105,7 +105,7 @@ impl Client { .map(|details| details.output) } - async fn execute_operation_with_details( + async fn execute_operation_with_details( &self, op: &mut T, session: impl Into>, @@ -123,7 +123,7 @@ impl Client { result } - async fn execute_operation_with_details_inner( + async fn execute_operation_with_details_inner( &self, op: &mut T, mut session: Option<&mut ClientSession>, @@ -193,7 +193,7 @@ impl Client { mut op: impl BorrowMut, ) -> Result> where - Op: Operation, + Op: Operation + Sync, { Box::pin(async { let mut details = self @@ -216,7 +216,7 @@ impl Client { mut op: impl BorrowMut, ) -> Result where - Op: Operation, + Op: Operation + Sync, { Box::pin(async { let mut details = self @@ -244,7 +244,7 @@ impl Client { session: &mut ClientSession, ) -> Result where - Op: Operation, + Op: Operation + Sync, { let mut details = self .execute_operation_with_details(op.borrow_mut(), &mut *session) @@ -272,7 +272,7 @@ impl Client { session: &mut ClientSession, ) -> Result> where - Op: Operation, + Op: Operation + Sync, { let mut details = self .execute_operation_with_details(op.borrow_mut(), &mut *session) @@ -384,7 +384,7 @@ impl Client { /// Selects a server and executes the given operation on it, optionally using a provided /// session. Retries the operation upon failure if retryability is supported or after /// reauthenticating if reauthentication is required. - async fn execute_operation_with_retry( + async fn execute_operation_with_retry( &self, op: &mut T, mut session: Option<&mut ClientSession>, @@ -556,7 +556,7 @@ impl Client { } /// Executes an operation on a given connection, optionally using a provided session. - pub(crate) async fn execute_operation_on_connection( + pub(crate) async fn execute_operation_on_connection( &self, op: &mut T, connection: &mut PooledConnection, @@ -712,15 +712,30 @@ impl Client { effective_criteria: effective_criteria.clone(), }; - match op.handle_response(&response, context).await { - Ok(response) => Ok(response), - Err(mut err) => { - err.add_labels_and_update_pin( - Some(connection.stream_description()?), - session, - Some(retryability), - ); - Err(err.with_server_response(&response)) + if op.wants_owned_response() { + match op.handle_response_owned(response, context).await { + Ok(output) => Ok(output), + Err(mut err) => { + err.add_labels_and_update_pin( + Some(connection.stream_description()?), + session, + Some(retryability), + ); + // Cannot attach server response; it was moved. + Err(err) + } + } + } else { + match op.handle_response(&response, context).await { + Ok(output) => Ok(output), + Err(mut err) => { + err.add_labels_and_update_pin( + Some(connection.stream_description()?), + session, + Some(retryability), + ); + Err(err.with_server_response(&response)) + } } } } diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index 11a51b05f..b266a3d6d 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -150,8 +150,7 @@ impl Stream for RawBatchCursor { self.info.ns = out.ns; } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { self.mark_exhausted(); } let exhausted_now = self.state.exhausted; @@ -176,6 +175,49 @@ impl Stream for RawBatchCursor { .take() .or_else(|| self.state.pending_reply.take()) { + // Prefetch the next getMore before returning this batch, if applicable. + if !self.state.exhausted + && !matches!(self.state.pinned_connection, PinnedConnection::Invalid(_)) + { + let info = self.info.clone(); + let client = self.client.clone(); + let state = &mut self.state; + state + .provider + .start_execution(info, client, state.pinned_connection.handle()); + // Immediately poll once to register the waker and opportunistically buffer. + if let Some(fut) = state.provider.executing_future() { + match Pin::new(fut).poll(cx) { + Poll::Pending => {} + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + state.pending_reply = Some(out.raw_reply); + state.post_batch_resume_token = out.post_batch_resume_token; + if out.exhausted { + self.mark_exhausted(); + } + if out.id != 0 { + self.info.id = out.id; + } + self.info.ns = out.ns; + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.mark_exhausted(); + } + // Intentionally do not surface the error here; clear and continue. + } + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } + } return Poll::Ready(Some(Ok(RawBatch::new(reply)))); } @@ -317,8 +359,7 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { self.parent.initial_reply = Some(out.raw_reply); } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { self.parent.exhausted = true; } let exhausted_now = self.parent.exhausted; @@ -336,6 +377,50 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { // Yield any buffered reply (initial). if let Some(reply) = self.parent.initial_reply.take() { + // Prefetch the next getMore before returning this batch, if applicable. + if !self.parent.exhausted + && !matches!(self.parent.pinned_connection, PinnedConnection::Invalid(_)) + { + let info = self.parent.info.clone(); + let client = self.parent.client.clone(); + // Avoid borrow conflicts by replicating the handle into a temporary owner. + let pinned_owned = self.parent.pinned_connection.handle().map(|c| c.replicate()); + let pinned_ref = pinned_owned.as_ref(); + self.provider.start_execution(info, client, pinned_ref); + // Immediately poll once to register the waker and opportunistically buffer. + if let Some(fut) = self.provider.executing_future() { + match Pin::new(fut).poll(cx) { + Poll::Pending => {} + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + if out.exhausted { + self.parent.exhausted = true; + } + if out.id != 0 { + self.parent.info.id = out.id; + } + self.parent.info.ns = out.ns; + self.parent.post_batch_resume_token = + out.post_batch_resume_token; + // buffer for the next poll + self.parent.initial_reply = Some(out.raw_reply); + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.parent.exhausted = true; + } + // Intentionally do not surface the error here; clear and continue. + } + } + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } + } return Poll::Ready(Some(Ok(RawBatch::new(reply)))); } @@ -415,6 +500,29 @@ impl<'s, S: ClientSessionHandle<'s>> GetMoreRawProvider<'s, S> { } } +pub struct RawDocumentStream { + inner: R, +} + +impl RawBatchCursor { + pub fn into_raw_documents(self) -> RawDocumentStream { + RawDocumentStream { inner: self } + } +} + +impl Stream for RawDocumentStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/operation.rs b/src/operation.rs index 06e7412db..48795f278 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -145,6 +145,26 @@ pub(crate) trait Operation { context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result>; + /// Whether this operation prefers to take ownership of the server response body for + /// zero-copy handling. + /// Defaults to false; operations can override via OperationWithDefaults. + fn wants_owned_response(&self) -> bool { + false + } + + /// Interprets the server response taking ownership of the body to enable zero-copy handling. + /// By default, delegates to the borrowed handle_response path. + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> BoxFuture<'a, Result> + where + Self: Sync, + { + async move { self.handle_response(&response, context).await }.boxed() + } + /// Interpret an error encountered while sending the built command to the server, potentially /// recovering. fn handle_error(&self, error: Error) -> Result; @@ -232,6 +252,35 @@ pub(crate) trait OperationWithDefaults: Send + Sync { async move { self.handle_response(response, context) }.boxed() } + /// Whether this operation prefers to take ownership of the server response body for + /// zero-copy handling. + fn wants_owned_response(&self) -> bool { + false + } + + /// Interprets the server response taking ownership of the body to enable zero-copy handling. + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> Result { + // By default, delegate to borrowed path by re-borrowing. + // Note: default impls that want zero-copy should override this. + self.handle_response(&response, context) + } + + /// Async wrapper for owned-response handling. + fn handle_response_owned_async<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> BoxFuture<'a, Result> + where + Self: Sync, + { + async move { self.handle_response_owned(response, context) }.boxed() + } + /// Interpret an error encountered while sending the built command to the server, potentially /// recovering. fn handle_error(&self, error: Error) -> Result { @@ -311,6 +360,16 @@ where ) -> BoxFuture<'a, Result> { self.handle_response_async(response, context) } + fn wants_owned_response(&self) -> bool { + self.wants_owned_response() + } + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> BoxFuture<'a, Result> { + self.handle_response_owned_async(response, context) + } fn handle_error(&self, error: Error) -> Result { self.handle_error(error) } diff --git a/src/operation/find_raw.rs b/src/operation/find_raw.rs index 19e13612a..ae19c2361 100644 --- a/src/operation/find_raw.rs +++ b/src/operation/find_raw.rs @@ -37,6 +37,68 @@ impl OperationWithDefaults for FindRaw { type O = RawBatchCursorSpecification; const NAME: &'static CStr = cstr!("find"); + fn wants_owned_response(&self) -> bool { + true + } + + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> Result { + // Parse minimal fields via raw to avoid per-doc copies. + let raw_root = response.raw_body(); + let cursor_doc = raw_root + .get("cursor")? + .and_then(crate::bson::RawBsonRef::as_document) + .ok_or_else(|| Error::invalid_response("missing cursor in response"))?; + + let id = cursor_doc + .get("id")? + .and_then(crate::bson::RawBsonRef::as_i64) + .ok_or_else(|| Error::invalid_response("missing cursor id"))?; + + let ns_str = cursor_doc + .get("ns")? + .and_then(crate::bson::RawBsonRef::as_str) + .ok_or_else(|| Error::invalid_response("missing cursor ns"))?; + let ns = Namespace::from_str(ns_str) + .ok_or_else(|| Error::invalid_response("invalid cursor ns"))?; + + let post_token_raw = cursor_doc + .get("postBatchResumeToken")? + .and_then(crate::bson::RawBsonRef::as_document) + .map(|d| RawDocumentBuf::from_bytes(d.as_bytes().to_vec())) + .transpose()?; + let post_batch_resume_token = + crate::change_stream::event::ResumeToken::from_raw(post_token_raw); + + let description = context.connection.stream_description()?; + let comment = if description.max_wire_version.unwrap_or(0) < SERVER_4_4_0_WIRE_VERSION { + None + } else { + self.options.as_ref().and_then(|opts| opts.comment.clone()) + }; + + let info = CursorInformation { + ns, + id, + address: description.server_address.clone(), + batch_size: self.options.as_ref().and_then(|opts| opts.batch_size), + max_time: self.options.as_ref().and_then(|opts| opts.max_await_time), + comment, + }; + + // Take ownership of the raw reply with zero copies. + let raw = response.into_raw_document_buf(); + + Ok(RawBatchCursorSpecification { + info, + initial_reply: raw, + post_batch_resume_token, + }) + } + fn build(&mut self, _description: &StreamDescription) -> Result { let mut body = rawdoc! { Self::NAME: self.ns.coll.clone(), diff --git a/src/operation/get_more_raw.rs b/src/operation/get_more_raw.rs index 353360902..420be136d 100644 --- a/src/operation/get_more_raw.rs +++ b/src/operation/get_more_raw.rs @@ -56,44 +56,16 @@ impl OperationWithDefaults for GetMoreRaw<'_> { const NAME: &'static CStr = cstr!("getMore"); - fn build(&mut self, _description: &StreamDescription) -> Result { - let mut body = rawdoc! { - Self::NAME: self.cursor_id, - "collection": self.ns.coll.clone(), - }; - - if let Some(batch_size) = self.batch_size { - let batch_size = crate::checked::Checked::from(batch_size).try_into::()?; - if batch_size != 0 { - body.append(cstr!("batchSize"), batch_size); - } - } - - if let Some(ref max_time) = self.max_time { - body.append( - cstr!("maxTimeMS"), - max_time.as_millis().try_into().unwrap_or(i32::MAX), - ); - } - - if let Some(comment) = &self.comment { - let raw_comment: RawBson = comment.clone().try_into()?; - body.append(cstr!("comment"), raw_comment); - } - - Ok(Command::new(Self::NAME, &self.ns.db, body)) + fn wants_owned_response(&self) -> bool { + true } - fn handle_response<'a>( + fn handle_response_owned<'a>( &'a self, - response: &'a RawCommandResponse, + response: RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { - // Own the raw reply (single copy). - let raw = RawDocumentBuf::from_bytes(response.as_bytes().to_vec())?; - - // Extract minimal cursor fields directly from the raw reply to avoid - // walking the batch array via serde. + // Extract minimal fields directly from the raw reply to avoid walking the batch via serde. let root = response.raw_body(); let cursor = root .get("cursor")? @@ -119,6 +91,9 @@ impl OperationWithDefaults for GetMoreRaw<'_> { .transpose()?; let post_batch_resume_token = crate::change_stream::event::ResumeToken::from_raw(token_raw); + // Take ownership of the raw bytes without copying. + let raw = response.into_raw_document_buf(); + Ok(GetMoreRawResult { raw_reply: raw, exhausted: id == 0, @@ -128,6 +103,42 @@ impl OperationWithDefaults for GetMoreRaw<'_> { }) } + fn build(&mut self, _description: &StreamDescription) -> Result { + let mut body = rawdoc! { + Self::NAME: self.cursor_id, + "collection": self.ns.coll.clone(), + }; + + if let Some(batch_size) = self.batch_size { + let batch_size = crate::checked::Checked::from(batch_size).try_into::()?; + if batch_size != 0 { + body.append(cstr!("batchSize"), batch_size); + } + } + + if let Some(ref max_time) = self.max_time { + body.append( + cstr!("maxTimeMS"), + max_time.as_millis().try_into().unwrap_or(i32::MAX), + ); + } + + if let Some(comment) = &self.comment { + let raw_comment: RawBson = comment.clone().try_into()?; + body.append(cstr!("comment"), raw_comment); + } + + Ok(Command::new(Self::NAME, &self.ns.db, body)) + } + + fn handle_response<'a>( + &'a self, + _response: &'a RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> Result { + unimplemented!("should call handle_response_owned") + } + fn selection_criteria(&self) -> Option<&SelectionCriteria> { Some(&self.selection_criteria) } From 0eabf59f17beab7a514b1e6c7befb33e9f1bbc0d Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 13:52:08 -0800 Subject: [PATCH 07/17] Cleanups --- src/client/executor.rs | 44 ++++++++--------------- src/operation.rs | 36 ++++++------------- src/operation/find_raw.rs | 66 +++++------------------------------ src/operation/get_more_raw.rs | 16 ++++----- 4 files changed, 41 insertions(+), 121 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 7e32d69b7..ade3b5bd4 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -21,10 +21,7 @@ use crate::otel::OtelFutureStub as _; use crate::{ bson::Document, change_stream::{ - event::ChangeStreamEvent, - session::SessionChangeStream, - ChangeStream, - ChangeStreamData, + event::ChangeStreamEvent, session::SessionChangeStream, ChangeStream, ChangeStreamData, WatchArgs, }, cmap::{ @@ -33,33 +30,20 @@ use crate::{ wire::{next_request_id, Message}, PinnedConnectionHandle, }, - ConnectionPool, - RawCommandResponse, - StreamDescription, + ConnectionPool, RawCommandResponse, StreamDescription, }, cursor::{session::SessionCursor, Cursor, CursorSpecification}, error::{ - Error, - ErrorKind, - Result, - RETRYABLE_WRITE_ERROR, - TRANSIENT_TRANSACTION_ERROR, + Error, ErrorKind, Result, RETRYABLE_WRITE_ERROR, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT, }, event::command::{ - CommandEvent, - CommandFailedEvent, - CommandStartedEvent, - CommandSucceededEvent, + CommandEvent, CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent, }, hello::LEGACY_HELLO_COMMAND_NAME_LOWERCASE, operation::{ aggregate::{change_stream::ChangeStreamAggregate, AggregateTarget}, - AbortTransaction, - CommandErrorBody, - CommitTransaction, - ExecutionContext, - Operation, + AbortTransaction, CommandErrorBody, CommitTransaction, ExecutionContext, Operation, Retryability, }, options::{ChangeStreamOptions, SelectionCriteria}, @@ -95,7 +79,7 @@ impl Client { /// Server selection will performed using the criteria specified on the operation, if any, and /// an implicit session will be created if the operation and write concern are compatible with /// sessions and an explicit session is not provided. - pub(crate) async fn execute_operation( + pub(crate) async fn execute_operation( &self, mut op: impl BorrowMut, session: impl Into>, @@ -105,7 +89,7 @@ impl Client { .map(|details| details.output) } - async fn execute_operation_with_details( + async fn execute_operation_with_details( &self, op: &mut T, session: impl Into>, @@ -123,7 +107,7 @@ impl Client { result } - async fn execute_operation_with_details_inner( + async fn execute_operation_with_details_inner( &self, op: &mut T, mut session: Option<&mut ClientSession>, @@ -193,7 +177,7 @@ impl Client { mut op: impl BorrowMut, ) -> Result> where - Op: Operation + Sync, + Op: Operation, { Box::pin(async { let mut details = self @@ -216,7 +200,7 @@ impl Client { mut op: impl BorrowMut, ) -> Result where - Op: Operation + Sync, + Op: Operation, { Box::pin(async { let mut details = self @@ -244,7 +228,7 @@ impl Client { session: &mut ClientSession, ) -> Result where - Op: Operation + Sync, + Op: Operation, { let mut details = self .execute_operation_with_details(op.borrow_mut(), &mut *session) @@ -272,7 +256,7 @@ impl Client { session: &mut ClientSession, ) -> Result> where - Op: Operation + Sync, + Op: Operation, { let mut details = self .execute_operation_with_details(op.borrow_mut(), &mut *session) @@ -384,7 +368,7 @@ impl Client { /// Selects a server and executes the given operation on it, optionally using a provided /// session. Retries the operation upon failure if retryability is supported or after /// reauthenticating if reauthentication is required. - async fn execute_operation_with_retry( + async fn execute_operation_with_retry( &self, op: &mut T, mut session: Option<&mut ClientSession>, @@ -556,7 +540,7 @@ impl Client { } /// Executes an operation on a given connection, optionally using a provided session. - pub(crate) async fn execute_operation_on_connection( + pub(crate) async fn execute_operation_on_connection( &self, op: &mut T, connection: &mut PooledConnection, diff --git a/src/operation.rs b/src/operation.rs index 48795f278..87957e34a 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -40,25 +40,15 @@ use crate::{ client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS}, cmap::{ conn::{pooled::PooledConnection, PinnedConnectionHandle}, - Command, - RawCommandResponse, - StreamDescription, + Command, RawCommandResponse, StreamDescription, }, error::{ - CommandError, - Error, - ErrorKind, - IndexedWriteError, - InsertManyError, - Result, - WriteConcernError, - WriteFailure, + CommandError, Error, ErrorKind, IndexedWriteError, InsertManyError, Result, + WriteConcernError, WriteFailure, }, options::{ClientOptions, WriteConcern}, selection_criteria::SelectionCriteria, - BoxFuture, - ClientSession, - Namespace, + BoxFuture, ClientSession, Namespace, }; pub(crate) use abort_transaction::AbortTransaction; @@ -153,16 +143,13 @@ pub(crate) trait Operation { } /// Interprets the server response taking ownership of the body to enable zero-copy handling. - /// By default, delegates to the borrowed handle_response path. + /// Is only ever called if wants_owned_response returns True fn handle_response_owned<'a>( &'a self, - response: RawCommandResponse, - context: ExecutionContext<'a>, - ) -> BoxFuture<'a, Result> - where - Self: Sync, - { - async move { self.handle_response(&response, context).await }.boxed() + _response: RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> BoxFuture<'a, Result> { + unimplemented!() } /// Interpret an error encountered while sending the built command to the server, potentially @@ -274,10 +261,7 @@ pub(crate) trait OperationWithDefaults: Send + Sync { &'a self, response: RawCommandResponse, context: ExecutionContext<'a>, - ) -> BoxFuture<'a, Result> - where - Self: Sync, - { + ) -> BoxFuture<'a, Result> { async move { self.handle_response_owned(response, context) }.boxed() } diff --git a/src/operation/find_raw.rs b/src/operation/find_raw.rs index ae19c2361..e0300533c 100644 --- a/src/operation/find_raw.rs +++ b/src/operation/find_raw.rs @@ -37,6 +37,14 @@ impl OperationWithDefaults for FindRaw { type O = RawBatchCursorSpecification; const NAME: &'static CStr = cstr!("find"); + fn handle_response<'a>( + &'a self, + _response: &'a RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> Result { + unimplemented!("FindRaw must be handled via owned response path") + } + fn wants_owned_response(&self) -> bool { true } @@ -154,63 +162,7 @@ impl OperationWithDefaults for FindRaw { CursorBody::extract_at_cluster_time(response) } - fn handle_response<'a>( - &'a self, - response: &'a RawCommandResponse, - context: ExecutionContext<'a>, - ) -> Result { - // Build initial spec using minimal parsing and copy of raw reply. - let raw = RawDocumentBuf::from_bytes(response.as_bytes().to_vec())?; - - // Parse minimal fields via raw to avoid per-doc copies. - let raw_root = response.raw_body(); - let cursor_doc = raw_root - .get("cursor")? - .and_then(crate::bson::RawBsonRef::as_document) - .ok_or_else(|| Error::invalid_response("missing cursor in response"))?; - - let id = cursor_doc - .get("id")? - .and_then(crate::bson::RawBsonRef::as_i64) - .ok_or_else(|| Error::invalid_response("missing cursor id"))?; - - let ns_str = cursor_doc - .get("ns")? - .and_then(crate::bson::RawBsonRef::as_str) - .ok_or_else(|| Error::invalid_response("missing cursor ns"))?; - let ns = Namespace::from_str(ns_str) - .ok_or_else(|| Error::invalid_response("invalid cursor ns"))?; - - let post_token_raw = cursor_doc - .get("postBatchResumeToken")? - .and_then(crate::bson::RawBsonRef::as_document) - .map(|d| RawDocumentBuf::from_bytes(d.as_bytes().to_vec())) - .transpose()?; - let post_batch_resume_token = - crate::change_stream::event::ResumeToken::from_raw(post_token_raw); - - let description = context.connection.stream_description()?; - let comment = if description.max_wire_version.unwrap_or(0) < SERVER_4_4_0_WIRE_VERSION { - None - } else { - self.options.as_ref().and_then(|opts| opts.comment.clone()) - }; - - let info = CursorInformation { - ns, - id, - address: description.server_address.clone(), - batch_size: self.options.as_ref().and_then(|opts| opts.batch_size), - max_time: self.options.as_ref().and_then(|opts| opts.max_await_time), - comment, - }; - - Ok(RawBatchCursorSpecification { - info, - initial_reply: raw, - post_batch_resume_token, - }) - } + // borrowed handle_response intentionally unimplemented above fn supports_read_concern(&self, _description: &StreamDescription) -> bool { true diff --git a/src/operation/get_more_raw.rs b/src/operation/get_more_raw.rs index 420be136d..d24f1f7ec 100644 --- a/src/operation/get_more_raw.rs +++ b/src/operation/get_more_raw.rs @@ -56,6 +56,14 @@ impl OperationWithDefaults for GetMoreRaw<'_> { const NAME: &'static CStr = cstr!("getMore"); + fn handle_response<'a>( + &'a self, + _response: &'a RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> Result { + unimplemented!("GetMoreRaw must be handled via owned response path") + } + fn wants_owned_response(&self) -> bool { true } @@ -131,14 +139,6 @@ impl OperationWithDefaults for GetMoreRaw<'_> { Ok(Command::new(Self::NAME, &self.ns.db, body)) } - fn handle_response<'a>( - &'a self, - _response: &'a RawCommandResponse, - _context: ExecutionContext<'a>, - ) -> Result { - unimplemented!("should call handle_response_owned") - } - fn selection_criteria(&self) -> Option<&SelectionCriteria> { Some(&self.selection_criteria) } From f687b319bb9a721ac61967f94518d2ddd86f5d4e Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 13:53:09 -0800 Subject: [PATCH 08/17] Remove unused functions --- src/cursor/raw_batch.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index b266a3d6d..a32b976ed 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -54,14 +54,6 @@ impl RawBatch { docs.as_array() .ok_or_else(|| Error::invalid_response("missing firstBatch/nextBatch")) } - - pub fn raw_reply(&self) -> &RawDocument { - self.reply.as_ref() - } - - pub fn into_raw_reply(self) -> RawDocumentBuf { - self.reply - } } pub struct RawBatchCursor { @@ -150,7 +142,8 @@ impl Stream for RawBatchCursor { self.info.ns = out.ns; } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { self.mark_exhausted(); } let exhausted_now = self.state.exhausted; @@ -359,7 +352,8 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { self.parent.initial_reply = Some(out.raw_reply); } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { self.parent.exhausted = true; } let exhausted_now = self.parent.exhausted; @@ -384,7 +378,11 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { let info = self.parent.info.clone(); let client = self.parent.client.clone(); // Avoid borrow conflicts by replicating the handle into a temporary owner. - let pinned_owned = self.parent.pinned_connection.handle().map(|c| c.replicate()); + let pinned_owned = self + .parent + .pinned_connection + .handle() + .map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); self.provider.start_execution(info, client, pinned_ref); // Immediately poll once to register the waker and opportunistically buffer. From 2404ce91c633206d4491e5d31b4942879c49c083 Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 19:06:27 -0800 Subject: [PATCH 09/17] Cleaned up more --- src/client/executor.rs | 3 +++ src/cursor/raw_batch.rs | 21 ++++++++++---------- src/operation.rs | 43 ++++++++++++++++++++++++++++++++--------- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index ade3b5bd4..0dc068cb7 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -696,6 +696,9 @@ impl Client { effective_criteria: effective_criteria.clone(), }; + // Owned path: we move the server response into the handler for zero-copy parsing. + // Note: since the response is moved, we must not attempt to attach it to errors + // (e.g. via with_server_response) after this branch. if op.wants_owned_response() { match op.handle_response_owned(response, context).await { Ok(output) => Ok(output), diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index a32b976ed..ed75dfdb0 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -7,7 +7,7 @@ use crate::bson::{RawArray, RawBsonRef}; use futures_core::{future::BoxFuture, Future, Stream}; use crate::{ - bson::{RawDocument, RawDocumentBuf}, + bson::{RawDocumentBuf}, change_stream::event::ResumeToken, client::{options::ServerAddress, AsyncDropToken}, cmap::conn::PinnedConnectionHandle, @@ -29,6 +29,9 @@ pub(crate) struct RawBatchCursorSpecification { pub(crate) post_batch_resume_token: Option, } +/// A raw batch response returned by the server for a cursor getMore/find. +/// +/// This provides zero-copy access to the server's batch array via [`doc_slices`]. #[derive(Debug)] pub struct RawBatch { reply: RawDocumentBuf, @@ -39,6 +42,10 @@ impl RawBatch { Self { reply } } + /// Returns a borrowed view of the batch array (`firstBatch` or `nextBatch`) without copying. + /// + /// This lets callers iterate over [`crate::bson::RawDocument`] items directly for maximal + /// performance. pub fn doc_slices<'a>(&'a self) -> Result<&'a RawArray> { let root = self.reply.as_ref(); let cursor = root @@ -142,8 +149,7 @@ impl Stream for RawBatchCursor { self.info.ns = out.ns; } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { self.mark_exhausted(); } let exhausted_now = self.state.exhausted; @@ -352,8 +358,7 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { self.parent.initial_reply = Some(out.raw_reply); } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { self.parent.exhausted = true; } let exhausted_now = self.parent.exhausted; @@ -378,11 +383,7 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { let info = self.parent.info.clone(); let client = self.parent.client.clone(); // Avoid borrow conflicts by replicating the handle into a temporary owner. - let pinned_owned = self - .parent - .pinned_connection - .handle() - .map(|c| c.replicate()); + let pinned_owned = self.parent.pinned_connection.handle().map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); self.provider.start_execution(info, client, pinned_ref); // Immediately poll once to register the waker and opportunistically buffer. diff --git a/src/operation.rs b/src/operation.rs index 87957e34a..3a3e5e6f9 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -40,15 +40,25 @@ use crate::{ client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS}, cmap::{ conn::{pooled::PooledConnection, PinnedConnectionHandle}, - Command, RawCommandResponse, StreamDescription, + Command, + RawCommandResponse, + StreamDescription, }, error::{ - CommandError, Error, ErrorKind, IndexedWriteError, InsertManyError, Result, - WriteConcernError, WriteFailure, + CommandError, + Error, + ErrorKind, + IndexedWriteError, + InsertManyError, + Result, + WriteConcernError, + WriteFailure, }, options::{ClientOptions, WriteConcern}, selection_criteria::SelectionCriteria, - BoxFuture, ClientSession, Namespace, + BoxFuture, + ClientSession, + Namespace, }; pub(crate) use abort_transaction::AbortTransaction; @@ -137,19 +147,30 @@ pub(crate) trait Operation { /// Whether this operation prefers to take ownership of the server response body for /// zero-copy handling. - /// Defaults to false; operations can override via OperationWithDefaults. + /// + /// Operations that parse raw batches (e.g. raw find/getMore) should return `true` and implement + /// [`handle_response_owned`] to avoid cloning the server reply bytes. fn wants_owned_response(&self) -> bool { false } /// Interprets the server response taking ownership of the body to enable zero-copy handling. - /// Is only ever called if wants_owned_response returns True + /// + /// Default behavior delegates to the borrowed [`handle_response`]; operations that return + /// `true` from [`wants_owned_response`] should override this to consume the response. fn handle_response_owned<'a>( &'a self, - _response: RawCommandResponse, - _context: ExecutionContext<'a>, + response: RawCommandResponse, + context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result> { - unimplemented!() + // Default: owned path not implemented. Return a Send future that does not capture `self`. + async move { + Err(ErrorKind::Internal { + message: format!("owned response handling not implemented for {}", Self::NAME), + } + .into()) + } + .boxed() } /// Interpret an error encountered while sending the built command to the server, potentially @@ -241,11 +262,15 @@ pub(crate) trait OperationWithDefaults: Send + Sync { /// Whether this operation prefers to take ownership of the server response body for /// zero-copy handling. + /// + /// Override to `true` for operations that can consume the response without cloning it. fn wants_owned_response(&self) -> bool { false } /// Interprets the server response taking ownership of the body to enable zero-copy handling. + /// + /// Default implementation defers to the borrowed handler; override for true zero-copy handling. fn handle_response_owned<'a>( &'a self, response: RawCommandResponse, From 5f1eb3fd32ec1fc87514f8285f161e6027ab44af Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 19:08:08 -0800 Subject: [PATCH 10/17] Remove unrelated files --- crates/mox_client/python/tests/conftest.py | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 crates/mox_client/python/tests/conftest.py diff --git a/crates/mox_client/python/tests/conftest.py b/crates/mox_client/python/tests/conftest.py deleted file mode 100644 index 5d871b105..000000000 --- a/crates/mox_client/python/tests/conftest.py +++ /dev/null @@ -1,18 +0,0 @@ -import logging -import sys - - -def pytest_configure(config): - # Configure root logger so Rust logs forwarded via pyo3-log are visible - root = logging.getLogger() - if not root.handlers: # avoid duplicate handlers under repeated runs - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter( - logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s") - ) - root.addHandler(handler) - # Default to INFO globally; turn up verbosity for our crate - root.setLevel(logging.INFO) - logging.getLogger("mox_client").setLevel(logging.DEBUG) - - From 133e4e917fd6c214c11f17f60ff0e89419ee399a Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 19:11:28 -0800 Subject: [PATCH 11/17] Clean up comment --- src/client/executor.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 0dc068cb7..ac40efe89 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -697,8 +697,6 @@ impl Client { }; // Owned path: we move the server response into the handler for zero-copy parsing. - // Note: since the response is moved, we must not attempt to attach it to errors - // (e.g. via with_server_response) after this branch. if op.wants_owned_response() { match op.handle_response_owned(response, context).await { Ok(output) => Ok(output), From 67ef3b60fff2e7ee2cc899aacde17a14c067b212 Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 19:12:12 -0800 Subject: [PATCH 12/17] Clean up comment --- src/client/executor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index ac40efe89..ade3b5bd4 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -696,7 +696,6 @@ impl Client { effective_criteria: effective_criteria.clone(), }; - // Owned path: we move the server response into the handler for zero-copy parsing. if op.wants_owned_response() { match op.handle_response_owned(response, context).await { Ok(output) => Ok(output), From 0c82edc3dc84aa958008e80f6266917ed38b1064 Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 19:15:13 -0800 Subject: [PATCH 13/17] Some small cleanups --- src/cursor/raw_batch.rs | 37 ++++++++++--------------------------- src/operation.rs | 31 +++++++------------------------ 2 files changed, 17 insertions(+), 51 deletions(-) diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index ed75dfdb0..8829db533 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -7,7 +7,7 @@ use crate::bson::{RawArray, RawBsonRef}; use futures_core::{future::BoxFuture, Future, Stream}; use crate::{ - bson::{RawDocumentBuf}, + bson::RawDocumentBuf, change_stream::event::ResumeToken, client::{options::ServerAddress, AsyncDropToken}, cmap::conn::PinnedConnectionHandle, @@ -149,7 +149,8 @@ impl Stream for RawBatchCursor { self.info.ns = out.ns; } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { self.mark_exhausted(); } let exhausted_now = self.state.exhausted; @@ -358,7 +359,8 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { self.parent.initial_reply = Some(out.raw_reply); } Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { self.parent.exhausted = true; } let exhausted_now = self.parent.exhausted; @@ -383,7 +385,11 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { let info = self.parent.info.clone(); let client = self.parent.client.clone(); // Avoid borrow conflicts by replicating the handle into a temporary owner. - let pinned_owned = self.parent.pinned_connection.handle().map(|c| c.replicate()); + let pinned_owned = self + .parent + .pinned_connection + .handle() + .map(|c| c.replicate()); let pinned_ref = pinned_owned.as_ref(); self.provider.start_execution(info, client, pinned_ref); // Immediately poll once to register the waker and opportunistically buffer. @@ -499,29 +505,6 @@ impl<'s, S: ClientSessionHandle<'s>> GetMoreRawProvider<'s, S> { } } -pub struct RawDocumentStream { - inner: R, -} - -impl RawBatchCursor { - pub fn into_raw_documents(self) -> RawDocumentStream { - RawDocumentStream { inner: self } - } -} - -impl Stream for RawDocumentStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))), - Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), - Poll::Ready(None) => Poll::Ready(None), - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/operation.rs b/src/operation.rs index 3a3e5e6f9..4c7960457 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -40,25 +40,15 @@ use crate::{ client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS}, cmap::{ conn::{pooled::PooledConnection, PinnedConnectionHandle}, - Command, - RawCommandResponse, - StreamDescription, + Command, RawCommandResponse, StreamDescription, }, error::{ - CommandError, - Error, - ErrorKind, - IndexedWriteError, - InsertManyError, - Result, - WriteConcernError, - WriteFailure, + CommandError, Error, ErrorKind, IndexedWriteError, InsertManyError, Result, + WriteConcernError, WriteFailure, }, options::{ClientOptions, WriteConcern}, selection_criteria::SelectionCriteria, - BoxFuture, - ClientSession, - Namespace, + BoxFuture, ClientSession, Namespace, }; pub(crate) use abort_transaction::AbortTransaction; @@ -160,17 +150,10 @@ pub(crate) trait Operation { /// `true` from [`wants_owned_response`] should override this to consume the response. fn handle_response_owned<'a>( &'a self, - response: RawCommandResponse, - context: ExecutionContext<'a>, + _response: RawCommandResponse, + _context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result> { - // Default: owned path not implemented. Return a Send future that does not capture `self`. - async move { - Err(ErrorKind::Internal { - message: format!("owned response handling not implemented for {}", Self::NAME), - } - .into()) - } - .boxed() + unimplemented!() } /// Interpret an error encountered while sending the built command to the server, potentially From 1605622049704b95bd143ee67264328427f426d9 Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 19:18:02 -0800 Subject: [PATCH 14/17] cleanup --- exe_name.txt | 1 - 1 file changed, 1 deletion(-) delete mode 100644 exe_name.txt diff --git a/exe_name.txt b/exe_name.txt deleted file mode 100644 index 930fc389c..000000000 --- a/exe_name.txt +++ /dev/null @@ -1 +0,0 @@ -/Users/silasmarvin/github/mongo-rust-driver/target/debug/deps/mongodb-dfd7b57d04f5145a \ No newline at end of file From 3f558210113eb84255dce4bd8d5b08f7ee72966d Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 19:22:23 -0800 Subject: [PATCH 15/17] cleanup --- src/cursor/raw_batch.rs | 106 ++------------------------------------ src/operation/find_raw.rs | 7 +-- 2 files changed, 5 insertions(+), 108 deletions(-) diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index 8829db533..013a6a89d 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -77,7 +77,6 @@ struct RawBatchCursorState { post_batch_resume_token: Option, provider: GetMoreRawProvider<'static, ImplicitClientSessionHandle>, initial_reply: Option, - pending_reply: Option, } impl RawBatchCursor { @@ -103,7 +102,6 @@ impl RawBatchCursor { GetMoreRawProvider::Idle(Box::new(ImplicitClientSessionHandle(session))) }, initial_reply: Some(spec.initial_reply), - pending_reply: None, }, } } @@ -138,7 +136,7 @@ impl Stream for RawBatchCursor { Poll::Ready(get_more_out) => { match get_more_out.result { Ok(out) => { - self.state.pending_reply = Some(out.raw_reply); + self.state.initial_reply = Some(out.raw_reply); self.state.post_batch_resume_token = out.post_batch_resume_token; if out.exhausted { self.mark_exhausted(); @@ -168,56 +166,8 @@ impl Stream for RawBatchCursor { } } - // Yield any buffered reply (initial or pending). - if let Some(reply) = self - .state - .initial_reply - .take() - .or_else(|| self.state.pending_reply.take()) - { - // Prefetch the next getMore before returning this batch, if applicable. - if !self.state.exhausted - && !matches!(self.state.pinned_connection, PinnedConnection::Invalid(_)) - { - let info = self.info.clone(); - let client = self.client.clone(); - let state = &mut self.state; - state - .provider - .start_execution(info, client, state.pinned_connection.handle()); - // Immediately poll once to register the waker and opportunistically buffer. - if let Some(fut) = state.provider.executing_future() { - match Pin::new(fut).poll(cx) { - Poll::Pending => {} - Poll::Ready(get_more_out) => { - match get_more_out.result { - Ok(out) => { - state.pending_reply = Some(out.raw_reply); - state.post_batch_resume_token = out.post_batch_resume_token; - if out.exhausted { - self.mark_exhausted(); - } - if out.id != 0 { - self.info.id = out.id; - } - self.info.ns = out.ns; - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.mark_exhausted(); - } - // Intentionally do not surface the error here; clear and continue. - } - } - let exhausted_now = self.state.exhausted; - self.state - .provider - .clear_execution(get_more_out.session, exhausted_now); - } - } - } - } + // Yield any buffered reply. + if let Some(reply) = self.state.initial_reply.take() { return Poll::Ready(Some(Ok(RawBatch::new(reply)))); } @@ -376,56 +326,8 @@ impl Stream for SessionRawBatchCursorStream<'_, '_> { } } - // Yield any buffered reply (initial). + // Yield any buffered reply. if let Some(reply) = self.parent.initial_reply.take() { - // Prefetch the next getMore before returning this batch, if applicable. - if !self.parent.exhausted - && !matches!(self.parent.pinned_connection, PinnedConnection::Invalid(_)) - { - let info = self.parent.info.clone(); - let client = self.parent.client.clone(); - // Avoid borrow conflicts by replicating the handle into a temporary owner. - let pinned_owned = self - .parent - .pinned_connection - .handle() - .map(|c| c.replicate()); - let pinned_ref = pinned_owned.as_ref(); - self.provider.start_execution(info, client, pinned_ref); - // Immediately poll once to register the waker and opportunistically buffer. - if let Some(fut) = self.provider.executing_future() { - match Pin::new(fut).poll(cx) { - Poll::Pending => {} - Poll::Ready(get_more_out) => { - match get_more_out.result { - Ok(out) => { - if out.exhausted { - self.parent.exhausted = true; - } - if out.id != 0 { - self.parent.info.id = out.id; - } - self.parent.info.ns = out.ns; - self.parent.post_batch_resume_token = - out.post_batch_resume_token; - // buffer for the next poll - self.parent.initial_reply = Some(out.raw_reply); - } - Err(e) => { - if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) - { - self.parent.exhausted = true; - } - // Intentionally do not surface the error here; clear and continue. - } - } - let exhausted_now = self.parent.exhausted; - self.provider - .clear_execution(get_more_out.session, exhausted_now); - } - } - } - } return Poll::Ready(Some(Ok(RawBatch::new(reply)))); } diff --git a/src/operation/find_raw.rs b/src/operation/find_raw.rs index e0300533c..b53e689e8 100644 --- a/src/operation/find_raw.rs +++ b/src/operation/find_raw.rs @@ -97,12 +97,9 @@ impl OperationWithDefaults for FindRaw { comment, }; - // Take ownership of the raw reply with zero copies. - let raw = response.into_raw_document_buf(); - Ok(RawBatchCursorSpecification { info, - initial_reply: raw, + initial_reply: response.into_raw_document_buf(), post_batch_resume_token, }) } @@ -162,8 +159,6 @@ impl OperationWithDefaults for FindRaw { CursorBody::extract_at_cluster_time(response) } - // borrowed handle_response intentionally unimplemented above - fn supports_read_concern(&self, _description: &StreamDescription) -> bool { true } From 2bf031564bafdffa1488b29ea4d2b8915bb6237e Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 20:56:46 -0800 Subject: [PATCH 16/17] Refactor --- benchmarks/src/bench/find_raw_batches.rs | 8 +-- src/action.rs | 4 +- src/action/find.rs | 73 ++++-------------------- src/bson_compat.rs | 2 + src/cursor/raw_batch.rs | 50 +++++++++++++++- src/db.rs | 23 ++++++++ src/db/action.rs | 1 + src/db/action/find_raw.rs | 50 ++++++++++++++++ src/test/coll.rs | 5 +- 9 files changed, 143 insertions(+), 73 deletions(-) create mode 100644 src/db/action/find_raw.rs diff --git a/benchmarks/src/bench/find_raw_batches.rs b/benchmarks/src/bench/find_raw_batches.rs index bc254026d..332ee767d 100644 --- a/benchmarks/src/bench/find_raw_batches.rs +++ b/benchmarks/src/bench/find_raw_batches.rs @@ -2,14 +2,13 @@ use anyhow::Result; use futures::stream::StreamExt; use mongodb::{ bson::{doc, Document}, - Client, Collection, Database, + Client, Database, }; use crate::bench::{drop_database, Benchmark, COLL_NAME, DATABASE_NAME}; pub struct FindRawBatchesBenchmark { db: Database, - coll: Collection, uri: String, } @@ -30,19 +29,18 @@ impl Benchmark for FindRawBatchesBenchmark { let db = client.database(&DATABASE_NAME); drop_database(options.uri.as_str(), DATABASE_NAME.as_str()).await?; - let coll = db.collection(&COLL_NAME); + let coll = db.collection::(&COLL_NAME); let docs = vec![options.doc.clone(); options.num_iter]; coll.insert_many(docs).await?; Ok(FindRawBatchesBenchmark { db, - coll, uri: options.uri, }) } async fn do_task(&self, _state: Self::TaskState) -> Result<()> { - let mut batches = self.coll.find_raw_batches(doc! {}).await?; + let mut batches = self.db.find_raw_batches(COLL_NAME.as_str(), doc! {}).await?; while let Some(batch_res) = batches.next().await { batch_res?; } diff --git a/src/action.rs b/src/action.rs index 91ebc6fd4..8bd1f611a 100644 --- a/src/action.rs +++ b/src/action.rs @@ -44,7 +44,7 @@ pub use delete::Delete; pub use distinct::Distinct; pub use drop::{DropCollection, DropDatabase}; pub use drop_index::DropIndex; -pub use find::{Find, FindOne}; +pub use find::{Find, FindOne, FindRawBatches}; pub use find_and_modify::{FindOneAndDelete, FindOneAndReplace, FindOneAndUpdate}; pub use insert_many::InsertMany; pub use insert_one::InsertOne; @@ -69,7 +69,7 @@ pub struct ListNames; #[allow(missing_docs)] pub struct ImplicitSession; #[allow(missing_docs)] -pub struct ExplicitSession<'a>(&'a mut crate::ClientSession); +pub struct ExplicitSession<'a>(pub(crate) &'a mut crate::ClientSession); #[allow(missing_docs)] pub struct Single; diff --git a/src/action/find.rs b/src/action/find.rs index 9dae77543..4b2888874 100644 --- a/src/action/find.rs +++ b/src/action/find.rs @@ -22,7 +22,6 @@ use super::{ export_doc, option_setters, options_doc, - CollRef, ExplicitSession, ImplicitSession, }; @@ -42,21 +41,6 @@ impl Collection { session: ImplicitSession, } } - - /// Finds the documents and returns raw server batches. - /// - /// `await` will return d[`Result`] (or - /// d[`Result`] if a session is provided). - #[deeplink] - #[options_doc(find)] - pub fn find_raw_batches(&self, filter: Document) -> FindRawBatches<'_> { - FindRawBatches { - cr: CollRef::new(self), - filter, - options: None, - session: ImplicitSession, - } - } } impl Collection { @@ -158,23 +142,28 @@ impl<'a, T: Send + Sync> Action for Find<'a, T, ExplicitSession<'a>> { } } +/// Finds documents in a collection and returns raw server batches. Construct with +/// [`Database::find_raw_batches`](crate::Database::find_raw_batches). #[must_use] pub struct FindRawBatches<'a, Session = ImplicitSession> { - cr: CollRef<'a>, - filter: Document, - options: Option, - session: Session, + pub(crate) db: &'a crate::Database, + pub(crate) collection: String, + pub(crate) filter: Document, + pub(crate) options: Option, + pub(crate) session: Session, } #[option_setters(crate::coll::options::FindOptions)] #[export_doc(find_raw_batches)] impl<'a, Session> FindRawBatches<'a, Session> { + /// Use the provided session when running the operation. pub fn session<'s>( self, value: impl Into<&'s mut ClientSession>, ) -> FindRawBatches<'a, ExplicitSession<'s>> { FindRawBatches { - cr: self.cr, + db: self.db, + collection: self.collection, filter: self.filter, options: self.options, session: ExplicitSession(value.into()), @@ -182,48 +171,6 @@ impl<'a, Session> FindRawBatches<'a, Session> { } } -#[action_impl] -impl<'a> Action for FindRawBatches<'a, ImplicitSession> { - type Future = FindRawBatchesFuture; - - async fn execute(mut self) -> Result { - resolve_options!(self.cr, self.options, [read_concern, selection_criteria]); - let op = crate::operation::find_raw::FindRaw::new( - self.cr.namespace(), - self.filter, - self.options, - ); - self.cr - .client() - .execute_raw_batch_cursor_operation(op) - .await - } -} - -#[action_impl] -impl<'a> Action for FindRawBatches<'a, ExplicitSession<'a>> { - type Future = FindRawBatchesSessionFuture; - - async fn execute(mut self) -> Result { - resolve_read_concern_with_session!(self.cr, self.options, Some(&mut *self.session.0))?; - resolve_selection_criteria_with_session!( - self.cr, - self.options, - Some(&mut *self.session.0) - )?; - - let op = crate::operation::find_raw::FindRaw::new( - self.cr.namespace(), - self.filter, - self.options, - ); - self.cr - .client() - .execute_session_raw_batch_cursor_operation(op, self.session.0) - .await - } -} - /// Finds a single document in a collection matching a filter. Construct with /// [`Collection::find_one`]. #[must_use] diff --git a/src/bson_compat.rs b/src/bson_compat.rs index dc8263db0..90c5862d4 100644 --- a/src/bson_compat.rs +++ b/src/bson_compat.rs @@ -37,6 +37,7 @@ pub(crate) trait RawDocumentBufExt: Sized { value: impl Into> + 'a, ); + #[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))] fn to_document(&self) -> crate::error::Result; } @@ -50,6 +51,7 @@ impl RawDocumentBufExt for crate::bson::RawDocumentBuf { self.append(key, value); } + #[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))] fn to_document(&self) -> crate::error::Result { self.try_into().map_err(Into::into) } diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index 013a6a89d..6d33ba733 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -1,9 +1,49 @@ +//! Raw batch cursor API for zero-copy document processing. +//! +//! This module provides a high-performance alternative to the standard cursor API when you need +//! direct access to server response batches without per-document deserialization overhead. +//! +//! # When to Use +//! +//! **Use `find_raw_batches()` when:** +//! - Processing high-volume queries where deserialization is a bottleneck +//! - Implementing custom batch-level logic (e.g., batch transformation, filtering) +//! - Inspecting raw BSON structure without a known schema +//! - Forwarding documents without modification (e.g., proxying, caching) +//! +//! **Use regular `find()` when:** +//! - Working with strongly-typed `Deserialize` documents +//! - Iterating one document at a time +//! - Deserialization overhead is acceptable for your use case +//! +//! # Example +//! +//! ```no_run +//! # use mongodb::{Client, bson::doc}; +//! # async fn example() -> mongodb::error::Result<()> { +//! # let client = Client::with_uri_str("mongodb://localhost:27017").await?; +//! # let db = client.database("db"); +//! use futures::stream::StreamExt; +//! +//! let mut cursor = db.find_raw_batches("coll", doc! {}).await?; +//! while let Some(batch) = cursor.next().await { +//! let batch = batch?; +//! // Zero-copy access to documents in this batch +//! for doc_result in batch.doc_slices()? { +//! let doc = doc_result?; +//! // Process raw document +//! } +//! } +//! # Ok(()) +//! # } +//! ``` + use std::{ pin::Pin, task::{Context, Poll}, }; -use crate::bson::{RawArray, RawBsonRef}; +use crate::bson::{RawArray, RawBsonRef, RawDocument}; use futures_core::{future::BoxFuture, Future, Stream}; use crate::{ @@ -61,6 +101,14 @@ impl RawBatch { docs.as_array() .ok_or_else(|| Error::invalid_response("missing firstBatch/nextBatch")) } + + /// Returns a reference to the full server response document. + /// + /// This provides access to all fields in the server's response, including cursor metadata, + /// for debugging or custom parsing. + pub fn as_raw_document(&self) -> &RawDocument { + self.reply.as_ref() + } } pub struct RawBatchCursor { diff --git a/src/db.rs b/src/db.rs index f9da393d7..5391115b7 100644 --- a/src/db.rs +++ b/src/db.rs @@ -142,4 +142,27 @@ impl Database { pub fn gridfs_bucket(&self, options: impl Into>) -> GridFsBucket { GridFsBucket::new(self.clone(), options.into().unwrap_or_default()) } + + /// Finds the documents in a collection and returns raw server batches. + /// + /// This method returns raw BSON batches without deserializing individual documents, + /// providing zero-copy access for high-performance scenarios. + /// + /// `await` will return [`Result`] (or + /// [`Result`] if a session is provided). + /// + /// See the [`raw_batch`](crate::cursor::raw_batch) module documentation for usage guidance. + pub fn find_raw_batches( + &self, + collection: impl Into, + filter: crate::bson::Document, + ) -> crate::action::FindRawBatches<'_> { + crate::action::FindRawBatches { + db: self, + collection: collection.into(), + filter, + options: None, + session: crate::action::ImplicitSession, + } + } } diff --git a/src/db/action.rs b/src/db/action.rs index ba61d596b..8306658f4 100644 --- a/src/db/action.rs +++ b/src/db/action.rs @@ -1 +1,2 @@ pub(crate) mod create_collection; +pub(crate) mod find_raw; diff --git a/src/db/action/find_raw.rs b/src/db/action/find_raw.rs new file mode 100644 index 000000000..40b6c91b1 --- /dev/null +++ b/src/db/action/find_raw.rs @@ -0,0 +1,50 @@ +use crate::{ + action::{action_impl, FindRawBatches}, + error::Result, + Namespace, +}; + +#[action_impl] +impl<'a> Action for FindRawBatches<'a, crate::action::ImplicitSession> { + type Future = FindRawBatchesFuture; + + async fn execute(mut self) -> Result { + resolve_options!(self.db, self.options, [read_concern, selection_criteria]); + + let ns = Namespace { + db: self.db.name().to_string(), + coll: self.collection, + }; + + let op = crate::operation::find_raw::FindRaw::new(ns, self.filter, self.options); + self.db + .client() + .execute_raw_batch_cursor_operation(op) + .await + } +} + +#[action_impl] +impl<'a> Action for FindRawBatches<'a, crate::action::ExplicitSession<'a>> { + type Future = FindRawBatchesSessionFuture; + + async fn execute(mut self) -> Result { + resolve_read_concern_with_session!(self.db, self.options, Some(&mut *self.session.0))?; + resolve_selection_criteria_with_session!( + self.db, + self.options, + Some(&mut *self.session.0) + )?; + + let ns = Namespace { + db: self.db.name().to_string(), + coll: self.collection, + }; + + let op = crate::operation::find_raw::FindRaw::new(ns, self.filter, self.options); + self.db + .client() + .execute_session_raw_batch_cursor_operation(op, self.session.0) + .await + } +} diff --git a/src/test/coll.rs b/src/test/coll.rs index 8e08aaeae..07fac4569 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -153,8 +153,9 @@ async fn find_raw_batches_one() { coll.insert_one(doc! { "x": 1 }).await.unwrap(); - let mut batches = coll - .find_raw_batches(doc! { "x": 1 }) + let db = client.database(function_name!()); + let mut batches = db + .find_raw_batches(function_name!(), doc! { "x": 1 }) .limit(-1) .await .unwrap(); From 89f8859c6717b5f1620ddea4ddcb42bff203c99c Mon Sep 17 00:00:00 2001 From: Silas Marvin Date: Wed, 12 Nov 2025 21:58:57 -0800 Subject: [PATCH 17/17] Cleanups --- src/cursor/raw_batch.rs | 4 ++-- src/db.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs index 6d33ba733..be86bf00d 100644 --- a/src/cursor/raw_batch.rs +++ b/src/cursor/raw_batch.rs @@ -71,7 +71,7 @@ pub(crate) struct RawBatchCursorSpecification { /// A raw batch response returned by the server for a cursor getMore/find. /// -/// This provides zero-copy access to the server's batch array via [`doc_slices`]. +/// This provides zero-copy access to the server's batch array via [`doc_slices`](RawBatch::doc_slices). #[derive(Debug)] pub struct RawBatch { reply: RawDocumentBuf, @@ -86,7 +86,7 @@ impl RawBatch { /// /// This lets callers iterate over [`crate::bson::RawDocument`] items directly for maximal /// performance. - pub fn doc_slices<'a>(&'a self) -> Result<&'a RawArray> { + pub fn doc_slices(&self) -> Result<&RawArray> { let root = self.reply.as_ref(); let cursor = root .get("cursor")? diff --git a/src/db.rs b/src/db.rs index 5391115b7..8596cdfda 100644 --- a/src/db.rs +++ b/src/db.rs @@ -151,7 +151,7 @@ impl Database { /// `await` will return [`Result`] (or /// [`Result`] if a session is provided). /// - /// See the [`raw_batch`](crate::cursor::raw_batch) module documentation for usage guidance. + /// See the `cursor::raw_batch` module documentation for usage guidance. pub fn find_raw_batches( &self, collection: impl Into,