Skip to content

Commit bfb8c27

Browse files
authored
RUST-757 Remove wait_queue_timeout option (#338)
1 parent 79d2782 commit bfb8c27

File tree

8 files changed

+23
-116
lines changed

8 files changed

+23
-116
lines changed

src/client/options/mod.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -443,14 +443,6 @@ pub struct ClientOptions {
443443
#[builder(default, setter(strip_option))]
444444
pub tls: Option<Tls>,
445445

446-
/// The amount of time a thread should block while waiting to check out a connection before
447-
/// returning an error. Note that if there are fewer than `max_pool_size` connections checked
448-
/// out or if a connection is available in the pool, checking out a connection will not block.
449-
///
450-
/// By default, threads will wait indefinitely for a connection to become available.
451-
#[builder(default, setter(strip_option))]
452-
pub wait_queue_timeout: Option<Duration>,
453-
454446
/// Specifies the default write concern for operations performed on the Client. See the
455447
/// WriteConcern type documentation for more details.
456448
#[builder(default, setter(strip_option))]
@@ -689,7 +681,6 @@ impl From<ClientOptionsParser> for ClientOptions {
689681
max_pool_size: parser.max_pool_size,
690682
min_pool_size: parser.min_pool_size,
691683
max_idle_time: parser.max_idle_time,
692-
wait_queue_timeout: parser.wait_queue_timeout,
693684
server_selection_timeout: parser.server_selection_timeout,
694685
compressors: parser.compressors,
695686
connect_timeout: parser.connect_timeout,
@@ -723,8 +714,8 @@ impl ClientOptions {
723714
}
724715
}
725716

726-
/// Parses a MongoDB connection string into a ClientOptions struct. If the string is malformed
727-
/// or one of the options has an invalid value, an error will be returned.
717+
/// Parses a MongoDB connection string into a [`ClientOptions`] struct. If the string is
718+
/// malformed or one of the options has an invalid value, an error will be returned.
728719
///
729720
/// In the case that "mongodb+srv" is used, SRV and TXT record lookups will be done as
730721
/// part of this method.
@@ -759,7 +750,7 @@ impl ClientOptions {
759750
/// * `retryWrites`: not yet implemented
760751
/// * `retryReads`: maps to the `retry_reads` field
761752
/// * `serverSelectionTimeoutMS`: maps to the `server_selection_timeout` field
762-
/// * `socketTimeoutMS`: maps to the `socket_timeout` field
753+
/// * `socketTimeoutMS`: unsupported, does not map to any field
763754
/// * `ssl`: an alias of the `tls` option
764755
/// * `tls`: maps to the TLS variant of the `tls` field`.
765756
/// * `tlsInsecure`: relaxes the TLS constraints on connections being made; currently is just
@@ -770,7 +761,7 @@ impl ClientOptions {
770761
/// * `tlsCAFile`: maps to the `ca_file_path` field of the `tls` field
771762
/// * `tlsCertificateKeyFile`: maps to the `cert_key_file_path` field of the `tls` field
772763
/// * `w`: maps to the `w` field of the `write_concern` field
773-
/// * `waitQueueTimeoutMS`: maps to the `wait_queue_timeout` field
764+
/// * `waitQueueTimeoutMS`: unsupported, does not map to any field
774765
/// * `wTimeoutMS`: maps to the `w_timeout` field of the `write_concern` field
775766
/// * `zlibCompressionLevel`: not yet implemented
776767
///
@@ -939,7 +930,6 @@ impl ClientOptions {
939930
server_selection_timeout,
940931
socket_timeout,
941932
tls,
942-
wait_queue_timeout,
943933
write_concern,
944934
zlib_compression,
945935
original_srv_hostname,

src/cmap/connection_requester.rs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use tokio::sync::{mpsc, oneshot};
22

33
use super::{worker::PoolWorkerHandle, Connection};
4-
use crate::{error::Result, options::StreamAddress, runtime::AsyncJoinHandle, RUNTIME};
5-
use std::time::Duration;
4+
use crate::{error::Result, options::StreamAddress, runtime::AsyncJoinHandle};
65

76
/// Returns a new requester/receiver pair.
87
pub(super) fn channel(
@@ -32,28 +31,16 @@ pub(super) struct ConnectionRequester {
3231

3332
impl ConnectionRequester {
3433
/// Request a connection from the pool that owns the receiver end of this requester.
35-
/// Returns None if it takes longer than wait_queue_timeout before the pool returns a result.
36-
pub(super) async fn request(
37-
&self,
38-
wait_queue_timeout: Option<Duration>,
39-
) -> Option<ConnectionRequestResult> {
34+
pub(super) async fn request(&self) -> ConnectionRequestResult {
4035
let (sender, receiver) = oneshot::channel();
4136

4237
// this only errors if the receiver end is dropped, which can't happen because
4338
// we own a handle to the worker, keeping it alive.
4439
self.sender.send(sender).unwrap();
4540

46-
match wait_queue_timeout {
47-
Some(timeout) => RUNTIME
48-
.timeout(timeout, receiver)
49-
.await
50-
.map(|r| r.unwrap()) // see comment below as to why this is safe
51-
.ok(),
52-
53-
// similarly, the receiver only returns an error if the sender is dropped, which
54-
// can't happen due to the handle.
55-
None => Some(receiver.await.unwrap()),
56-
}
41+
// similarly, the receiver only returns an error if the sender is dropped, which
42+
// can't happen due to the handle.
43+
receiver.await.unwrap()
5744
}
5845
}
5946

src/cmap/mod.rs

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub(crate) mod options;
99
mod status;
1010
mod worker;
1111

12-
use std::{sync::Arc, time::Duration};
12+
use std::sync::Arc;
1313

1414
use derivative::Derivative;
1515

@@ -21,7 +21,7 @@ pub(crate) use self::{
2121
};
2222
use self::{connection_requester::ConnectionRequestResult, options::ConnectionPoolOptions};
2323
use crate::{
24-
error::{Error, ErrorKind, Result},
24+
error::{Error, Result},
2525
event::cmap::{
2626
CmapEventHandler,
2727
ConnectionCheckoutFailedEvent,
@@ -52,8 +52,6 @@ pub(crate) struct ConnectionPool {
5252
connection_requester: ConnectionRequester,
5353
generation_subscriber: PoolGenerationSubscriber,
5454

55-
wait_queue_timeout: Option<Duration>,
56-
5755
#[derivative(Debug = "ignore")]
5856
event_handler: Option<Arc<dyn CmapEventHandler>>,
5957
}
@@ -73,7 +71,6 @@ impl ConnectionPool {
7371
);
7472

7573
let event_handler = options.as_ref().and_then(|opts| opts.event_handler.clone());
76-
let wait_queue_timeout = options.as_ref().and_then(|opts| opts.wait_queue_timeout);
7774

7875
if let Some(ref handler) = event_handler {
7976
handler.handle_pool_created_event(PoolCreatedEvent {
@@ -87,7 +84,6 @@ impl ConnectionPool {
8784
manager,
8885
connection_requester,
8986
generation_subscriber,
90-
wait_queue_timeout,
9187
event_handler,
9288
}
9389
}
@@ -104,7 +100,6 @@ impl ConnectionPool {
104100
manager,
105101
connection_requester,
106102
generation_subscriber,
107-
wait_queue_timeout: None,
108103
event_handler: None,
109104
}
110105
}
@@ -120,9 +115,7 @@ impl ConnectionPool {
120115

121116
/// Checks out a connection from the pool. This method will yield until this thread is at the
122117
/// front of the wait queue, and then will block again if no available connections are in the
123-
/// pool and the total number of connections is not less than the max pool size. If the method
124-
/// blocks for longer than `wait_queue_timeout` waiting for an available connection or to
125-
/// start establishing a new one, a `WaitQueueTimeoutError` will be returned.
118+
/// pool and the total number of connections is not less than the max pool size.
126119
pub(crate) async fn check_out(&self) -> Result<Connection> {
127120
self.emit_event(|handler| {
128121
let event = ConnectionCheckoutStartedEvent {
@@ -132,21 +125,12 @@ impl ConnectionPool {
132125
handler.handle_connection_checkout_started_event(event);
133126
});
134127

135-
let response = self
136-
.connection_requester
137-
.request(self.wait_queue_timeout)
138-
.await;
128+
let response = self.connection_requester.request().await;
139129

140130
let conn = match response {
141-
Some(ConnectionRequestResult::Pooled(c)) => Ok(c),
142-
Some(ConnectionRequestResult::Establishing(task)) => task.await,
143-
Some(ConnectionRequestResult::PoolCleared) => {
144-
Err(Error::pool_cleared_error(&self.address))
145-
}
146-
None => Err(ErrorKind::WaitQueueTimeout {
147-
address: self.address.clone(),
148-
}
149-
.into()),
131+
ConnectionRequestResult::Pooled(c) => Ok(c),
132+
ConnectionRequestResult::Establishing(task) => task.await,
133+
ConnectionRequestResult::PoolCleared => Err(Error::pool_cleared_error(&self.address)),
150134
};
151135

152136
match conn {
@@ -155,17 +139,11 @@ impl ConnectionPool {
155139
handler.handle_connection_checked_out_event(conn.checked_out_event());
156140
});
157141
}
158-
Err(ref e) => {
159-
let failure_reason = if let ErrorKind::WaitQueueTimeout { .. } = *e.kind {
160-
ConnectionCheckoutFailedReason::Timeout
161-
} else {
162-
ConnectionCheckoutFailedReason::ConnectionError
163-
};
164-
142+
Err(_) => {
165143
self.emit_event(|handler| {
166144
handler.handle_connection_checkout_failed_event(ConnectionCheckoutFailedEvent {
167145
address: self.address.clone(),
168-
reason: failure_reason,
146+
reason: ConnectionCheckoutFailedReason::ConnectionError,
169147
})
170148
});
171149
}

src/cmap/options.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,6 @@ pub(crate) struct ConnectionPoolOptions {
8585
/// The default is not to use TLS for connections.
8686
#[serde(skip)]
8787
pub(crate) tls_options: Option<TlsOptions>,
88-
89-
/// Rather than wait indefinitely for a connection to become available, instead return an error
90-
/// after the given duration.
91-
///
92-
/// The default is to block indefinitely until a connection becomes available.
93-
#[serde(rename = "waitQueueTimeoutMS")]
94-
#[serde(default)]
95-
#[serde(deserialize_with = "bson_util::deserialize_duration_from_u64_millis")]
96-
pub(crate) wait_queue_timeout: Option<Duration>,
9788
}
9889

9990
impl ConnectionPoolOptions {
@@ -107,7 +98,6 @@ impl ConnectionPoolOptions {
10798
max_pool_size: options.max_pool_size,
10899
server_api: options.server_api.clone(),
109100
tls_options: options.tls_options(),
110-
wait_queue_timeout: options.wait_queue_timeout,
111101
credential: options.credential.clone(),
112102
event_handler: options.cmap_event_handler.clone(),
113103
#[cfg(test)]
@@ -127,7 +117,6 @@ impl ConnectionPoolOptions {
127117
max_pool_size: self.max_pool_size,
128118
server_api: self.server_api.clone(),
129119
tls_options: self.tls_options.clone(),
130-
wait_queue_timeout: self.wait_queue_timeout,
131120
}
132121
}
133122
}

src/cmap/test/file.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,7 @@ use std::{sync::Arc, time::Duration};
33
use serde::Deserialize;
44

55
use super::{event::Event, State};
6-
use crate::{
7-
bson_util,
8-
cmap::options::ConnectionPoolOptions,
9-
error::{ErrorKind, Result},
10-
test::RunOn,
11-
};
6+
use crate::{bson_util, cmap::options::ConnectionPoolOptions, error::Result, test::RunOn};
127
use bson::Document;
138

149
#[derive(Debug, Deserialize)]
@@ -94,16 +89,3 @@ pub struct Error {
9489
message: String,
9590
address: Option<String>,
9691
}
97-
98-
impl Error {
99-
pub fn assert_matches(&self, error: &crate::error::Error, description: &str) {
100-
match error.kind.as_ref() {
101-
ErrorKind::WaitQueueTimeout { .. } => {
102-
assert_eq!(self.type_, "WaitQueueTimeoutError", "{}", description);
103-
}
104-
_ => {
105-
panic!("Expected {}, but got {:?}", self.type_, error);
106-
}
107-
}
108-
}
109-
}

src/cmap/test/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ use bson::doc;
3434
const TEST_DESCRIPTIONS_TO_SKIP: &[&str] = &[
3535
"must destroy checked in connection if pool has been closed",
3636
"must throw error if checkOut is called on a closed pool",
37+
// WaitQueueTimeoutMS is not supported
38+
"must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
39+
"waiting on maxConnecting is limited by WaitQueueTimeoutMS",
3740
];
3841

3942
/// Many different types of CMAP events are emitted from tasks spawned in the drop
@@ -185,17 +188,14 @@ impl Executor {
185188
}
186189

187190
match (self.error, error) {
188-
(Some(ref expected), Some(ref actual)) => {
189-
expected.assert_matches(actual, self.description.as_str())
190-
}
191191
(Some(ref expected), None) => {
192192
panic!("Expected {}, but no error occurred", expected.type_)
193193
}
194194
(None, Some(ref actual)) => panic!(
195195
"Expected no error to occur, but the following error was returned: {:?}",
196196
actual
197197
),
198-
(None, None) => {}
198+
(None, None) | (Some(_), Some(_)) => {}
199199
}
200200

201201
let ignored_event_names = self.ignored_event_names;
@@ -354,9 +354,6 @@ impl Matchable for EventOptions {
354354
&& self.max_pool_size.matches(&expected.max_pool_size)
355355
&& self.min_pool_size.matches(&expected.min_pool_size)
356356
&& self.tls_options.matches(&expected.tls_options)
357-
&& self
358-
.wait_queue_timeout
359-
.matches(&expected.wait_queue_timeout)
360357
}
361358
}
362359

src/error.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -392,13 +392,6 @@ pub enum ErrorKind {
392392
#[non_exhaustive]
393393
InvalidTlsConfig { message: String },
394394

395-
/// The Client timed out while checking out a connection from connection pool.
396-
#[error(
397-
"Timed out while checking out a connection from connection pool with address {address}"
398-
)]
399-
#[non_exhaustive]
400-
WaitQueueTimeout { address: StreamAddress },
401-
402395
/// An error occurred when trying to execute a write operation
403396
#[error("An error occurred when trying to execute a write operation: {0:?}")]
404397
Write(WriteFailure),

src/event/cmap.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,6 @@ pub struct ConnectionPoolOptions {
8888
/// The default is not to use TLS for connections.
8989
#[serde(skip)]
9090
pub tls_options: Option<TlsOptions>,
91-
92-
/// Rather than wait indefinitely for a connection to become available, instead return an error
93-
/// after the given duration.
94-
///
95-
/// The default is to block indefinitely until a connection becomes available.
96-
#[serde(rename = "waitQueueTimeoutMS")]
97-
#[serde(default)]
98-
#[serde(deserialize_with = "crate::bson_util::deserialize_duration_from_u64_millis")]
99-
pub wait_queue_timeout: Option<Duration>,
10091
}
10192

10293
/// Event emitted when a connection pool becomes ready.

0 commit comments

Comments
 (0)