Skip to content

Commit a779891

Browse files
authored
RUST-590 Pause connection pool when server is marked as Unknown (#278)
1 parent 4c03d8c commit a779891

File tree

89 files changed

+2208
-478
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+2208
-478
lines changed

src/client/executor.rs

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,7 @@ impl Client {
8282
) -> Result<T::O> {
8383
let server = self.select_server(op.selection_criteria()).await?;
8484

85-
let mut conn = match server.checkout_connection().await {
86-
Ok(conn) => conn,
87-
Err(err) => {
88-
self.inner
89-
.topology
90-
.handle_pre_handshake_error(err.clone(), server.address.clone())
91-
.await;
92-
return Err(err);
93-
}
94-
};
85+
let mut conn = server.pool.check_out().await?;
9586

9687
let retryability = self.get_retryability(&conn, &op).await?;
9788

@@ -153,15 +144,9 @@ impl Client {
153144
}
154145
};
155146

156-
let mut conn = match server.checkout_connection().await {
157-
Ok(conn) => conn,
158-
Err(err) => {
159-
self.inner
160-
.topology
161-
.handle_pre_handshake_error(err.clone(), server.address.clone())
162-
.await;
163-
return Err(first_error);
164-
}
147+
let mut conn = match server.pool.check_out().await {
148+
Ok(c) => c,
149+
Err(_) => return Err(first_error),
165150
};
166151

167152
let retryability = self.get_retryability(&conn, &op).await?;

src/cmap/connection_requester.rs

Lines changed: 27 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
use tokio::sync::{mpsc, oneshot};
22

33
use super::{worker::PoolWorkerHandle, Connection};
4-
use crate::{
5-
error::{ErrorKind, Result},
6-
options::StreamAddress,
7-
runtime::AsyncJoinHandle,
8-
RUNTIME,
9-
};
4+
use crate::{error::Result, options::StreamAddress, runtime::AsyncJoinHandle, RUNTIME};
105
use std::time::Duration;
116

127
/// Returns a new requester/receiver pair.
@@ -21,7 +16,7 @@ pub(super) fn channel(
2116
sender,
2217
handle,
2318
},
24-
ConnectionRequestReceiver::new(receiver),
19+
ConnectionRequestReceiver { receiver },
2520
)
2621
}
2722

@@ -31,112 +26,87 @@ pub(super) fn channel(
3126
#[derive(Clone, Debug)]
3227
pub(super) struct ConnectionRequester {
3328
address: StreamAddress,
34-
sender: mpsc::UnboundedSender<oneshot::Sender<RequestedConnection>>,
29+
sender: mpsc::UnboundedSender<oneshot::Sender<ConnectionRequestResult>>,
3530
handle: PoolWorkerHandle,
3631
}
3732

3833
impl ConnectionRequester {
3934
/// Request a connection from the pool that owns the receiver end of this requester.
40-
/// Returns an error if it takes longer than wait_queue_timeout before either a connection is
41-
/// received or an establishment begins.
42-
pub(super) async fn request(&self, wait_queue_timeout: Option<Duration>) -> Result<Connection> {
35+
/// Returns None if it takes longer than wait_queue_timeout before the pool returns a result.
36+
pub(super) async fn request(
37+
&self,
38+
wait_queue_timeout: Option<Duration>,
39+
) -> Option<ConnectionRequestResult> {
4340
let (sender, receiver) = oneshot::channel();
4441

4542
// this only errors if the receiver end is dropped, which can't happen because
4643
// we own a handle to the worker, keeping it alive.
4744
self.sender.send(sender).unwrap();
4845

49-
let response = match wait_queue_timeout {
46+
match wait_queue_timeout {
5047
Some(timeout) => RUNTIME
5148
.timeout(timeout, receiver)
5249
.await
5350
.map(|r| r.unwrap()) // see comment below as to why this is safe
54-
.map_err(|_| {
55-
ErrorKind::WaitQueueTimeoutError {
56-
address: self.address.clone(),
57-
}
58-
.into()
59-
}),
51+
.ok(),
6052

6153
// similarly, the receiver only returns an error if the sender is dropped, which
6254
// can't happen due to the handle.
63-
None => Ok(receiver.await.unwrap()),
64-
};
65-
66-
match response {
67-
Ok(RequestedConnection::Pooled(c)) => Ok(c),
68-
Ok(RequestedConnection::Establishing(task)) => task.await,
69-
Err(e) => Err(e),
55+
None => Some(receiver.await.unwrap()),
7056
}
7157
}
7258
}
7359

7460
/// Receiving end of a given ConnectionRequester.
7561
#[derive(Debug)]
7662
pub(super) struct ConnectionRequestReceiver {
77-
receiver: mpsc::UnboundedReceiver<oneshot::Sender<RequestedConnection>>,
78-
cache: Option<ConnectionRequest>,
63+
receiver: mpsc::UnboundedReceiver<oneshot::Sender<ConnectionRequestResult>>,
7964
}
8065

8166
impl ConnectionRequestReceiver {
82-
pub(super) fn new(
83-
receiver: mpsc::UnboundedReceiver<oneshot::Sender<RequestedConnection>>,
84-
) -> Self {
85-
Self {
86-
receiver,
87-
cache: None,
88-
}
89-
}
90-
9167
pub(super) async fn recv(&mut self) -> Option<ConnectionRequest> {
92-
match self.cache.take() {
93-
Some(request) => Some(request),
94-
None => self
95-
.receiver
96-
.recv()
97-
.await
98-
.map(|sender| ConnectionRequest { sender }),
99-
}
100-
}
101-
102-
/// Put a request back into the receiver. Next call to `recv` will immediately
103-
/// return this value.
104-
pub(super) fn cache_request(&mut self, request: ConnectionRequest) {
105-
self.cache = Some(request);
68+
self.receiver
69+
.recv()
70+
.await
71+
.map(|sender| ConnectionRequest { sender })
10672
}
10773
}
10874

10975
/// Struct encapsulating a request for a connection.
11076
#[derive(Debug)]
11177
pub(super) struct ConnectionRequest {
112-
sender: oneshot::Sender<RequestedConnection>,
78+
sender: oneshot::Sender<ConnectionRequestResult>,
11379
}
11480

11581
impl ConnectionRequest {
11682
/// Respond to the connection request, either with a pooled connection or one that is
11783
/// establishing asynchronously.
11884
pub(super) fn fulfill(
11985
self,
120-
conn: RequestedConnection,
121-
) -> std::result::Result<(), RequestedConnection> {
122-
self.sender.send(conn)
86+
result: ConnectionRequestResult,
87+
) -> std::result::Result<(), ConnectionRequestResult> {
88+
self.sender.send(result)
12389
}
12490
}
12591

12692
#[derive(Debug)]
127-
pub(super) enum RequestedConnection {
93+
pub(super) enum ConnectionRequestResult {
12894
/// A connection that was already established and was simply checked out of the pool.
12995
Pooled(Connection),
13096

13197
/// A new connection in the process of being established.
13298
/// The handle can be awaited upon to receive the established connection.
13399
Establishing(AsyncJoinHandle<Result<Connection>>),
100+
101+
/// The request was rejected because the pool was cleared before it could
102+
/// be fulfilled.
103+
PoolCleared,
134104
}
135105

136-
impl RequestedConnection {
106+
impl ConnectionRequestResult {
137107
pub(super) fn unwrap_pooled_connection(self) -> Connection {
138108
match self {
139-
RequestedConnection::Pooled(c) => c,
109+
ConnectionRequestResult::Pooled(c) => c,
140110
_ => panic!("attempted to unwrap pooled connection when was establishing"),
141111
}
142112
}

src/cmap/manager.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use tokio::sync::mpsc;
22

33
use super::Connection;
4-
use crate::error::Error;
4+
use crate::runtime::AcknowledgedMessage;
55

66
pub(super) fn channel() -> (PoolManager, ManagementRequestReceiver) {
77
let (sender, receiver) = mpsc::unbounded_channel();
@@ -24,6 +24,20 @@ impl PoolManager {
2424
let _ = self.sender.send(PoolManagementRequest::Clear);
2525
}
2626

27+
/// Mark the pool as "ready" as per the CMAP specification.
28+
pub(super) async fn mark_as_ready(&self) {
29+
let (message, acknowledgment_receiver) = AcknowledgedMessage::package(());
30+
if self
31+
.sender
32+
.send(PoolManagementRequest::MarkAsReady {
33+
completion_handler: message,
34+
})
35+
.is_ok()
36+
{
37+
acknowledgment_receiver.wait_for_acknowledgment().await;
38+
}
39+
}
40+
2741
/// Check in the given connection to the pool.
2842
/// This returns an error containing the connection if the pool has been dropped already.
2943
pub(crate) fn check_in(&self, connection: Connection) -> std::result::Result<(), Connection> {
@@ -35,10 +49,10 @@ impl PoolManager {
3549
}
3650

3751
/// Notify the pool that establishing a connection failed.
38-
pub(super) fn handle_connection_failed(&self, error: Error) {
52+
pub(super) fn handle_connection_failed(&self) {
3953
let _ = self
4054
.sender
41-
.send(PoolManagementRequest::HandleConnectionFailed(error));
55+
.send(PoolManagementRequest::HandleConnectionFailed);
4256
}
4357

4458
/// Notify the pool that establishing a connection succeeded.
@@ -62,9 +76,22 @@ impl ManagementRequestReceiver {
6276

6377
#[derive(Debug)]
6478
pub(super) enum PoolManagementRequest {
79+
/// Clear the pool, transitioning it to Paused.
6580
Clear,
81+
82+
/// Mark the pool as Ready, allowing connections to be created and checked out.
83+
MarkAsReady {
84+
completion_handler: AcknowledgedMessage<()>,
85+
},
86+
87+
/// Check in the given connection.
6688
CheckIn(Connection),
67-
HandleConnectionFailed(Error),
89+
90+
/// Update the pool based on the given establishment error.
91+
HandleConnectionFailed,
92+
93+
/// Update the pool after a successful connection, optionally populating the pool
94+
/// with the successful connection.
6895
HandleConnectionSucceeded(Option<Connection>),
6996
}
7097

src/cmap/mod.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
11
#[cfg(test)]
2-
mod test;
2+
pub(crate) mod test;
33

44
pub(crate) mod conn;
55
mod connection_requester;
66
mod establish;
77
mod manager;
88
pub(crate) mod options;
9+
mod status;
910
mod worker;
1011

1112
use std::{sync::Arc, time::Duration};
1213

1314
use derivative::Derivative;
1415

1516
pub use self::conn::ConnectionInfo;
16-
use self::options::ConnectionPoolOptions;
1717
pub(crate) use self::{
1818
conn::{Command, CommandResponse, Connection, StreamDescription},
1919
establish::handshake::{is_master, Handshaker},
20+
status::PoolGenerationSubscriber,
2021
};
22+
use self::{connection_requester::ConnectionRequestResult, options::ConnectionPoolOptions};
2123
use crate::{
22-
error::{ErrorKind, Result},
24+
error::{Error, ErrorKind, Result},
2325
event::cmap::{
2426
CmapEventHandler,
2527
ConnectionCheckoutFailedEvent,
@@ -29,6 +31,7 @@ use crate::{
2931
},
3032
options::StreamAddress,
3133
runtime::HttpClient,
34+
sdam::ServerUpdateSender,
3235
};
3336
use connection_requester::ConnectionRequester;
3437
use manager::PoolManager;
@@ -47,6 +50,7 @@ pub(crate) struct ConnectionPool {
4750
address: StreamAddress,
4851
manager: PoolManager,
4952
connection_requester: ConnectionRequester,
53+
generation_subscriber: PoolGenerationSubscriber,
5054

5155
wait_queue_timeout: Option<Duration>,
5256

@@ -58,10 +62,15 @@ impl ConnectionPool {
5862
pub(crate) fn new(
5963
address: StreamAddress,
6064
http_client: HttpClient,
65+
server_updater: ServerUpdateSender,
6166
options: Option<ConnectionPoolOptions>,
6267
) -> Self {
63-
let (manager, connection_requester) =
64-
ConnectionPoolWorker::start(address.clone(), http_client, options.clone());
68+
let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start(
69+
address.clone(),
70+
http_client,
71+
server_updater,
72+
options.clone(),
73+
);
6574

6675
let event_handler = options.as_ref().and_then(|opts| opts.event_handler.clone());
6776
let wait_queue_timeout = options.as_ref().and_then(|opts| opts.wait_queue_timeout);
@@ -79,6 +88,7 @@ impl ConnectionPool {
7988
connection_requester,
8089
wait_queue_timeout,
8190
event_handler,
91+
generation_subscriber,
8292
}
8393
}
8494

@@ -87,11 +97,13 @@ impl ConnectionPool {
8797
let (manager, _) = manager::channel();
8898
let handle = PoolWorkerHandle::new_mocked();
8999
let (connection_requester, _) = connection_requester::channel(Default::default(), handle);
100+
let (_, generation_subscriber) = status::channel();
90101

91102
Self {
92103
address,
93104
manager,
94105
connection_requester,
106+
generation_subscriber,
95107
wait_queue_timeout: None,
96108
event_handler: None,
97109
}
@@ -120,11 +132,23 @@ impl ConnectionPool {
120132
handler.handle_connection_checkout_started_event(event);
121133
});
122134

123-
let conn = self
135+
let response = self
124136
.connection_requester
125137
.request(self.wait_queue_timeout)
126138
.await;
127139

140+
let conn = match response {
141+
Some(ConnectionRequestResult::Pooled(c)) => Ok(c),
142+
Some(ConnectionRequestResult::Establishing(task)) => task.await,
143+
Some(ConnectionRequestResult::PoolCleared) => {
144+
Err(Error::pool_cleared_error(&self.address))
145+
}
146+
None => Err(ErrorKind::WaitQueueTimeoutError {
147+
address: self.address.clone(),
148+
}
149+
.into()),
150+
};
151+
128152
match conn {
129153
Ok(ref conn) => {
130154
self.emit_event(|handler| {
@@ -156,4 +180,17 @@ impl ConnectionPool {
156180
pub(crate) fn clear(&self) {
157181
self.manager.clear();
158182
}
183+
184+
/// Mark the pool as "ready", allowing connections to be created and checked out.
185+
pub(crate) async fn mark_as_ready(&self) {
186+
self.manager.mark_as_ready().await;
187+
}
188+
189+
/// Subscribe to updates to the pool's generation.
190+
///
191+
/// This can be used to listen for errors that occur during connection
192+
/// establishment or to get the current generation of the pool.
193+
pub(crate) fn subscribe_to_generation_updates(&self) -> PoolGenerationSubscriber {
194+
self.generation_subscriber.clone()
195+
}
159196
}

0 commit comments

Comments
 (0)