Skip to content

Commit d1a7719

Browse files
authored
RUST-714 Fix possible deadlock during retry (#318)
1 parent ce0c29c commit d1a7719

File tree

3 files changed

+55
-4
lines changed

3 files changed

+55
-4
lines changed

src/client/executor.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,13 @@ impl Client {
125125
.topology
126126
.handle_application_error(
127127
err.clone(),
128-
HandshakePhase::after_completion(conn),
128+
HandshakePhase::after_completion(&conn),
129129
&server,
130130
)
131131
.await;
132+
// release the connection to be processed by the connection pool
133+
drop(conn);
134+
// release the selected server to decrement its operation count
132135
drop(server);
133136

134137
// TODO RUST-90: Do not retry read if session is in a transaction
@@ -173,7 +176,7 @@ impl Client {
173176
.topology
174177
.handle_application_error(
175178
err.clone(),
176-
HandshakePhase::after_completion(conn),
179+
HandshakePhase::after_completion(&conn),
177180
&server,
178181
)
179182
.await;

src/sdam/state/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ pub(crate) enum HandshakePhase {
563563
}
564564

565565
impl HandshakePhase {
566-
pub(crate) fn after_completion(handshaked_connection: Connection) -> Self {
566+
pub(crate) fn after_completion(handshaked_connection: &Connection) -> Self {
567567
Self::AfterCompletion {
568568
generation: handshaked_connection.generation,
569569
// given that this is a handshaked connection, the stream description should

src/test/spec/retryable_reads.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,20 @@
1+
use std::time::Duration;
2+
3+
use bson::doc;
14
use tokio::sync::RwLockWriteGuard;
25

3-
use crate::test::{run_spec_test, LOCK};
6+
use crate::{
7+
test::{
8+
run_spec_test,
9+
FailCommandOptions,
10+
FailPoint,
11+
FailPointMode,
12+
TestClient,
13+
CLIENT_OPTIONS,
14+
LOCK,
15+
},
16+
RUNTIME,
17+
};
418

519
use super::run_v2_test;
620

@@ -10,3 +24,37 @@ async fn run() {
1024
let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await;
1125
run_spec_test(&["retryable-reads"], run_v2_test).await;
1226
}
27+
28+
/// Test ensures that the connection used in the first attempt of a retry is released back into the
29+
/// pool before the second attempt.
30+
#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
31+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
32+
async fn retry_releases_connection() {
33+
let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await;
34+
35+
let mut client_options = CLIENT_OPTIONS.clone();
36+
client_options.hosts.drain(1..);
37+
client_options.retry_reads = Some(true);
38+
client_options.max_pool_size = Some(1);
39+
40+
let client = TestClient::with_options(Some(client_options)).await;
41+
if !client.supports_fail_command().await {
42+
println!("skipping retry_releases_connection due to failCommand not being supported");
43+
return;
44+
}
45+
46+
let collection = client
47+
.database("retry_releases_connection")
48+
.collection("retry_releases_connection");
49+
collection.insert_one(doc! { "x": 1 }, None).await.unwrap();
50+
51+
let options = FailCommandOptions::builder().error_code(91).build();
52+
let failpoint = FailPoint::fail_command(&["find"], FailPointMode::Times(1), Some(options));
53+
let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap();
54+
55+
RUNTIME
56+
.timeout(Duration::from_secs(1), collection.find_one(doc! {}, None))
57+
.await
58+
.expect("operation should not time out")
59+
.expect("find should succeed");
60+
}

0 commit comments

Comments
 (0)