diff --git a/benchmarks/Cargo.lock b/benchmarks/Cargo.lock index 4888a65a3..b299b7e0b 100644 --- a/benchmarks/Cargo.lock +++ b/benchmarks/Cargo.lock @@ -39,21 +39,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - -[[package]] -name = "android_system_properties" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" -dependencies = [ - "libc", -] - [[package]] name = "ansi_term" version = "0.12.1" @@ -112,12 +97,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.22.1" @@ -163,12 +142,12 @@ version = "2.15.0" source = "git+https://github.com/mongodb/bson-rust?branch=2.15.x#f6f163095b5159ce175424b0e02f9bd7acfaddf2" dependencies = [ "ahash", - "base64 0.22.1", + "base64", "bitvec", "getrandom 0.2.15", "getrandom 0.3.3", "hex", - "indexmap 2.7.1", + "indexmap", "js-sys", "once_cell", "rand 0.9.2", @@ -218,11 +197,7 @@ version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ - "android-tzdata", - "iana-time-zone", "num-traits", - "serde", - "windows-targets 0.52.6", ] [[package]] @@ -279,12 +254,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" -[[package]] -name = "core-foundation-sys" -version = "0.8.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" - [[package]] name = "cpufeatures" version = "0.2.17" @@ -358,7 +327,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", - "serde", ] [[package]] @@ -595,12 +563,6 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.15.2" @@ -645,7 +607,7 @@ dependencies = [ "ipnet", "once_cell", "rand 0.8.5", - "thiserror", + "thiserror 1.0.69", "tinyvec", "tokio", "tracing", @@ -668,7 +630,7 @@ dependencies = [ "rand 0.8.5", "resolv-conf", "smallvec", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -693,29 +655,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "iana-time-zone" -version = "0.1.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "windows-core", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" -dependencies = [ - "cc", -] - [[package]] name = "icu_collections" version = "1.5.0" @@ -861,17 +800,6 @@ dependencies = [ "icu_properties", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", - "serde", -] - [[package]] name = "indexmap" version = "2.7.1" @@ -879,8 +807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown 0.15.2", - "serde", + "hashbrown", ] [[package]] @@ -901,7 +828,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2", + "socket2 0.5.8", "widestring", "windows-sys 0.48.0", "winreg", @@ -937,9 +864,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" [[package]] name = "linked-hash-map" @@ -1088,8 +1015,8 @@ source = "git+https://github.com/mongodb/libmongocrypt-rust.git?branch=main#1327 name = "mongodb" version = "3.3.0" dependencies = [ - "base64 0.13.1", - "bitflags 1.3.2", + "base64", + "bitflags 2.8.0", "bson", "chrono", "derive-where", @@ -1107,24 +1034,26 @@ dependencies = [ "mongodb-internal-macros", "pbkdf2", "percent-encoding", - "rand 0.8.5", + "rand 0.9.2", "rustc_version_runtime", "rustls", "rustversion", "serde", + "serde_bytes", "serde_with", "sha1", "sha2", - "socket2", + "socket2 0.6.1", "stringprep", "strsim 0.11.1", "take_mut", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-rustls", + "tokio-util", "typed-builder", "uuid", - "webpki-roots 0.26.11", + "webpki-roots", ] [[package]] @@ -1565,7 +1494,7 @@ version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "930cfb6e6abf99298aaad7d29abbef7a9999a9a8806a40088f55f0dcec03146b" dependencies = [ - "indexmap 2.7.1", + "indexmap", "itoa", "memchr", "ryu", @@ -1578,16 +1507,9 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" dependencies = [ - "base64 0.22.1", - "chrono", - "hex", - "indexmap 1.9.3", - "indexmap 2.7.1", "serde", "serde_derive", - "serde_json", "serde_with_macros", - "time", ] [[package]] @@ -1664,6 +1586,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "spin" version = "0.9.8" @@ -1765,7 +1697,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl 2.0.17", ] [[package]] @@ -1779,6 +1720,17 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "time" version = "0.3.37" @@ -1856,7 +1808,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.8", "tokio-macros", "windows-sys 0.52.0", ] @@ -1901,6 +1853,7 @@ checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -1918,7 +1871,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.7.1", + "indexmap", "toml_datetime", "winnow", ] @@ -1956,18 +1909,18 @@ dependencies = [ [[package]] name = "typed-builder" -version = "0.20.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7" +checksum = "398a3a3c918c96de527dc11e6e846cd549d4508030b8a33e1da12789c856b81a" dependencies = [ "typed-builder-macro", ] [[package]] name = "typed-builder-macro" -version = "0.20.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28" +checksum = "0e48cea23f68d1f78eb7bc092881b6bb88d3d6b5b7e6234f6f9c911da1ffb221" dependencies = [ "proc-macro2", "quote", @@ -2153,15 +2106,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "webpki-roots" -version = "0.26.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" -dependencies = [ - "webpki-roots 1.0.2", -] - [[package]] name = "webpki-roots" version = "1.0.2" @@ -2200,13 +2144,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows-core" -version = "0.52.0" +name = "windows-link" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets 0.52.6", -] +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" [[package]] name = "windows-sys" @@ -2235,6 +2176,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -2259,13 +2209,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -2278,6 +2245,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -2290,6 +2263,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -2302,12 +2281,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -2320,6 +2311,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -2332,6 +2329,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -2344,6 +2347,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -2356,6 +2365,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" version = "0.5.40" diff --git a/benchmarks/src/bench.rs b/benchmarks/src/bench.rs index b448254eb..297b149cf 100644 --- a/benchmarks/src/bench.rs +++ b/benchmarks/src/bench.rs @@ -12,6 +12,7 @@ pub mod insert_one; pub mod json_multi_export; pub mod json_multi_import; pub mod run_command; +pub mod find_raw_batches; use std::{ convert::TryInto, diff --git a/benchmarks/src/bench/find_raw_batches.rs b/benchmarks/src/bench/find_raw_batches.rs new file mode 100644 index 000000000..332ee767d --- /dev/null +++ b/benchmarks/src/bench/find_raw_batches.rs @@ -0,0 +1,54 @@ +use anyhow::Result; +use futures::stream::StreamExt; +use mongodb::{ + bson::{doc, Document}, + Client, Database, +}; + +use crate::bench::{drop_database, Benchmark, COLL_NAME, DATABASE_NAME}; + +pub struct FindRawBatchesBenchmark { + db: Database, + uri: String, +} + +/// Specifies the options to `FindRawBatchesBenchmark::setup` operation. +pub struct Options { + pub num_iter: usize, + pub doc: Document, + pub uri: String, +} + +#[async_trait::async_trait] +impl Benchmark for FindRawBatchesBenchmark { + type Options = Options; + type TaskState = (); + + async fn setup(options: Self::Options) -> Result { + let client = Client::with_uri_str(&options.uri).await?; + let db = client.database(&DATABASE_NAME); + drop_database(options.uri.as_str(), DATABASE_NAME.as_str()).await?; + + let coll = db.collection::(&COLL_NAME); + let docs = vec![options.doc.clone(); options.num_iter]; + coll.insert_many(docs).await?; + + Ok(FindRawBatchesBenchmark { + db, + uri: options.uri, + }) + } + + async fn do_task(&self, _state: Self::TaskState) -> Result<()> { + let mut batches = self.db.find_raw_batches(COLL_NAME.as_str(), doc! {}).await?; + while let Some(batch_res) = batches.next().await { + batch_res?; + } + Ok(()) + } + + async fn teardown(&self) -> Result<()> { + drop_database(self.uri.as_str(), self.db.name()).await?; + Ok(()) + } +} diff --git a/benchmarks/src/main.rs b/benchmarks/src/main.rs index a0089d7e3..47d5dfc29 100644 --- a/benchmarks/src/main.rs +++ b/benchmarks/src/main.rs @@ -24,6 +24,7 @@ use crate::{ bson_decode::BsonDecodeBenchmark, bson_encode::BsonEncodeBenchmark, bulk_write::{InsertBulkWriteBenchmark, MixedBulkWriteBenchmark}, + find_raw_batches::FindRawBatchesBenchmark, find_many::FindManyBenchmark, find_one::FindOneBenchmark, gridfs_download::GridFsDownloadBenchmark, @@ -62,6 +63,7 @@ const FIND_ONE_BENCH: &str = "Find one"; const FIND_MANY_BENCH: &str = "Find many and empty cursor"; const FIND_MANY_BENCH_RAW: &str = "Find many and empty cursor (raw BSON)"; const FIND_MANY_BENCH_SERDE: &str = "Find many and empty cursor (serde structs)"; +const FIND_MANY_BENCH_RAW_BATCHES: &str = "Find many and empty cursor (raw batches)"; const GRIDFS_DOWNLOAD_BENCH: &str = "GridFS download"; const LDJSON_MULTI_EXPORT_BENCH: &str = "LDJSON multi-file export"; const GRIDFS_MULTI_DOWNLOAD_BENCH: &str = "GridFS multi-file download"; @@ -104,6 +106,7 @@ enum BenchmarkId { SmallDocInsertBulkWrite, // 23 LargeDocInsertBulkWrite, // 24 MixedBulkWrite, // 25 + FindManyRawBatches, // 26 } impl BenchmarkId { @@ -127,6 +130,7 @@ impl BenchmarkId { BenchmarkId::BsonFullDocumentEncode => FULL_BSON_ENCODING, BenchmarkId::FindManyRawBson => FIND_MANY_BENCH_RAW, BenchmarkId::FindManySerde => FIND_MANY_BENCH_SERDE, + BenchmarkId::FindManyRawBatches => FIND_MANY_BENCH_RAW_BATCHES, BenchmarkId::GridFsDownload => GRIDFS_DOWNLOAD_BENCH, BenchmarkId::GridFsUpload => GRIDFS_UPLOAD_BENCH, BenchmarkId::GridFsMultiDownload => GRIDFS_MULTI_DOWNLOAD_BENCH, @@ -159,6 +163,7 @@ const SINGLE_BENCHES: &[&str] = &[ /// Benchmarks included in the "MultiBench" composite. const MULTI_BENCHES: &[&str] = &[ FIND_MANY_BENCH_RAW, + FIND_MANY_BENCH_RAW_BATCHES, SMALL_DOC_INSERT_MANY_BENCH, LARGE_DOC_INSERT_MANY_BENCH, GRIDFS_UPLOAD_BENCH, @@ -180,6 +185,7 @@ const PARALLEL_BENCHES: &[&str] = &[ const READ_BENCHES: &[&str] = &[ FIND_ONE_BENCH, FIND_MANY_BENCH_RAW, + FIND_MANY_BENCH_RAW_BATCHES, GRIDFS_DOWNLOAD_BENCH, LDJSON_MULTI_EXPORT_BENCH, GRIDFS_MULTI_DOWNLOAD_BENCH, @@ -199,7 +205,7 @@ const WRITE_BENCHES: &[&str] = &[ MIXED_BULK_WRITE_BENCH, ]; -const MAX_ID: u8 = BenchmarkId::MixedBulkWrite as u8; +const MAX_ID: u8 = BenchmarkId::FindManyRawBatches as u8; async fn run_benchmarks( uri: &str, @@ -465,6 +471,17 @@ async fn run_benchmarks( comp_score += score_test(find_many, id.name(), 16.22, more_info); } + // Find many using raw batches and empty the cursor + BenchmarkId::FindManyRawBatches => { + let options = bench::find_raw_batches::Options { + num_iter: 10000, + doc: get_tweet().await, + uri: uri.to_string(), + }; + let result = + bench::run_benchmark::(options).await?; + comp_score += score_test(result, FIND_MANY_BENCH_RAW_BATCHES, 16.22, more_info); + } // GridFS download BenchmarkId::GridFsDownload => { @@ -589,6 +606,7 @@ fn parse_ids(matches: ArgMatches) -> HashSet { } if matches.is_present("multi") { ids.insert(BenchmarkId::FindManyRawBson); + ids.insert(BenchmarkId::FindManyRawBatches); ids.insert(BenchmarkId::SmallDocInsertMany); ids.insert(BenchmarkId::LargeDocInsertMany); ids.insert(BenchmarkId::GridFsDownload); @@ -621,6 +639,7 @@ fn parse_ids(matches: ArgMatches) -> HashSet { ids.insert(BenchmarkId::FindMany); ids.insert(BenchmarkId::FindManyRawBson); ids.insert(BenchmarkId::FindManySerde); + ids.insert(BenchmarkId::FindManyRawBatches); ids.insert(BenchmarkId::SmallDocInsertMany); ids.insert(BenchmarkId::LargeDocInsertMany); ids.insert(BenchmarkId::LdJsonMultiFileImport); diff --git a/src/action.rs b/src/action.rs index 91ebc6fd4..8bd1f611a 100644 --- a/src/action.rs +++ b/src/action.rs @@ -44,7 +44,7 @@ pub use delete::Delete; pub use distinct::Distinct; pub use drop::{DropCollection, DropDatabase}; pub use drop_index::DropIndex; -pub use find::{Find, FindOne}; +pub use find::{Find, FindOne, FindRawBatches}; pub use find_and_modify::{FindOneAndDelete, FindOneAndReplace, FindOneAndUpdate}; pub use insert_many::InsertMany; pub use insert_one::InsertOne; @@ -69,7 +69,7 @@ pub struct ListNames; #[allow(missing_docs)] pub struct ImplicitSession; #[allow(missing_docs)] -pub struct ExplicitSession<'a>(&'a mut crate::ClientSession); +pub struct ExplicitSession<'a>(pub(crate) &'a mut crate::ClientSession); #[allow(missing_docs)] pub struct Single; diff --git a/src/action/find.rs b/src/action/find.rs index 121e9d00c..4b2888874 100644 --- a/src/action/find.rs +++ b/src/action/find.rs @@ -142,6 +142,35 @@ impl<'a, T: Send + Sync> Action for Find<'a, T, ExplicitSession<'a>> { } } +/// Finds documents in a collection and returns raw server batches. Construct with +/// [`Database::find_raw_batches`](crate::Database::find_raw_batches). +#[must_use] +pub struct FindRawBatches<'a, Session = ImplicitSession> { + pub(crate) db: &'a crate::Database, + pub(crate) collection: String, + pub(crate) filter: Document, + pub(crate) options: Option, + pub(crate) session: Session, +} + +#[option_setters(crate::coll::options::FindOptions)] +#[export_doc(find_raw_batches)] +impl<'a, Session> FindRawBatches<'a, Session> { + /// Use the provided session when running the operation. + pub fn session<'s>( + self, + value: impl Into<&'s mut ClientSession>, + ) -> FindRawBatches<'a, ExplicitSession<'s>> { + FindRawBatches { + db: self.db, + collection: self.collection, + filter: self.filter, + options: self.options, + session: ExplicitSession(value.into()), + } + } +} + /// Finds a single document in a collection matching a filter. Construct with /// [`Collection::find_one`]. #[must_use] diff --git a/src/bson_compat.rs b/src/bson_compat.rs index dc8263db0..90c5862d4 100644 --- a/src/bson_compat.rs +++ b/src/bson_compat.rs @@ -37,6 +37,7 @@ pub(crate) trait RawDocumentBufExt: Sized { value: impl Into> + 'a, ); + #[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))] fn to_document(&self) -> crate::error::Result; } @@ -50,6 +51,7 @@ impl RawDocumentBufExt for crate::bson::RawDocumentBuf { self.append(key, value); } + #[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))] fn to_document(&self) -> crate::error::Result { self.try_into().map_err(Into::into) } diff --git a/src/client/executor.rs b/src/client/executor.rs index 674eeca94..ade3b5bd4 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -21,10 +21,7 @@ use crate::otel::OtelFutureStub as _; use crate::{ bson::Document, change_stream::{ - event::ChangeStreamEvent, - session::SessionChangeStream, - ChangeStream, - ChangeStreamData, + event::ChangeStreamEvent, session::SessionChangeStream, ChangeStream, ChangeStreamData, WatchArgs, }, cmap::{ @@ -33,33 +30,20 @@ use crate::{ wire::{next_request_id, Message}, PinnedConnectionHandle, }, - ConnectionPool, - RawCommandResponse, - StreamDescription, + ConnectionPool, RawCommandResponse, StreamDescription, }, cursor::{session::SessionCursor, Cursor, CursorSpecification}, error::{ - Error, - ErrorKind, - Result, - RETRYABLE_WRITE_ERROR, - TRANSIENT_TRANSACTION_ERROR, + Error, ErrorKind, Result, RETRYABLE_WRITE_ERROR, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT, }, event::command::{ - CommandEvent, - CommandFailedEvent, - CommandStartedEvent, - CommandSucceededEvent, + CommandEvent, CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent, }, hello::LEGACY_HELLO_COMMAND_NAME_LOWERCASE, operation::{ aggregate::{change_stream::ChangeStreamAggregate, AggregateTarget}, - AbortTransaction, - CommandErrorBody, - CommitTransaction, - ExecutionContext, - Operation, + AbortTransaction, CommandErrorBody, CommitTransaction, ExecutionContext, Operation, Retryability, }, options::{ChangeStreamOptions, SelectionCriteria}, @@ -211,6 +195,61 @@ impl Client { .await } + pub(crate) async fn execute_raw_batch_cursor_operation( + &self, + mut op: impl BorrowMut, + ) -> Result + where + Op: Operation, + { + Box::pin(async { + let mut details = self + .execute_operation_with_details(op.borrow_mut(), None) + .await?; + // Mirror pinning logic without a CursorSpecification. + let pinned = if self.is_load_balanced() && details.output.info.id != 0 { + Some(details.connection.pin()?) + } else { + None + }; + Ok(crate::cursor::raw_batch::RawBatchCursor::new( + self.clone(), + details.output, + details.implicit_session, + pinned, + )) + }) + .await + } + + pub(crate) async fn execute_session_raw_batch_cursor_operation( + &self, + mut op: impl BorrowMut, + session: &mut ClientSession, + ) -> Result + where + Op: Operation, + { + let mut details = self + .execute_operation_with_details(op.borrow_mut(), &mut *session) + .await?; + + // Prefer the transaction's pinned connection if present; otherwise mirror load-balanced + // pinning. + let pinned = if let Some(handle) = session.transaction.pinned_connection() { + Some(handle.replicate()) + } else if self.is_load_balanced() && details.output.info.id != 0 { + Some(details.connection.pin()?) + } else { + None + }; + Ok(crate::cursor::raw_batch::SessionRawBatchCursor::new( + self.clone(), + details.output, + pinned, + )) + } + pub(crate) async fn execute_session_cursor_operation( &self, mut op: impl BorrowMut, @@ -657,15 +696,30 @@ impl Client { effective_criteria: effective_criteria.clone(), }; - match op.handle_response(&response, context).await { - Ok(response) => Ok(response), - Err(mut err) => { - err.add_labels_and_update_pin( - Some(connection.stream_description()?), - session, - Some(retryability), - ); - Err(err.with_server_response(&response)) + if op.wants_owned_response() { + match op.handle_response_owned(response, context).await { + Ok(output) => Ok(output), + Err(mut err) => { + err.add_labels_and_update_pin( + Some(connection.stream_description()?), + session, + Some(retryability), + ); + // Cannot attach server response; it was moved. + Err(err) + } + } + } else { + match op.handle_response(&response, context).await { + Ok(output) => Ok(output), + Err(mut err) => { + err.add_labels_and_update_pin( + Some(connection.stream_description()?), + session, + Some(retryability), + ); + Err(err.with_server_response(&response)) + } } } } diff --git a/src/cursor.rs b/src/cursor.rs index a1605ad54..2c06261eb 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,4 +1,5 @@ mod common; +pub mod raw_batch; pub(crate) mod session; #[cfg(test)] diff --git a/src/cursor/raw_batch.rs b/src/cursor/raw_batch.rs new file mode 100644 index 000000000..be86bf00d --- /dev/null +++ b/src/cursor/raw_batch.rs @@ -0,0 +1,484 @@ +//! Raw batch cursor API for zero-copy document processing. +//! +//! This module provides a high-performance alternative to the standard cursor API when you need +//! direct access to server response batches without per-document deserialization overhead. +//! +//! # When to Use +//! +//! **Use `find_raw_batches()` when:** +//! - Processing high-volume queries where deserialization is a bottleneck +//! - Implementing custom batch-level logic (e.g., batch transformation, filtering) +//! - Inspecting raw BSON structure without a known schema +//! - Forwarding documents without modification (e.g., proxying, caching) +//! +//! **Use regular `find()` when:** +//! - Working with strongly-typed `Deserialize` documents +//! - Iterating one document at a time +//! - Deserialization overhead is acceptable for your use case +//! +//! # Example +//! +//! ```no_run +//! # use mongodb::{Client, bson::doc}; +//! # async fn example() -> mongodb::error::Result<()> { +//! # let client = Client::with_uri_str("mongodb://localhost:27017").await?; +//! # let db = client.database("db"); +//! use futures::stream::StreamExt; +//! +//! let mut cursor = db.find_raw_batches("coll", doc! {}).await?; +//! while let Some(batch) = cursor.next().await { +//! let batch = batch?; +//! // Zero-copy access to documents in this batch +//! for doc_result in batch.doc_slices()? { +//! let doc = doc_result?; +//! // Process raw document +//! } +//! } +//! # Ok(()) +//! # } +//! ``` + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use crate::bson::{RawArray, RawBsonRef, RawDocument}; +use futures_core::{future::BoxFuture, Future, Stream}; + +use crate::{ + bson::RawDocumentBuf, + change_stream::event::ResumeToken, + client::{options::ServerAddress, AsyncDropToken}, + cmap::conn::PinnedConnectionHandle, + cursor::common::{ + kill_cursor, ClientSessionHandle, ExplicitClientSessionHandle, ImplicitClientSessionHandle, + PinnedConnection, + }, + error::{Error, ErrorKind, Result}, + operation::get_more_raw::GetMoreRaw, + Client, ClientSession, +}; + +use super::common::CursorInformation; + +#[derive(Debug, Clone)] +pub(crate) struct RawBatchCursorSpecification { + pub(crate) info: CursorInformation, + pub(crate) initial_reply: RawDocumentBuf, + pub(crate) post_batch_resume_token: Option, +} + +/// A raw batch response returned by the server for a cursor getMore/find. +/// +/// This provides zero-copy access to the server's batch array via [`doc_slices`](RawBatch::doc_slices). +#[derive(Debug)] +pub struct RawBatch { + reply: RawDocumentBuf, +} + +impl RawBatch { + pub(crate) fn new(reply: RawDocumentBuf) -> Self { + Self { reply } + } + + /// Returns a borrowed view of the batch array (`firstBatch` or `nextBatch`) without copying. + /// + /// This lets callers iterate over [`crate::bson::RawDocument`] items directly for maximal + /// performance. + pub fn doc_slices(&self) -> Result<&RawArray> { + let root = self.reply.as_ref(); + let cursor = root + .get("cursor")? + .and_then(RawBsonRef::as_document) + .ok_or_else(|| Error::invalid_response("missing cursor subdocument"))?; + + let docs = cursor + .get("firstBatch")? + .or_else(|| cursor.get("nextBatch").ok().flatten()) + .ok_or_else(|| Error::invalid_response("missing firstBatch/nextBatch"))?; + + docs.as_array() + .ok_or_else(|| Error::invalid_response("missing firstBatch/nextBatch")) + } + + /// Returns a reference to the full server response document. + /// + /// This provides access to all fields in the server's response, including cursor metadata, + /// for debugging or custom parsing. + pub fn as_raw_document(&self) -> &RawDocument { + self.reply.as_ref() + } +} + +pub struct RawBatchCursor { + client: Client, + drop_token: AsyncDropToken, + info: CursorInformation, + state: RawBatchCursorState, + drop_address: Option, +} + +struct RawBatchCursorState { + exhausted: bool, + pinned_connection: PinnedConnection, + post_batch_resume_token: Option, + provider: GetMoreRawProvider<'static, ImplicitClientSessionHandle>, + initial_reply: Option, +} + +impl RawBatchCursor { + pub(crate) fn new( + client: Client, + spec: RawBatchCursorSpecification, + session: Option, + pin: Option, + ) -> Self { + let exhausted = spec.info.id == 0; + Self { + client: client.clone(), + drop_token: client.register_async_drop(), + info: spec.info, + drop_address: None, + state: RawBatchCursorState { + exhausted, + pinned_connection: PinnedConnection::new(pin), + post_batch_resume_token: spec.post_batch_resume_token, + provider: if exhausted { + GetMoreRawProvider::Done + } else { + GetMoreRawProvider::Idle(Box::new(ImplicitClientSessionHandle(session))) + }, + initial_reply: Some(spec.initial_reply), + }, + } + } + + pub fn address(&self) -> &ServerAddress { + &self.info.address + } + + pub fn set_drop_address(&mut self, address: ServerAddress) { + self.drop_address = Some(address); + } + + pub fn is_exhausted(&self) -> bool { + self.state.exhausted + } + + fn mark_exhausted(&mut self) { + self.state.exhausted = true; + self.state.pinned_connection = PinnedConnection::Unpinned; + } +} + +impl Stream for RawBatchCursor { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + // If a getMore is in flight, poll it and update state. + if let Some(future) = self.state.provider.executing_future() { + match Pin::new(future).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + self.state.initial_reply = Some(out.raw_reply); + self.state.post_batch_resume_token = out.post_batch_resume_token; + if out.exhausted { + self.mark_exhausted(); + } + if out.id != 0 { + self.info.id = out.id; + } + self.info.ns = out.ns; + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.mark_exhausted(); + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + return Poll::Ready(Some(Err(e))); + } + } + let exhausted_now = self.state.exhausted; + self.state + .provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } + + // Yield any buffered reply. + if let Some(reply) = self.state.initial_reply.take() { + return Poll::Ready(Some(Ok(RawBatch::new(reply)))); + } + + // If not exhausted and the connection is valid, start a getMore and iterate. + if !self.state.exhausted + && !matches!(self.state.pinned_connection, PinnedConnection::Invalid(_)) + { + let info = self.info.clone(); + let client = self.client.clone(); + let state = &mut self.state; + state + .provider + .start_execution(info, client, state.pinned_connection.handle()); + continue; + } + + // Otherwise, we're done. + return Poll::Ready(None); + } + } +} + +impl Drop for RawBatchCursor { + fn drop(&mut self) { + if self.is_exhausted() { + return; + } + kill_cursor( + self.client.clone(), + &mut self.drop_token, + &self.info.ns, + self.info.id, + self.state.pinned_connection.replicate(), + self.drop_address.take(), + #[cfg(test)] + None, + ); + } +} + +#[derive(Debug)] +pub struct SessionRawBatchCursor { + client: Client, + drop_token: AsyncDropToken, + info: CursorInformation, + exhausted: bool, + pinned_connection: PinnedConnection, + post_batch_resume_token: Option, + initial_reply: Option, + drop_address: Option, +} + +impl SessionRawBatchCursor { + pub(crate) fn new( + client: Client, + spec: RawBatchCursorSpecification, + pinned: Option, + ) -> Self { + let exhausted = spec.info.id == 0; + Self { + drop_token: client.register_async_drop(), + client, + info: spec.info, + exhausted, + pinned_connection: PinnedConnection::new(pinned), + post_batch_resume_token: spec.post_batch_resume_token, + initial_reply: Some(spec.initial_reply), + drop_address: None, + } + } + + pub fn stream<'session>( + &mut self, + session: &'session mut ClientSession, + ) -> SessionRawBatchCursorStream<'_, 'session> { + SessionRawBatchCursorStream { + parent: self, + provider: GetMoreRawProvider::Idle(Box::new(ExplicitClientSessionHandle(session))), + } + } + + pub fn address(&self) -> &ServerAddress { + &self.info.address + } + + pub fn set_drop_address(&mut self, address: ServerAddress) { + self.drop_address = Some(address); + } + + pub fn is_exhausted(&self) -> bool { + self.exhausted + } +} + +impl Drop for SessionRawBatchCursor { + fn drop(&mut self) { + if self.is_exhausted() { + return; + } + kill_cursor( + self.client.clone(), + &mut self.drop_token, + &self.info.ns, + self.info.id, + self.pinned_connection.replicate(), + self.drop_address.take(), + #[cfg(test)] + None, + ); + } +} + +pub struct SessionRawBatchCursorStream<'cursor, 'session> { + parent: &'cursor mut SessionRawBatchCursor, + provider: GetMoreRawProvider<'session, ExplicitClientSessionHandle<'session>>, +} + +impl Stream for SessionRawBatchCursorStream<'_, '_> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + // If a getMore is in flight, poll it and update state. + if let Some(future) = self.provider.executing_future() { + match Pin::new(future).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(get_more_out) => { + match get_more_out.result { + Ok(out) => { + if out.exhausted { + self.parent.exhausted = true; + } + if out.id != 0 { + self.parent.info.id = out.id; + } + self.parent.info.ns = out.ns; + self.parent.post_batch_resume_token = out.post_batch_resume_token; + // Buffer next reply to yield on following polls. + self.parent.initial_reply = Some(out.raw_reply); + } + Err(e) => { + if matches!(*e.kind, ErrorKind::Command(ref ce) if ce.code == 43 || ce.code == 237) + { + self.parent.exhausted = true; + } + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + return Poll::Ready(Some(Err(e))); + } + } + let exhausted_now = self.parent.exhausted; + self.provider + .clear_execution(get_more_out.session, exhausted_now); + } + } + } + + // Yield any buffered reply. + if let Some(reply) = self.parent.initial_reply.take() { + return Poll::Ready(Some(Ok(RawBatch::new(reply)))); + } + + // If not exhausted and the connection is valid, start a getMore and iterate. + if !self.parent.exhausted + && !matches!(self.parent.pinned_connection, PinnedConnection::Invalid(_)) + { + let info = self.parent.info.clone(); + let client = self.parent.client.clone(); + let pinned_owned = self + .parent + .pinned_connection + .handle() + .map(|c| c.replicate()); + let pinned_ref = pinned_owned.as_ref(); + self.provider.start_execution(info, client, pinned_ref); + continue; + } + + // Otherwise, we're done. + return Poll::Ready(None); + } + } +} +pub struct GetMoreRawResultAndSession { + pub result: Result, + pub session: S, +} + +enum GetMoreRawProvider<'s, S> { + Executing(BoxFuture<'s, GetMoreRawResultAndSession>), + Idle(Box), + Done, +} + +impl<'s, S: ClientSessionHandle<'s>> GetMoreRawProvider<'s, S> { + fn executing_future(&mut self) -> Option<&mut BoxFuture<'s, GetMoreRawResultAndSession>> { + if let Self::Executing(future) = self { + Some(future) + } else { + None + } + } + + fn clear_execution(&mut self, session: S, exhausted: bool) { + if exhausted && session.is_implicit() { + *self = Self::Done + } else { + *self = Self::Idle(Box::new(session)) + } + } + + fn start_execution( + &mut self, + info: CursorInformation, + client: Client, + pinned_connection: Option<&PinnedConnectionHandle>, + ) { + take_mut::take(self, |this| { + if let Self::Idle(mut session) = this { + let pinned = pinned_connection.map(|c| c.replicate()); + let fut = Box::pin(async move { + let get_more = GetMoreRaw::new(info, pinned.as_ref()); + let res = client + .execute_operation(get_more, session.borrow_mut()) + .await; + GetMoreRawResultAndSession { + result: res, + session: *session, + } + }); + Self::Executing(fut) + } else { + this + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bson::{doc, Document}; + + #[test] + fn raw_batch_into_docs_works() { + let reply_doc: Document = doc! { + "ok": 1, + "cursor": { + "id": 0_i64, + "ns": "db.coll", + "firstBatch": [ + { "x": 1 }, + { "x": 2 } + ] + } + }; + let mut bytes = Vec::new(); + reply_doc.to_writer(&mut bytes).unwrap(); + let raw = RawDocumentBuf::from_bytes(bytes).unwrap(); + + let batch = RawBatch::new(raw); + let docs: Vec<_> = batch.doc_slices().unwrap().into_iter().collect(); + assert_eq!(docs.len(), 2); + } +} diff --git a/src/db.rs b/src/db.rs index f9da393d7..8596cdfda 100644 --- a/src/db.rs +++ b/src/db.rs @@ -142,4 +142,27 @@ impl Database { pub fn gridfs_bucket(&self, options: impl Into>) -> GridFsBucket { GridFsBucket::new(self.clone(), options.into().unwrap_or_default()) } + + /// Finds the documents in a collection and returns raw server batches. + /// + /// This method returns raw BSON batches without deserializing individual documents, + /// providing zero-copy access for high-performance scenarios. + /// + /// `await` will return [`Result`] (or + /// [`Result`] if a session is provided). + /// + /// See the `cursor::raw_batch` module documentation for usage guidance. + pub fn find_raw_batches( + &self, + collection: impl Into, + filter: crate::bson::Document, + ) -> crate::action::FindRawBatches<'_> { + crate::action::FindRawBatches { + db: self, + collection: collection.into(), + filter, + options: None, + session: crate::action::ImplicitSession, + } + } } diff --git a/src/db/action.rs b/src/db/action.rs index ba61d596b..8306658f4 100644 --- a/src/db/action.rs +++ b/src/db/action.rs @@ -1 +1,2 @@ pub(crate) mod create_collection; +pub(crate) mod find_raw; diff --git a/src/db/action/find_raw.rs b/src/db/action/find_raw.rs new file mode 100644 index 000000000..40b6c91b1 --- /dev/null +++ b/src/db/action/find_raw.rs @@ -0,0 +1,50 @@ +use crate::{ + action::{action_impl, FindRawBatches}, + error::Result, + Namespace, +}; + +#[action_impl] +impl<'a> Action for FindRawBatches<'a, crate::action::ImplicitSession> { + type Future = FindRawBatchesFuture; + + async fn execute(mut self) -> Result { + resolve_options!(self.db, self.options, [read_concern, selection_criteria]); + + let ns = Namespace { + db: self.db.name().to_string(), + coll: self.collection, + }; + + let op = crate::operation::find_raw::FindRaw::new(ns, self.filter, self.options); + self.db + .client() + .execute_raw_batch_cursor_operation(op) + .await + } +} + +#[action_impl] +impl<'a> Action for FindRawBatches<'a, crate::action::ExplicitSession<'a>> { + type Future = FindRawBatchesSessionFuture; + + async fn execute(mut self) -> Result { + resolve_read_concern_with_session!(self.db, self.options, Some(&mut *self.session.0))?; + resolve_selection_criteria_with_session!( + self.db, + self.options, + Some(&mut *self.session.0) + )?; + + let ns = Namespace { + db: self.db.name().to_string(), + coll: self.collection, + }; + + let op = crate::operation::find_raw::FindRaw::new(ns, self.filter, self.options); + self.db + .client() + .execute_session_raw_batch_cursor_operation(op, self.session.0) + .await + } +} diff --git a/src/lib.rs b/src/lib.rs index 6e6c628aa..a10839c4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,7 @@ pub use crate::{ client::{session::ClientSession, Client}, coll::Collection, cursor::{ + raw_batch::RawBatch, session::{SessionCursor, SessionCursorStream}, Cursor, }, diff --git a/src/operation.rs b/src/operation.rs index b55fa6b69..4c7960457 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -13,7 +13,9 @@ pub(crate) mod drop_database; mod drop_indexes; mod find; pub(crate) mod find_and_modify; +pub(crate) mod find_raw; mod get_more; +pub(crate) mod get_more_raw; mod insert; pub(crate) mod list_collections; pub(crate) mod list_databases; @@ -38,25 +40,15 @@ use crate::{ client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS}, cmap::{ conn::{pooled::PooledConnection, PinnedConnectionHandle}, - Command, - RawCommandResponse, - StreamDescription, + Command, RawCommandResponse, StreamDescription, }, error::{ - CommandError, - Error, - ErrorKind, - IndexedWriteError, - InsertManyError, - Result, - WriteConcernError, - WriteFailure, + CommandError, Error, ErrorKind, IndexedWriteError, InsertManyError, Result, + WriteConcernError, WriteFailure, }, options::{ClientOptions, WriteConcern}, selection_criteria::SelectionCriteria, - BoxFuture, - ClientSession, - Namespace, + BoxFuture, ClientSession, Namespace, }; pub(crate) use abort_transaction::AbortTransaction; @@ -143,6 +135,27 @@ pub(crate) trait Operation { context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result>; + /// Whether this operation prefers to take ownership of the server response body for + /// zero-copy handling. + /// + /// Operations that parse raw batches (e.g. raw find/getMore) should return `true` and implement + /// [`handle_response_owned`] to avoid cloning the server reply bytes. + fn wants_owned_response(&self) -> bool { + false + } + + /// Interprets the server response taking ownership of the body to enable zero-copy handling. + /// + /// Default behavior delegates to the borrowed [`handle_response`]; operations that return + /// `true` from [`wants_owned_response`] should override this to consume the response. + fn handle_response_owned<'a>( + &'a self, + _response: RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> BoxFuture<'a, Result> { + unimplemented!() + } + /// Interpret an error encountered while sending the built command to the server, potentially /// recovering. fn handle_error(&self, error: Error) -> Result; @@ -230,6 +243,36 @@ pub(crate) trait OperationWithDefaults: Send + Sync { async move { self.handle_response(response, context) }.boxed() } + /// Whether this operation prefers to take ownership of the server response body for + /// zero-copy handling. + /// + /// Override to `true` for operations that can consume the response without cloning it. + fn wants_owned_response(&self) -> bool { + false + } + + /// Interprets the server response taking ownership of the body to enable zero-copy handling. + /// + /// Default implementation defers to the borrowed handler; override for true zero-copy handling. + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> Result { + // By default, delegate to borrowed path by re-borrowing. + // Note: default impls that want zero-copy should override this. + self.handle_response(&response, context) + } + + /// Async wrapper for owned-response handling. + fn handle_response_owned_async<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> BoxFuture<'a, Result> { + async move { self.handle_response_owned(response, context) }.boxed() + } + /// Interpret an error encountered while sending the built command to the server, potentially /// recovering. fn handle_error(&self, error: Error) -> Result { @@ -309,6 +352,16 @@ where ) -> BoxFuture<'a, Result> { self.handle_response_async(response, context) } + fn wants_owned_response(&self) -> bool { + self.wants_owned_response() + } + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> BoxFuture<'a, Result> { + self.handle_response_owned_async(response, context) + } fn handle_error(&self, error: Error) -> Result { self.handle_error(error) } diff --git a/src/operation/find_raw.rs b/src/operation/find_raw.rs new file mode 100644 index 000000000..b53e689e8 --- /dev/null +++ b/src/operation/find_raw.rs @@ -0,0 +1,171 @@ +use crate::bson::RawDocumentBuf; + +use crate::{ + bson::{rawdoc, Document}, + bson_compat::{cstr, CStr}, + cmap::{Command, RawCommandResponse, StreamDescription}, + cursor::{raw_batch::RawBatchCursorSpecification, CursorInformation}, + error::{Error, Result}, + operation::{ + append_options_to_raw_document, CursorBody, OperationWithDefaults, + SERVER_4_4_0_WIRE_VERSION, + }, + options::{CursorType, FindOptions, SelectionCriteria}, + Namespace, +}; + +use super::ExecutionContext; + +#[derive(Debug)] +pub(crate) struct FindRaw { + ns: Namespace, + filter: Document, + options: Option>, +} + +impl FindRaw { + pub(crate) fn new(ns: Namespace, filter: Document, options: Option) -> Self { + Self { + ns, + filter, + options: options.map(Box::new), + } + } +} + +impl OperationWithDefaults for FindRaw { + type O = RawBatchCursorSpecification; + const NAME: &'static CStr = cstr!("find"); + + fn handle_response<'a>( + &'a self, + _response: &'a RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> Result { + unimplemented!("FindRaw must be handled via owned response path") + } + + fn wants_owned_response(&self) -> bool { + true + } + + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + context: ExecutionContext<'a>, + ) -> Result { + // Parse minimal fields via raw to avoid per-doc copies. + let raw_root = response.raw_body(); + let cursor_doc = raw_root + .get("cursor")? + .and_then(crate::bson::RawBsonRef::as_document) + .ok_or_else(|| Error::invalid_response("missing cursor in response"))?; + + let id = cursor_doc + .get("id")? + .and_then(crate::bson::RawBsonRef::as_i64) + .ok_or_else(|| Error::invalid_response("missing cursor id"))?; + + let ns_str = cursor_doc + .get("ns")? + .and_then(crate::bson::RawBsonRef::as_str) + .ok_or_else(|| Error::invalid_response("missing cursor ns"))?; + let ns = Namespace::from_str(ns_str) + .ok_or_else(|| Error::invalid_response("invalid cursor ns"))?; + + let post_token_raw = cursor_doc + .get("postBatchResumeToken")? + .and_then(crate::bson::RawBsonRef::as_document) + .map(|d| RawDocumentBuf::from_bytes(d.as_bytes().to_vec())) + .transpose()?; + let post_batch_resume_token = + crate::change_stream::event::ResumeToken::from_raw(post_token_raw); + + let description = context.connection.stream_description()?; + let comment = if description.max_wire_version.unwrap_or(0) < SERVER_4_4_0_WIRE_VERSION { + None + } else { + self.options.as_ref().and_then(|opts| opts.comment.clone()) + }; + + let info = CursorInformation { + ns, + id, + address: description.server_address.clone(), + batch_size: self.options.as_ref().and_then(|opts| opts.batch_size), + max_time: self.options.as_ref().and_then(|opts| opts.max_await_time), + comment, + }; + + Ok(RawBatchCursorSpecification { + info, + initial_reply: response.into_raw_document_buf(), + post_batch_resume_token, + }) + } + + fn build(&mut self, _description: &StreamDescription) -> Result { + let mut body = rawdoc! { + Self::NAME: self.ns.coll.clone(), + }; + + if let Some(ref mut options) = self.options { + if options.limit.map(|limit| limit < 0) == Some(true) { + body.append(cstr!("singleBatch"), true); + } + + if let Some(ref mut batch_size) = options.batch_size { + if i32::try_from(*batch_size).is_err() { + return Err(Error::invalid_argument( + "the batch size must fit into a signed 32-bit integer", + )); + } + if let Some(limit) = options.limit.and_then(|limit| u32::try_from(limit).ok()) { + if *batch_size == limit { + *batch_size += 1; + } + } + } + + match options.cursor_type { + Some(CursorType::Tailable) => { + body.append(cstr!("tailable"), true); + } + Some(CursorType::TailableAwait) => { + body.append(cstr!("tailable"), true); + body.append(cstr!("awaitData"), true); + } + _ => {} + }; + } + + append_options_to_raw_document(&mut body, self.options.as_ref())?; + + let raw_filter: RawDocumentBuf = (&self.filter).try_into()?; + body.append(cstr!("filter"), raw_filter); + + Ok(Command::new_read( + Self::NAME, + &self.ns.db, + self.options.as_ref().and_then(|o| o.read_concern.clone()), + body, + )) + } + + fn extract_at_cluster_time( + &self, + response: &crate::bson::RawDocument, + ) -> Result> { + CursorBody::extract_at_cluster_time(response) + } + + fn supports_read_concern(&self, _description: &StreamDescription) -> bool { + true + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.options + .as_ref() + .and_then(|opts| opts.selection_criteria.as_ref()) + } +} diff --git a/src/operation/get_more_raw.rs b/src/operation/get_more_raw.rs new file mode 100644 index 000000000..d24f1f7ec --- /dev/null +++ b/src/operation/get_more_raw.rs @@ -0,0 +1,152 @@ +use crate::bson::RawDocumentBuf; +use std::time::Duration; + +use crate::{ + bson::{rawdoc, RawBson}, + bson_compat::{cstr, CStr}, + cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, + cursor::CursorInformation, + error::Result, + operation::OperationWithDefaults, + options::SelectionCriteria, + Namespace, +}; + +use super::ExecutionContext; + +#[derive(Debug)] +pub(crate) struct GetMoreRaw<'conn> { + ns: Namespace, + cursor_id: i64, + selection_criteria: SelectionCriteria, + batch_size: Option, + max_time: Option, + pinned_connection: Option<&'conn PinnedConnectionHandle>, + comment: Option, +} + +impl<'conn> GetMoreRaw<'conn> { + pub(crate) fn new( + info: CursorInformation, + pinned: Option<&'conn PinnedConnectionHandle>, + ) -> Self { + Self { + ns: info.ns, + cursor_id: info.id, + selection_criteria: SelectionCriteria::from_address(info.address), + batch_size: info.batch_size, + max_time: info.max_time, + pinned_connection: pinned, + comment: info.comment, + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct GetMoreRawResult { + pub(crate) raw_reply: RawDocumentBuf, + pub(crate) exhausted: bool, + pub(crate) post_batch_resume_token: Option, + pub(crate) ns: Namespace, + pub(crate) id: i64, +} + +impl OperationWithDefaults for GetMoreRaw<'_> { + type O = GetMoreRawResult; + + const NAME: &'static CStr = cstr!("getMore"); + + fn handle_response<'a>( + &'a self, + _response: &'a RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> Result { + unimplemented!("GetMoreRaw must be handled via owned response path") + } + + fn wants_owned_response(&self) -> bool { + true + } + + fn handle_response_owned<'a>( + &'a self, + response: RawCommandResponse, + _context: ExecutionContext<'a>, + ) -> Result { + // Extract minimal fields directly from the raw reply to avoid walking the batch via serde. + let root = response.raw_body(); + let cursor = root + .get("cursor")? + .and_then(crate::bson::RawBsonRef::as_document) + .ok_or_else(|| crate::error::Error::invalid_response("missing cursor subdocument"))?; + + let id = cursor + .get("id")? + .and_then(crate::bson::RawBsonRef::as_i64) + .ok_or_else(|| crate::error::Error::invalid_response("missing cursor id"))?; + + let ns_str = cursor + .get("ns")? + .and_then(crate::bson::RawBsonRef::as_str) + .ok_or_else(|| crate::error::Error::invalid_response("missing cursor ns"))?; + let ns = Namespace::from_str(ns_str) + .ok_or_else(|| crate::error::Error::invalid_response("invalid cursor ns"))?; + + let token_raw = cursor + .get("postBatchResumeToken")? + .and_then(crate::bson::RawBsonRef::as_document) + .map(|d| RawDocumentBuf::from_bytes(d.as_bytes().to_vec())) + .transpose()?; + let post_batch_resume_token = crate::change_stream::event::ResumeToken::from_raw(token_raw); + + // Take ownership of the raw bytes without copying. + let raw = response.into_raw_document_buf(); + + Ok(GetMoreRawResult { + raw_reply: raw, + exhausted: id == 0, + post_batch_resume_token, + ns, + id, + }) + } + + fn build(&mut self, _description: &StreamDescription) -> Result { + let mut body = rawdoc! { + Self::NAME: self.cursor_id, + "collection": self.ns.coll.clone(), + }; + + if let Some(batch_size) = self.batch_size { + let batch_size = crate::checked::Checked::from(batch_size).try_into::()?; + if batch_size != 0 { + body.append(cstr!("batchSize"), batch_size); + } + } + + if let Some(ref max_time) = self.max_time { + body.append( + cstr!("maxTimeMS"), + max_time.as_millis().try_into().unwrap_or(i32::MAX), + ); + } + + if let Some(comment) = &self.comment { + let raw_comment: RawBson = comment.clone().try_into()?; + body.append(cstr!("comment"), raw_comment); + } + + Ok(Command::new(Self::NAME, &self.ns.db, body)) + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + Some(&self.selection_criteria) + } + + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + self.pinned_connection + } + + #[cfg(feature = "opentelemetry")] + type Otel = crate::otel::Witness; +} diff --git a/src/test/coll.rs b/src/test/coll.rs index 3e8e41df1..07fac4569 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -143,6 +143,36 @@ async fn find() { } } +#[tokio::test] +#[function_name::named] +async fn find_raw_batches_one() { + let client = Client::for_test().await; + let coll = client + .init_db_and_coll(function_name!(), function_name!()) + .await; + + coll.insert_one(doc! { "x": 1 }).await.unwrap(); + + let db = client.database(function_name!()); + let mut batches = db + .find_raw_batches(function_name!(), doc! { "x": 1 }) + .limit(-1) + .await + .unwrap(); + + // get the first (and only) server batch due to limit -1 + let mut found_one = false; + while let Some(batch_res) = batches.next().await { + let batch = batch_res.unwrap(); + let mut iter = batch.doc_slices().unwrap().into_iter(); + let first = iter.next().unwrap().unwrap(); + let doc = Document::try_from(first.as_document().unwrap()).unwrap(); + assert_eq!(doc.get_i32("x").unwrap(), 1); + found_one = true; + } + assert!(found_one); +} + #[tokio::test] #[function_name::named] async fn update() {