Skip to content

Commit 6a4b739

Browse files
authored
RUST-1902 Fix benchmarks for client 3.0 API (#1172)
1 parent a6d629f commit 6a4b739

14 files changed

+65
-86
lines changed

benchmarks/Cargo.toml

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,18 @@ version = "0.1.0"
44
authors = ["benjirewis <[email protected]>"]
55
edition = "2018"
66

7-
[features]
8-
default = ["tokio-runtime"]
9-
tokio-runtime = [
10-
"tokio/fs",
11-
"tokio/macros",
12-
"tokio/rt",
13-
"tokio/rt-multi-thread",
14-
"tokio-stream",
15-
"mongodb/tokio-runtime"
16-
]
17-
async-std-runtime = ["async-std", "mongodb/async-std-runtime"]
18-
197
[dependencies]
20-
mongodb = { path = "..", default-features = false }
8+
mongodb = { path = ".." }
219
serde_json = "1.0.59"
2210
once_cell = "1.19.0"
2311
clap = "2.33.3"
2412
indicatif = "0.15.0"
2513
async-trait = "0.1.41"
26-
tokio = { version = "1.6", features = ["sync"] }
14+
tokio = { version = "1.6", features = ["sync", "fs"] }
2715
tokio-util = "0.7"
28-
tokio-stream = { version = "0.1.6", features = ["io-util"], optional = true }
29-
# "unstable" feature is needed for `spawn_blocking`, which is only used in task setup
30-
async-std = { version = "1.9.0", optional = true, features = ["attributes", "unstable"] }
16+
tokio-stream = { version = "0.1.6", features = ["io-util"] }
3117
futures = "0.3.8"
3218
anyhow = "1.0.34"
3319
serde = "1"
3420
num_enum = "0.5"
21+
futures-util = "0.3.30"

benchmarks/src/bench.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ pub async fn drop_database(uri: &str, database: &str) -> Result<()> {
149149

150150
let hello = client
151151
.database("admin")
152-
.run_command(doc! { "hello": true }, None)
152+
.run_command(doc! { "hello": true })
153153
.await?;
154154

155155
client.database(&database).drop().await?;
@@ -162,10 +162,10 @@ pub async fn drop_database(uri: &str, database: &str) -> Result<()> {
162162
for host in options.hosts {
163163
client
164164
.database("admin")
165-
.run_command(
166-
doc! { "flushRouterConfig": 1 },
167-
SelectionCriteria::Predicate(Arc::new(move |s| s.address() == &host)),
168-
)
165+
.run_command(doc! { "flushRouterConfig": 1 })
166+
.selection_criteria(SelectionCriteria::Predicate(Arc::new(move |s| {
167+
s.address() == &host
168+
})))
169169
.await?;
170170
}
171171
}

benchmarks/src/bench/find_many.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{convert::TryInto, path::PathBuf};
33
use anyhow::{bail, Result};
44
use futures::stream::StreamExt;
55
use mongodb::{
6-
bson::{Bson, Document, RawDocumentBuf},
6+
bson::{doc, Bson, Document, RawDocumentBuf},
77
Client,
88
Collection,
99
Database,
@@ -59,7 +59,7 @@ impl Benchmark for FindManyBenchmark {
5959

6060
let coll = db.collection(&COLL_NAME);
6161
let docs = vec![doc.clone(); num_iter];
62-
coll.insert_many(docs, None).await?;
62+
coll.insert_many(docs).await?;
6363

6464
Ok(FindManyBenchmark {
6565
db,
@@ -74,7 +74,7 @@ impl Benchmark for FindManyBenchmark {
7474
bench: &FindManyBenchmark,
7575
) -> Result<()> {
7676
let coll = bench.coll.clone_with_type::<T>();
77-
let mut cursor = coll.find(None, None).await?;
77+
let mut cursor = coll.find(doc! {}).await?;
7878
while let Some(doc) = cursor.next().await {
7979
doc?;
8080
}

benchmarks/src/bench/find_one.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl Benchmark for FindOneBenchmark {
5050
let coll = db.collection(&COLL_NAME);
5151
for i in 0..num_iter {
5252
doc.insert("_id", i as i32);
53-
coll.insert_one(doc.clone(), None).await?;
53+
coll.insert_one(doc.clone()).await?;
5454
}
5555

5656
Ok(FindOneBenchmark {
@@ -63,9 +63,7 @@ impl Benchmark for FindOneBenchmark {
6363

6464
async fn do_task(&self) -> Result<()> {
6565
for i in 0..self.num_iter {
66-
self.coll
67-
.find_one(Some(doc! { "_id": i as i32 }), None)
68-
.await?;
66+
self.coll.find_one(doc! { "_id": i as i32 }).await?;
6967
}
7068

7169
Ok(())

benchmarks/src/bench/gridfs_download.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::path::PathBuf;
22

33
use anyhow::{Context, Result};
4+
use futures::{AsyncReadExt, AsyncWriteExt};
45
use mongodb::{bson::Bson, gridfs::GridFsBucket, Client};
56

67
use crate::{
@@ -33,24 +34,25 @@ impl Benchmark for GridFsDownloadBenchmark {
3334
let bucket = db.gridfs_bucket(None);
3435

3536
let file = open_async_read_compat(&options.path).await?;
36-
let file_id = bucket
37-
.upload_from_futures_0_3_reader("gridfstest", file, None)
38-
.await
39-
.context("upload file")?;
37+
let mut upload = bucket.open_upload_stream("gridfstest").await?;
38+
let file_id = upload.id().clone();
39+
futures_util::io::copy(file, &mut upload).await?;
40+
upload.close().await?;
4041

4142
Ok(Self {
4243
uri: options.uri,
4344
bucket,
44-
file_id: file_id.into(),
45+
file_id,
4546
})
4647
}
4748

4849
async fn do_task(&self) -> Result<()> {
4950
let mut buf = vec![];
50-
self.bucket
51-
.download_to_futures_0_3_writer(self.file_id.clone(), &mut buf)
52-
.await
53-
.context("download file")?;
51+
let mut download = self
52+
.bucket
53+
.open_download_stream(self.file_id.clone())
54+
.await?;
55+
download.read_to_end(&mut buf).await?;
5456

5557
Ok(())
5658
}

benchmarks/src/bench/gridfs_multi_download.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55

66
use anyhow::{Context, Result};
77
use futures::AsyncWriteExt;
8-
use mongodb::{bson::oid::ObjectId, gridfs::GridFsBucket, Client};
8+
use mongodb::{bson::Bson, gridfs::GridFsBucket, Client};
99
use once_cell::sync::Lazy;
1010

1111
use crate::{
@@ -19,7 +19,7 @@ static DOWNLOAD_PATH: Lazy<PathBuf> =
1919
pub struct GridFsMultiDownloadBenchmark {
2020
uri: String,
2121
bucket: GridFsBucket,
22-
ids: Vec<ObjectId>,
22+
ids: Vec<Bson>,
2323
}
2424

2525
pub struct Options {
@@ -46,10 +46,12 @@ impl Benchmark for GridFsMultiDownloadBenchmark {
4646
let path = entry?.path();
4747

4848
let file = open_async_read_compat(&path).await?;
49-
let id = bucket
50-
.upload_from_futures_0_3_reader(path.display().to_string(), file, None)
51-
.await
52-
.context("upload file")?;
49+
let mut upload = bucket
50+
.open_upload_stream(path.display().to_string())
51+
.await?;
52+
let id = upload.id().clone();
53+
futures_util::io::copy(file, &mut upload).await?;
54+
upload.close().await?;
5355
ids.push(id);
5456
}
5557

@@ -64,7 +66,7 @@ impl Benchmark for GridFsMultiDownloadBenchmark {
6466

6567
async fn before_task(&mut self) -> Result<()> {
6668
for id in &self.ids {
67-
let path = get_filename(id.clone());
69+
let path = get_filename(&id);
6870
if Path::try_exists(&path)? {
6971
remove_file(path)?;
7072
}
@@ -81,15 +83,12 @@ impl Benchmark for GridFsMultiDownloadBenchmark {
8183
let id = id.clone();
8284

8385
tasks.push(crate::spawn(async move {
84-
let download_path = get_filename(id);
86+
let download_path = get_filename(&id);
8587
let mut file = open_async_write_compat(&download_path)
8688
.await
8789
.context("open file")?;
88-
89-
bucket
90-
.download_to_futures_0_3_writer(id.into(), &mut file)
91-
.await
92-
.context("download file")?;
90+
let download = bucket.open_download_stream(id).await?;
91+
futures_util::io::copy(download, &mut file).await?;
9392

9493
file.flush().await?;
9594

@@ -114,6 +113,6 @@ impl Benchmark for GridFsMultiDownloadBenchmark {
114113
}
115114
}
116115

117-
fn get_filename(id: ObjectId) -> PathBuf {
116+
fn get_filename(id: &Bson) -> PathBuf {
118117
DOWNLOAD_PATH.join(format!("file{}.txt", id))
119118
}

benchmarks/src/bench/gridfs_multi_upload.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{fs::read_dir, path::PathBuf};
22

33
use anyhow::{Context, Result};
4+
use futures::AsyncWriteExt;
45
use mongodb::{gridfs::GridFsBucket, Client};
56

67
use crate::{
@@ -39,11 +40,9 @@ impl Benchmark for GridFsMultiUploadBenchmark {
3940

4041
async fn before_task(&mut self) -> Result<()> {
4142
self.bucket.drop().await.context("bucket drop")?;
42-
43-
self.bucket
44-
.upload_from_futures_0_3_reader("beforetask", &[11u8][..], None)
45-
.await
46-
.context("single byte upload")?;
43+
let mut upload = self.bucket.open_upload_stream("beforetask").await?;
44+
upload.write_all(&[11u8][..]).await?;
45+
upload.close().await?;
4746

4847
Ok(())
4948
}
@@ -57,10 +56,11 @@ impl Benchmark for GridFsMultiUploadBenchmark {
5756
tasks.push(crate::spawn(async move {
5857
let path = entry?.path();
5958
let file = open_async_read_compat(&path).await?;
60-
bucket
61-
.upload_from_futures_0_3_reader(path.display().to_string(), file, None)
62-
.await
63-
.context("upload file")?;
59+
let mut upload = bucket
60+
.open_upload_stream(path.display().to_string())
61+
.await?;
62+
futures_util::io::copy(file, &mut upload).await?;
63+
upload.close().await?;
6464

6565
let ok: anyhow::Result<()> = Ok(());
6666
ok

benchmarks/src/bench/gridfs_upload.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::path::PathBuf;
22

33
use anyhow::{Context, Result};
4-
use futures::AsyncReadExt;
4+
use futures::{AsyncReadExt, AsyncWriteExt};
55
use mongodb::{gridfs::GridFsBucket, Client};
66

77
use crate::{
@@ -44,20 +44,17 @@ impl Benchmark for GridFsUploadBenchmark {
4444

4545
async fn before_task(&mut self) -> Result<()> {
4646
self.bucket.drop().await.context("bucket drop")?;
47-
48-
self.bucket
49-
.upload_from_futures_0_3_reader("beforetask", &[11u8][..], None)
50-
.await
51-
.context("single byte upload")?;
47+
let mut upload = self.bucket.open_upload_stream("beforetask").await?;
48+
upload.write_all(&[11u8][..]).await?;
49+
upload.close().await?;
5250

5351
Ok(())
5452
}
5553

5654
async fn do_task(&self) -> Result<()> {
57-
self.bucket
58-
.upload_from_futures_0_3_reader("gridfstest", &self.bytes[..], None)
59-
.await
60-
.context("upload bytes")?;
55+
let mut upload = self.bucket.open_upload_stream("gridfstest").await?;
56+
upload.write_all(&self.bytes[..]).await?;
57+
upload.close().await?;
6158

6259
Ok(())
6360
}

benchmarks/src/bench/insert_many.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl Benchmark for InsertManyBenchmark {
6464
async fn before_task(&mut self) -> Result<()> {
6565
self.coll.drop().await?;
6666
self.db
67-
.create_collection(COLL_NAME.as_str(), None)
67+
.create_collection(COLL_NAME.as_str())
6868
.await
6969
.context("create in before")?;
7070

@@ -74,7 +74,7 @@ impl Benchmark for InsertManyBenchmark {
7474
async fn do_task(&self) -> Result<()> {
7575
let insertions = vec![&self.doc; self.num_copies];
7676
self.coll
77-
.insert_many(insertions, None)
77+
.insert_many(insertions)
7878
.await
7979
.context("insert many")?;
8080

benchmarks/src/bench/insert_one.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl Benchmark for InsertOneBenchmark {
6464
async fn before_task(&mut self) -> Result<()> {
6565
self.coll.drop().await?;
6666
self.db
67-
.create_collection(COLL_NAME.as_str(), None)
67+
.create_collection(COLL_NAME.as_str())
6868
.await
6969
.context("create collection")?;
7070

@@ -74,7 +74,7 @@ impl Benchmark for InsertOneBenchmark {
7474
async fn do_task(&self) -> Result<()> {
7575
for _ in 0..self.num_iter {
7676
self.coll
77-
.insert_one(&self.doc, None)
77+
.insert_one(&self.doc)
7878
.await
7979
.context("insert one")?;
8080
}

0 commit comments

Comments
 (0)