Skip to content

Commit 7f798b0

Browse files
authored
RUST-1855 Propagate cluster time for unified tests (#1053)
1 parent 862a8e6 commit 7f798b0

File tree

3 files changed

+49
-68
lines changed

3 files changed

+49
-68
lines changed

src/test/coll.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@ use crate::{
3232
WriteConcern,
3333
},
3434
results::DeleteResult,
35-
test::{
36-
get_client_options,
37-
log_uncaptured,
38-
util::{drop_collection, TestClient},
39-
},
35+
test::{get_client_options, log_uncaptured, util::TestClient},
4036
Collection,
4137
IndexModel,
4238
};
@@ -209,7 +205,7 @@ async fn aggregate_out() {
209205
let db = client.database(function_name!());
210206
let coll = db.collection(function_name!());
211207

212-
drop_collection(&coll).await;
208+
coll.drop().await.unwrap();
213209

214210
let result = coll
215211
.insert_many((0i32..5).map(|n| doc! { "x": n }).collect::<Vec<_>>())
@@ -226,7 +222,7 @@ async fn aggregate_out() {
226222
},
227223
doc! {"$out": out_coll.name()},
228224
];
229-
drop_collection(&out_coll).await;
225+
out_coll.drop().await.unwrap();
230226

231227
coll.aggregate(pipeline.clone()).await.unwrap();
232228
assert!(db
@@ -235,7 +231,7 @@ async fn aggregate_out() {
235231
.unwrap()
236232
.into_iter()
237233
.any(|name| name.as_str() == out_coll.name()));
238-
drop_collection(&out_coll).await;
234+
out_coll.drop().await.unwrap();
239235

240236
// check that even with a batch size of 0, a new collection is created.
241237
coll.aggregate(pipeline).batch_size(0).await.unwrap();
@@ -262,7 +258,7 @@ async fn kill_cursors_on_drop() {
262258
let db = client.database(function_name!());
263259
let coll = db.collection(function_name!());
264260

265-
drop_collection(&coll).await;
261+
coll.drop().await.unwrap();
266262

267263
coll.insert_many(vec![doc! { "x": 1 }, doc! { "x": 2 }])
268264
.await
@@ -295,7 +291,7 @@ async fn no_kill_cursors_on_exhausted() {
295291
let db = client.database(function_name!());
296292
let coll = db.collection(function_name!());
297293

298-
drop_collection(&coll).await;
294+
coll.drop().await.unwrap();
299295

300296
coll.insert_many(vec![doc! { "x": 1 }, doc! { "x": 2 }])
301297
.await

src/test/spec/unified_runner/test_runner.rs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,9 @@ use tokio::sync::{mpsc, RwLock};
77
use crate::{
88
bson::{doc, Document},
99
client::options::ClientOptions,
10-
concern::{Acknowledgment, WriteConcern},
10+
concern::WriteConcern,
1111
gridfs::GridFsBucket,
12-
options::{
13-
CollectionOptions,
14-
CreateCollectionOptions,
15-
ReadConcern,
16-
ReadPreference,
17-
SelectionCriteria,
18-
},
12+
options::{CollectionOptions, ReadConcern, ReadPreference, SelectionCriteria},
1913
runtime,
2014
sdam::{TopologyDescription, MIN_HEARTBEAT_FREQUENCY},
2115
test::{
@@ -35,6 +29,8 @@ use crate::{
3529
SERVERLESS,
3630
SERVER_API,
3731
},
32+
ClientSession,
33+
ClusterTime,
3834
Collection,
3935
Database,
4036
};
@@ -82,6 +78,7 @@ pub(crate) struct TestRunner {
8278
pub(crate) internal_client: TestClient,
8379
pub(crate) entities: Arc<RwLock<EntityMap>>,
8480
pub(crate) fail_point_guards: Arc<RwLock<Vec<FailPointGuard>>>,
81+
pub(crate) cluster_time: Arc<RwLock<Option<ClusterTime>>>,
8582
}
8683

8784
impl TestRunner {
@@ -90,6 +87,7 @@ impl TestRunner {
9087
internal_client: TestClient::new().await,
9188
entities: Default::default(),
9289
fail_point_guards: Default::default(),
90+
cluster_time: Default::default(),
9391
}
9492
}
9593

@@ -99,6 +97,7 @@ impl TestRunner {
9997
internal_client: TestClient::with_options(Some(options)).await,
10098
entities: Arc::new(RwLock::new(EntityMap::new())),
10199
fail_point_guards: Arc::new(RwLock::new(Vec::new())),
100+
cluster_time: Default::default(),
102101
}
103102
}
104103

@@ -205,9 +204,11 @@ impl TestRunner {
205204
log_uncaptured(format!("Executing {:?}", &test_case.description));
206205

207206
if let Some(ref initial_data) = test_file.initial_data {
207+
let mut session = self.internal_client.start_session().await.unwrap();
208208
for data in initial_data {
209-
self.insert_initial_data(data).await;
209+
self.insert_initial_data(data, &mut session).await;
210210
}
211+
*self.cluster_time.write().await = session.cluster_time().cloned();
211212
}
212213

213214
self.entities.write().await.clear();
@@ -370,33 +371,37 @@ impl TestRunner {
370371
}
371372
}
372373

373-
pub(crate) async fn insert_initial_data(&self, data: &CollectionData) {
374-
let write_concern = WriteConcern::builder().w(Acknowledgment::Majority).build();
375-
374+
pub(crate) async fn insert_initial_data(
375+
&self,
376+
data: &CollectionData,
377+
session: &mut ClientSession,
378+
) {
376379
if !data.documents.is_empty() {
377380
let collection_options = CollectionOptions::builder()
378-
.write_concern(write_concern)
381+
.write_concern(WriteConcern::majority())
379382
.build();
383+
let coll = self.internal_client.get_coll_with_options(
384+
&data.database_name,
385+
&data.collection_name,
386+
collection_options,
387+
);
388+
coll.drop().session(&mut *session).await.unwrap();
389+
coll.insert_many(data.documents.clone())
390+
.session(session)
391+
.await
392+
.unwrap();
393+
} else {
380394
let coll = self
381395
.internal_client
382-
.init_db_and_coll_with_options(
383-
&data.database_name,
384-
&data.collection_name,
385-
collection_options,
386-
)
387-
.await;
388-
coll.insert_many(data.documents.clone()).await.unwrap();
389-
} else {
390-
let collection_options = CreateCollectionOptions::builder()
391-
.write_concern(write_concern)
392-
.build();
396+
.get_coll(&data.database_name, &data.collection_name);
397+
coll.drop().session(&mut *session).await.unwrap();
393398
self.internal_client
394-
.create_fresh_collection(
395-
&data.database_name,
396-
&data.collection_name,
397-
collection_options,
398-
)
399-
.await;
399+
.database(&data.database_name)
400+
.create_collection(&data.collection_name)
401+
.session(&mut *session)
402+
.write_concern(WriteConcern::majority())
403+
.await
404+
.unwrap();
400405
}
401406
}
402407

@@ -526,11 +531,14 @@ impl TestRunner {
526531
TestFileEntity::Session(session) => {
527532
let id = session.id.clone();
528533
let client = self.get_client(&session.client).await;
529-
let client_session = client
534+
let mut client_session = client
530535
.start_session()
531536
.with_options(session.session_options.clone())
532537
.await
533538
.unwrap();
539+
if let Some(time) = &*self.cluster_time.read().await {
540+
client_session.advance_cluster_time(time);
541+
}
534542
(id, Entity::Session(SessionEntity::new(client_session)))
535543
}
536544
TestFileEntity::Bucket(bucket) => {

src/test/util.rs

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::{fmt::Debug, time::Duration};
3737

3838
use super::get_client_options;
3939
use crate::{
40-
error::{CommandError, ErrorKind, Result},
40+
error::Result,
4141
options::{AuthMechanism, ClientOptions, CollectionOptions, CreateCollectionOptions},
4242
test::{
4343
update_options_for_testing,
@@ -257,7 +257,7 @@ impl TestClient {
257257
coll_name: &str,
258258
) -> Collection<Document> {
259259
let coll = self.get_coll(db_name, coll_name);
260-
drop_collection(&coll).await;
260+
coll.drop().await.unwrap();
261261
coll
262262
}
263263

@@ -270,7 +270,7 @@ impl TestClient {
270270
T: Serialize + DeserializeOwned + Unpin + Debug + Send + Sync,
271271
{
272272
let coll = self.database(db_name).collection(coll_name);
273-
drop_collection(&coll).await;
273+
coll.drop().await.unwrap();
274274
coll
275275
}
276276

@@ -284,17 +284,6 @@ impl TestClient {
284284
.collection_with_options(coll_name, options)
285285
}
286286

287-
pub(crate) async fn init_db_and_coll_with_options(
288-
&self,
289-
db_name: &str,
290-
coll_name: &str,
291-
options: CollectionOptions,
292-
) -> Collection<Document> {
293-
let coll = self.get_coll_with_options(db_name, coll_name, options);
294-
drop_collection(&coll).await;
295-
coll
296-
}
297-
298287
pub(crate) async fn create_fresh_collection(
299288
&self,
300289
db_name: &str,
@@ -410,7 +399,7 @@ impl TestClient {
410399

411400
pub(crate) async fn drop_collection(&self, db_name: &str, coll_name: &str) {
412401
let coll = self.get_coll(db_name, coll_name);
413-
drop_collection(&coll).await;
402+
coll.drop().await.unwrap();
414403
}
415404

416405
/// Returns the `Topology' that can be determined without a server query, i.e. all except
@@ -490,18 +479,6 @@ impl TestClient {
490479
}
491480
}
492481

493-
pub(crate) async fn drop_collection<T>(coll: &Collection<T>)
494-
where
495-
T: Serialize + DeserializeOwned + Unpin + Debug + Send + Sync,
496-
{
497-
match coll.drop().await.map_err(|e| *e.kind) {
498-
Err(ErrorKind::Command(CommandError { code: 26, .. })) | Ok(_) => {}
499-
e @ Err(_) => {
500-
e.unwrap();
501-
}
502-
};
503-
}
504-
505482
pub(crate) fn get_default_name(description: &str) -> String {
506483
let mut db_name = description.replace('$', "%").replace([' ', '.'], "_");
507484
// database names must have fewer than 38 characters

0 commit comments

Comments
 (0)