Skip to content

Commit ed3d87a

Browse files
authored
RUST-228 Document client options (#96)
1 parent e591a17 commit ed3d87a

File tree

16 files changed

+458
-184
lines changed

16 files changed

+458
-184
lines changed

src/client/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ struct ClientInner {
7777
impl Client {
7878
/// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
7979
/// MongoDB connection string.
80+
///
81+
/// See the documentation on ClientOptions::parse for more details.
8082
pub fn with_uri_str(uri: &str) -> Result<Self> {
8183
let options = ClientOptions::parse(uri)?;
8284

src/client/options/mod.rs

Lines changed: 237 additions & 91 deletions
Large diffs are not rendered by default.

src/client/options/test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ fn document_from_client_options(mut options: ClientOptions) -> Document {
114114

115115
doc.insert("readpreferencetags", tags);
116116
}
117-
}
118117

119-
if let Some(i) = options.max_staleness.take() {
120-
doc.insert("maxstalenessseconds", i.as_secs() as i64);
118+
if let Some(i) = max_staleness {
119+
doc.insert("maxstalenessseconds", i.as_secs() as i64);
120+
}
121121
}
122122

123123
if let Some(b) = options.retry_reads.take() {

src/cmap/conn/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl Connection {
7070
id: u32,
7171
address: StreamAddress,
7272
generation: u32,
73+
connect_timeout: Option<Duration>,
7374
tls_options: Option<TlsOptions>,
7475
handler: Option<Arc<dyn CmapEventHandler>>,
7576
) -> Result<Self> {
@@ -79,7 +80,7 @@ impl Connection {
7980
pool: None,
8081
stream_description: None,
8182
ready_and_available_time: None,
82-
stream: Stream::connect(address.clone(), tls_options)?,
83+
stream: Stream::connect(address.clone(), connect_timeout, tls_options)?,
8384
address,
8485
handler,
8586
};

src/cmap/conn/stream.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::{
22
io::{self, Read, Write},
3-
net::TcpStream,
3+
net::{TcpStream, ToSocketAddrs},
44
sync::Arc,
5+
time::Duration,
56
};
67

78
use derivative::Derivative;
@@ -12,6 +13,8 @@ use crate::{
1213
options::{StreamAddress, TlsOptions},
1314
};
1415

16+
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
17+
1518
/// Stream encapsulates the different socket types that can be used and adds a thin wrapper for I/O.
1619
#[derive(Derivative)]
1720
#[derivative(Debug)]
@@ -33,8 +36,22 @@ pub(super) enum Stream {
3336

3437
impl Stream {
3538
/// Creates a new stream connected to `address`.
36-
pub(super) fn connect(host: StreamAddress, tls_options: Option<TlsOptions>) -> Result<Self> {
37-
let inner = TcpStream::connect(host.to_string())?;
39+
pub(super) fn connect(
40+
host: StreamAddress,
41+
connect_timeout: Option<Duration>,
42+
tls_options: Option<TlsOptions>,
43+
) -> Result<Self> {
44+
let timeout = connect_timeout.unwrap_or(DEFAULT_CONNECT_TIMEOUT);
45+
46+
// The URI options spec requires that the default is 10 seconds, but that 0 should indicate
47+
// no timeout.
48+
let inner = if timeout == Duration::from_secs(0) {
49+
TcpStream::connect(&host)?
50+
} else {
51+
let socket_addrs: Vec<_> = host.to_socket_addrs()?.collect();
52+
53+
TcpStream::connect_timeout(&socket_addrs[0], timeout)?
54+
};
3855
inner.set_nodelay(true)?;
3956

4057
match tls_options {

src/cmap/mod.rs

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,30 @@ pub(crate) struct ConnectionPoolInner {
6464
/// The address the pool's connections will connect to.
6565
address: StreamAddress,
6666

67-
/// If a checkout operation takes longer than `wait_queue_timeout`, the pool will return an
68-
/// error. If `wait_queue_timeout` is `None`, then the checkout operation will not time out.
69-
wait_queue_timeout: Option<Duration>,
67+
/// The set of available connections in the pool. Because the CMAP spec requires that
68+
/// connections are checked out in a FIFO manner, connections are pushed/popped from the back
69+
/// of the Vec.
70+
connections: Arc<RwLock<Vec<Connection>>>,
71+
72+
/// The connect timeout passed to each underlying TcpStream when attemtping to connect to the
73+
/// server.
74+
connect_timeout: Option<Duration>,
75+
76+
/// The credential to use for authenticating connections in this pool.
77+
credential: Option<Credential>,
78+
79+
/// Contains the logic for "establishing" a connection. This includes handshaking and
80+
/// authenticating a connection when it's first created.
81+
establisher: ConnectionEstablisher,
82+
83+
/// The event handler specified by the user to process CMAP events.
84+
#[derivative(Debug = "ignore")]
85+
event_handler: Option<Arc<dyn CmapEventHandler>>,
86+
87+
/// The current generation of the pool. The generation is incremented whenever the pool is
88+
/// cleared. Connections belonging to a previous generation are considered stale and will be
89+
/// closed when checked back in or when popped off of the set of available connections.
90+
generation: AtomicU32,
7091

7192
/// Connections that have been ready for usage in the pool for longer than `max_idle_time` will
7293
/// be closed either by the background thread or when popped off of the set of available
@@ -84,75 +105,66 @@ pub(crate) struct ConnectionPoolInner {
84105
/// them to the pool.
85106
min_pool_size: Option<u32>,
86107

108+
/// The ID of the next connection created by the pool.
109+
next_connection_id: AtomicU32,
110+
111+
/// If a checkout operation takes longer than `wait_queue_timeout`, the pool will return an
112+
/// error. If `wait_queue_timeout` is `None`, then the checkout operation will not time out.
113+
wait_queue_timeout: Option<Duration>,
114+
87115
/// The TLS options to use for the connections. If `tls_options` is None, then TLS will not be
88116
/// used to connect to the server.
89117
tls_options: Option<TlsOptions>,
90118

91-
/// The credential to use for authenticating connections in this pool.
92-
credential: Option<Credential>,
93-
94-
/// The current generation of the pool. The generation is incremented whenever the pool is
95-
/// cleared. Connections belonging to a previous generation are considered stale and will be
96-
/// closed when checked back in or when popped off of the set of available connections.
97-
generation: AtomicU32,
98-
99119
/// The total number of connections currently in the pool. This includes connections which are
100120
/// currently checked out of the pool.
101121
total_connection_count: AtomicU32,
102122

103-
/// The ID of the next connection created by the pool.
104-
next_connection_id: AtomicU32,
105-
106123
/// Connections are checked out by concurrent threads on a first-come, first-server basis. This
107124
/// is enforced by threads entering the wait queue when they first try to check out a
108125
/// connection and then blocking until they are at the front of the queue.
109126
wait_queue: WaitQueue,
110-
111-
/// The set of available connections in the pool. Because the CMAP spec requires that
112-
/// connections are checked out in a FIFO manner, connections are pushed/popped from the back
113-
/// of the Vec.
114-
connections: Arc<RwLock<Vec<Connection>>>,
115-
116-
/// Contains the logic for "establishing" a connection. This includes handshaking and
117-
/// authenticating a connection when it's first created.
118-
establisher: ConnectionEstablisher,
119-
120-
/// The event handler specified by the user to process CMAP events.
121-
#[derivative(Debug = "ignore")]
122-
event_handler: Option<Arc<dyn CmapEventHandler>>,
123127
}
124128

125129
impl ConnectionPool {
126130
pub(crate) fn new(address: StreamAddress, mut options: Option<ConnectionPoolOptions>) -> Self {
127131
// Get the individual options from `options`.
128-
let max_idle_time = options.as_ref().and_then(|opts| opts.max_idle_time);
129-
let wait_queue_timeout = options.as_ref().and_then(|opts| opts.wait_queue_timeout);
132+
let connect_timeout = options.as_ref().and_then(|opts| opts.connect_timeout);
133+
let credential = options.as_mut().and_then(|opts| opts.credential.clone());
134+
let establisher = ConnectionEstablisher::new(options.as_ref());
135+
let event_handler = options.as_mut().and_then(|opts| opts.event_handler.take());
136+
137+
// The CMAP spec indicates that a max idle time of zero means that connections should not be
138+
// closed due to idleness.
139+
let mut max_idle_time = options.as_ref().and_then(|opts| opts.max_idle_time);
140+
if max_idle_time == Some(Duration::from_millis(0)) {
141+
max_idle_time = None;
142+
}
143+
130144
let max_pool_size = options
131145
.as_ref()
132146
.and_then(|opts| opts.max_pool_size)
133147
.unwrap_or(DEFAULT_MAX_POOL_SIZE);
134148
let min_pool_size = options.as_ref().and_then(|opts| opts.min_pool_size);
135149
let tls_options = options.as_mut().and_then(|opts| opts.tls_options.take());
136-
let event_handler = options.as_mut().and_then(|opts| opts.event_handler.take());
137-
138-
let credential = options.as_mut().and_then(|opts| opts.credential.clone());
139-
let establisher = ConnectionEstablisher::new(options.as_ref());
150+
let wait_queue_timeout = options.as_ref().and_then(|opts| opts.wait_queue_timeout);
140151

141152
let inner = ConnectionPoolInner {
142153
address: address.clone(),
143-
wait_queue_timeout,
154+
connect_timeout,
155+
credential,
156+
establisher,
157+
event_handler,
158+
generation: AtomicU32::new(0),
144159
max_idle_time,
145160
max_pool_size,
146161
min_pool_size,
162+
next_connection_id: AtomicU32::new(1),
147163
tls_options,
148-
credential,
149-
generation: AtomicU32::new(0),
150164
total_connection_count: AtomicU32::new(0),
151-
next_connection_id: AtomicU32::new(1),
152165
wait_queue: WaitQueue::new(address.clone(), wait_queue_timeout),
166+
wait_queue_timeout,
153167
connections: Default::default(),
154-
establisher,
155-
event_handler,
156168
};
157169

158170
let pool = Self {
@@ -329,6 +341,7 @@ impl ConnectionPool {
329341
self.inner.next_connection_id.fetch_add(1, Ordering::SeqCst),
330342
self.inner.address.clone(),
331343
self.inner.generation.load(Ordering::SeqCst),
344+
self.inner.connect_timeout,
332345
self.inner.tls_options.clone(),
333346
self.inner.event_handler.clone(),
334347
)?;

src/cmap/options.rs

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,33 @@ pub struct ConnectionPoolOptions {
2121
#[builder(default)]
2222
pub app_name: Option<String>,
2323

24+
/// The connect timeout passed to each underlying TcpStream when attemtping to connect to the
25+
/// server.
26+
#[builder(default)]
27+
#[serde(skip)]
28+
pub connect_timeout: Option<Duration>,
29+
30+
/// The credential to use for authenticating connections in this pool.
31+
#[builder(default)]
32+
#[serde(skip)]
33+
pub credential: Option<Credential>,
34+
35+
/// Processes all events generated by the pool.
36+
#[derivative(Debug = "ignore", PartialEq = "ignore")]
37+
#[builder(default)]
38+
#[serde(skip)]
39+
pub event_handler: Option<Arc<dyn CmapEventHandler>>,
40+
41+
/// Connections that have been ready for usage in the pool for longer than `max_idle_time` will
42+
/// not be used.
43+
///
44+
/// The default is that connections will not be closed due to being idle.
45+
#[builder(default)]
46+
#[serde(rename = "maxIdleTimeMS")]
47+
#[serde(default)]
48+
#[serde(deserialize_with = "self::deserialize_duration_from_u64_millis")]
49+
pub max_idle_time: Option<Duration>,
50+
2451
/// The maximum number of connections that the pool can have at a given time. This includes
2552
/// connections which are currently checked out of the pool.
2653
///
@@ -36,15 +63,13 @@ pub struct ConnectionPoolOptions {
3663
#[builder(default)]
3764
pub min_pool_size: Option<u32>,
3865

39-
/// Connections that have been ready for usage in the pool for longer than `max_idle_time` will
40-
/// not be used.
66+
/// The options specifying how a TLS connection should be configured. If `tls_options` is
67+
/// `None`, then TLS will not be used for the connections.
4168
///
42-
/// The default is that connections will not be closed due to being idle.
69+
/// The default is not to use TLS for connections.
4370
#[builder(default)]
44-
#[serde(rename = "maxIdleTimeMS")]
45-
#[serde(default)]
46-
#[serde(deserialize_with = "self::deserialize_duration_from_u64_millis")]
47-
pub max_idle_time: Option<Duration>,
71+
#[serde(skip)]
72+
pub tls_options: Option<TlsOptions>,
4873

4974
/// Rather than wait indefinitely for a connection to become available, instead return an error
5075
/// after the given duration.
@@ -55,24 +80,6 @@ pub struct ConnectionPoolOptions {
5580
#[serde(default)]
5681
#[serde(deserialize_with = "self::deserialize_duration_from_u64_millis")]
5782
pub wait_queue_timeout: Option<Duration>,
58-
59-
/// The options specifying how a TLS connection should be configured. If `tls_options` is
60-
/// `None`, then TLS will not be used for the connections.
61-
///
62-
/// The default is not to use TLS for connections.
63-
#[builder(default)]
64-
#[serde(skip)]
65-
pub tls_options: Option<TlsOptions>,
66-
67-
#[derivative(Debug = "ignore", PartialEq = "ignore")]
68-
#[builder(default)]
69-
#[serde(skip)]
70-
pub event_handler: Option<Arc<dyn CmapEventHandler>>,
71-
72-
/// The credential to use for authenticating connections in this pool.
73-
#[builder(default)]
74-
#[serde(skip)]
75-
pub credential: Option<Credential>,
7683
}
7784

7885
fn deserialize_duration_from_u64_millis<'de, D>(
@@ -88,13 +95,15 @@ where
8895
impl ConnectionPoolOptions {
8996
pub(crate) fn from_client_options(options: &ClientOptions) -> Self {
9097
Self::builder()
98+
.app_name(options.app_name.clone())
99+
.connect_timeout(options.connect_timeout)
91100
.credential(options.credential.clone())
101+
.event_handler(options.cmap_event_handler.clone())
102+
.max_idle_time(options.max_idle_time)
92103
.max_pool_size(options.max_pool_size)
93104
.min_pool_size(options.min_pool_size)
94-
.max_idle_time(options.max_idle_time)
95-
.wait_queue_timeout(options.wait_queue_timeout)
96105
.tls_options(options.tls_options())
97-
.event_handler(options.cmap_event_handler.clone())
106+
.wait_queue_timeout(options.wait_queue_timeout)
98107
.build()
99108
}
100109
}

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ impl std::ops::Deref for Error {
6262

6363
#[derive(Debug, Error)]
6464
pub enum ErrorKind {
65+
#[error(display = "{}", _0)]
66+
AddrParse(#[error(source)] std::net::AddrParseError),
67+
6568
#[error(
6669
display = "An invalid argument was provided to a database operation: {}",
6770
message

src/event/cmap.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,20 +177,35 @@ pub struct ConnectionCheckedInEvent {
177177
/// by the driver.
178178
///
179179
/// ```rust
180-
/// # use mongodb::{Client, event::cmap::{CmapEventHandler, ConnectionCheckoutFailedEvent}};
181-
///
182-
/// struct FailedCheckoutLogger {}
180+
/// # use std::sync::Arc;
181+
/// #
182+
/// # use mongodb::{
183+
/// # error::Result,
184+
/// # event::cmap::{
185+
/// # CmapEventHandler,
186+
/// # ConnectionCheckoutFailedEvent
187+
/// # },
188+
/// # options::ClientOptions,
189+
/// # Client,
190+
/// # };
191+
/// #
192+
/// struct FailedCheckoutLogger;
183193
///
184194
/// impl CmapEventHandler for FailedCheckoutLogger {
185195
/// fn handle_connection_checkout_failed_event(&self, event: ConnectionCheckoutFailedEvent) {
186196
/// eprintln!("Failed connection checkout: {:?}", event);
187197
/// }
188198
/// }
189199
///
190-
/// # fn main() {
191-
/// // TODO: Construct client with event handler by using `ClientOptions`.
200+
/// # fn do_stuff() -> Result<()> {
201+
/// let handler: Arc<dyn CmapEventHandler> = Arc::new(FailedCheckoutLogger);
202+
/// let options = ClientOptions::builder()
203+
/// .cmap_event_handler(handler)
204+
/// .build();
205+
/// let client = Client::with_options(options)?;
192206
///
193-
/// // Do things with the client, and failed checkout events will be logged to stderr.
207+
/// // Do things with the client, and failed connection pool checkouts will be logged to stderr.
208+
/// # Ok(())
194209
/// # }
195210
/// ```
196211
pub trait CmapEventHandler: Send + Sync {

0 commit comments

Comments
 (0)