Skip to content

Commit 78df02f

Browse files
RUST-2105 Fix serverless tests (#1259)
1 parent 0ac9fdf commit 78df02f

File tree

7 files changed

+97
-74
lines changed

7 files changed

+97
-74
lines changed

.evergreen/config.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ buildvariants:
272272
- test-plain-auth
273273

274274
- name: serverless
275-
patchable: false
276275
display_name: "Serverless"
277276
run_on:
278277
- rhel80-small

.evergreen/run-serverless-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@ cargo_test test::spec::transactions
1919
cargo_test test::spec::load_balancers
2020
cargo_test test::cursor
2121
cargo_test test::spec::collection_management
22-
cargo_test test::spec::command_monitoring_unified
22+
cargo_test test::spec::command_monitoring::command_monitoring_unified
2323

2424
exit $CARGO_RESULT

src/cmap/conn.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,12 +331,14 @@ impl PinnedConnectionHandle {
331331
/// connection has been unpinned.
332332
pub(crate) async fn take_connection(&self) -> Result<PooledConnection> {
333333
let mut receiver = self.receiver.lock().await;
334-
receiver.recv().await.ok_or_else(|| {
334+
let mut connection = receiver.recv().await.ok_or_else(|| {
335335
Error::internal(format!(
336336
"cannot take connection after unpin (id={})",
337337
self.id
338338
))
339-
})
339+
})?;
340+
connection.mark_pinned_in_use();
341+
Ok(connection)
340342
}
341343

342344
pub(crate) fn id(&self) -> u32 {

src/cmap/conn/pooled.rs

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ enum PooledConnectionState {
7272
/// The state of the pinned connection.
7373
pinned_state: PinnedState,
7474

75+
pinned_sender: mpsc::Sender<PooledConnection>,
76+
7577
/// The manager used to check this connection back into the pool.
7678
pool_manager: PoolManager,
7779
},
@@ -81,10 +83,7 @@ enum PooledConnectionState {
8183
#[derive(Clone, Debug)]
8284
enum PinnedState {
8385
/// The state associated with a pinned connection that is currently in use.
84-
InUse {
85-
/// The sender that can be used to return the connection to its pinner.
86-
pinned_sender: mpsc::Sender<PooledConnection>,
87-
},
86+
InUse,
8887

8988
/// The state associated with a pinned connection that has been returned to its pinner.
9089
Returned {
@@ -219,13 +218,11 @@ impl PooledConnection {
219218

220219
/// Pin the connection and return a handle to the pinned connection.
221220
pub(crate) fn pin(&mut self) -> Result<PinnedConnectionHandle> {
222-
let rx = match &mut self.state {
223-
PooledConnectionState::CheckedIn { .. } => {
224-
return Err(Error::internal(format!(
225-
"cannot pin a checked-in connection (id = {})",
226-
self.id
227-
)))
228-
}
221+
match &mut self.state {
222+
PooledConnectionState::CheckedIn { .. } => Err(Error::internal(format!(
223+
"cannot pin a checked-in connection (id = {})",
224+
self.id
225+
))),
229226
PooledConnectionState::CheckedOut {
230227
ref pool_manager, ..
231228
} => {
@@ -234,29 +231,37 @@ impl PooledConnection {
234231
// Mark the connection as in-use while the operation currently using the
235232
// connection finishes. Once that operation drops the connection, it will be
236233
// sent back to the pinner.
237-
pinned_state: PinnedState::InUse { pinned_sender: tx },
234+
pinned_sender: tx,
235+
pinned_state: PinnedState::InUse,
238236
pool_manager: pool_manager.clone(),
239237
};
240-
rx
238+
239+
Ok(PinnedConnectionHandle {
240+
id: self.id,
241+
receiver: Arc::new(Mutex::new(rx)),
242+
})
241243
}
242-
PooledConnectionState::Pinned { pinned_state, .. } => match pinned_state {
243-
PinnedState::InUse { .. } => {
244-
return Err(Error::internal(format!(
245-
"cannot pin an already-pinned connection (id = {})",
246-
self.id
247-
)))
248-
}
249-
PinnedState::Returned { .. } => {
250-
let (tx, rx) = mpsc::channel(1);
251-
*pinned_state = PinnedState::InUse { pinned_sender: tx };
252-
rx
244+
PooledConnectionState::Pinned { .. } => Err(Error::internal(format!(
245+
"cannot pin an already-pinned connection (id = {})",
246+
self.id
247+
))),
248+
}
249+
}
250+
251+
pub(crate) fn mark_pinned_in_use(&mut self) {
252+
match self.state {
253+
PooledConnectionState::Pinned {
254+
ref mut pinned_state,
255+
..
256+
} => {
257+
*pinned_state = PinnedState::InUse;
258+
}
259+
_ => {
260+
if cfg!(debug_assertions) {
261+
panic!("attempting to mark a non-pinned connection in use")
253262
}
254-
},
255-
};
256-
Ok(PinnedConnectionHandle {
257-
id: self.id,
258-
receiver: Arc::new(Mutex::new(rx)),
259-
})
263+
}
264+
}
260265
}
261266

262267
/// Emit a [`ConnectionClosedEvent`] for this connection with the supplied reason.
@@ -327,17 +332,19 @@ impl Drop for PooledConnection {
327332
}
328333
// A pinned connection should be returned to its pinner or to the connection pool.
329334
PooledConnectionState::Pinned {
335+
pinned_sender,
330336
pinned_state,
331337
pool_manager,
332338
} => {
333339
let pool_manager = pool_manager.clone();
334340
match pinned_state {
335341
// If the pinned connection is in use, it is being dropped at the end of an
336342
// operation and should be sent back to its pinner.
337-
PinnedState::InUse { pinned_sender } => {
343+
PinnedState::InUse => {
338344
let pinned_sender = pinned_sender.clone();
339345

340346
let dropped_connection = self.take(PooledConnectionState::Pinned {
347+
pinned_sender: pinned_sender.clone(),
341348
pinned_state: PinnedState::Returned {
342349
returned_time: Instant::now(),
343350
},

src/test/spec/crud.rs

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,43 +2,51 @@ use crate::test::spec::unified_runner::run_unified_tests;
22

33
#[tokio::test(flavor = "multi_thread")]
44
async fn run_unified() {
5+
let skipped_files = vec![
6+
// The Rust driver does not support unacknowledged writes (and does not intend to in
7+
// the future).
8+
"bulkWrite-deleteMany-hint-unacknowledged.json",
9+
"bulkWrite-deleteOne-hint-unacknowledged.json",
10+
"bulkWrite-replaceOne-hint-unacknowledged.json",
11+
"bulkWrite-updateMany-hint-unacknowledged.json",
12+
"bulkWrite-updateOne-hint-unacknowledged.json",
13+
"deleteMany-hint-unacknowledged.json",
14+
"deleteOne-hint-unacknowledged.json",
15+
"findOneAndDelete-hint-unacknowledged.json",
16+
"findOneAndReplace-hint-unacknowledged.json",
17+
"findOneAndUpdate-hint-unacknowledged.json",
18+
"replaceOne-hint-unacknowledged.json",
19+
"updateMany-hint-unacknowledged.json",
20+
"updateOne-hint-unacknowledged.json",
21+
// TODO RUST-1405: unskip the errorResponse tests
22+
"client-bulkWrite-errorResponse.json",
23+
"bulkWrite-errorResponse.json",
24+
"updateOne-errorResponse.json",
25+
"insertOne-errorResponse.json",
26+
"deleteOne-errorResponse.json",
27+
"aggregate-merge-errorResponse.json",
28+
"findOneAndUpdate-errorResponse.json",
29+
];
30+
31+
let mut skipped_tests = vec![
32+
// Unacknowledged write; see above.
33+
"Unacknowledged write using dollar-prefixed or dotted keys may be silently rejected on \
34+
pre-5.0 server",
35+
// TODO RUST-663: Unskip these tests.
36+
"Aggregate with $out includes read preference for 5.0+ server",
37+
"Aggregate with $out omits read preference for pre-5.0 server",
38+
"Aggregate with $merge includes read preference for 5.0+ server",
39+
"Aggregate with $merge omits read preference for pre-5.0 server",
40+
"Database-level aggregate with $out omits read preference for pre-5.0 server",
41+
"Database-level aggregate with $merge omits read preference for pre-5.0 server",
42+
];
43+
// TODO: remove this manual skip when this test is fixed to skip on serverless
44+
if std::env::var("SERVERLESS").is_ok() {
45+
skipped_tests.push("inserting _id with type null via clientBulkWrite");
46+
}
47+
548
run_unified_tests(&["crud", "unified"])
6-
.skip_files(&[
7-
// The Rust driver does not support unacknowledged writes (and does not intend to in
8-
// the future).
9-
"bulkWrite-deleteMany-hint-unacknowledged.json",
10-
"bulkWrite-deleteOne-hint-unacknowledged.json",
11-
"bulkWrite-replaceOne-hint-unacknowledged.json",
12-
"bulkWrite-updateMany-hint-unacknowledged.json",
13-
"bulkWrite-updateOne-hint-unacknowledged.json",
14-
"deleteMany-hint-unacknowledged.json",
15-
"deleteOne-hint-unacknowledged.json",
16-
"findOneAndDelete-hint-unacknowledged.json",
17-
"findOneAndReplace-hint-unacknowledged.json",
18-
"findOneAndUpdate-hint-unacknowledged.json",
19-
"replaceOne-hint-unacknowledged.json",
20-
"updateMany-hint-unacknowledged.json",
21-
"updateOne-hint-unacknowledged.json",
22-
// TODO RUST-1405: unskip the errorResponse tests
23-
"client-bulkWrite-errorResponse.json",
24-
"bulkWrite-errorResponse.json",
25-
"updateOne-errorResponse.json",
26-
"insertOne-errorResponse.json",
27-
"deleteOne-errorResponse.json",
28-
"aggregate-merge-errorResponse.json",
29-
"findOneAndUpdate-errorResponse.json",
30-
])
31-
.skip_tests(&[
32-
// Unacknowledged write; see above.
33-
"Unacknowledged write using dollar-prefixed or dotted keys may be silently rejected \
34-
on pre-5.0 server",
35-
// TODO RUST-663: Unskip these tests.
36-
"Aggregate with $out includes read preference for 5.0+ server",
37-
"Aggregate with $out omits read preference for pre-5.0 server",
38-
"Aggregate with $merge includes read preference for 5.0+ server",
39-
"Aggregate with $merge omits read preference for pre-5.0 server",
40-
"Database-level aggregate with $out omits read preference for pre-5.0 server",
41-
"Database-level aggregate with $merge omits read preference for pre-5.0 server",
42-
])
49+
.skip_files(&skipped_files)
50+
.skip_tests(&skipped_tests)
4351
.await;
4452
}

src/test/spec/json/retryable-writes/unified/aggregate-out-merge.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"description": "aggregate with $out/$merge does not set txnNumber",
3-
"schemaVersion": "1.3",
3+
"schemaVersion": "1.4",
44
"runOnRequirements": [
55
{
66
"minServerVersion": "3.6",
@@ -45,6 +45,11 @@
4545
"tests": [
4646
{
4747
"description": "aggregate with $out does not set txnNumber",
48+
"runOnRequirements": [
49+
{
50+
"serverless": "forbid"
51+
}
52+
],
4853
"operations": [
4954
{
5055
"object": "collection0",

src/test/spec/json/retryable-writes/unified/aggregate-out-merge.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
description: "aggregate with $out/$merge does not set txnNumber"
22

3-
schemaVersion: "1.3"
3+
schemaVersion: "1.4"
44

55
runOnRequirements:
66
- minServerVersion: "3.6"
@@ -30,6 +30,8 @@ initialData:
3030

3131
tests:
3232
- description: "aggregate with $out does not set txnNumber"
33+
runOnRequirements:
34+
- serverless: forbid # $out is not supported on serverless
3335
operations:
3436
- object: *collection0
3537
name: aggregate

0 commit comments

Comments
 (0)