Skip to content

Commit ab01ac0

Browse files
authored
RUST-695 Retry ConnectionPoolCleared errors (#365)
1 parent cce91ce commit ab01ac0

File tree

13 files changed

+339
-56
lines changed

13 files changed

+339
-56
lines changed

src/client/executor.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,12 @@ impl Client {
131131
Ok(conn) => conn,
132132
Err(mut err) => {
133133
err.add_labels(None, &session, None)?;
134-
return Err(err);
134+
135+
if err.is_pool_cleared() {
136+
return self.execute_retry(&mut op, &mut session, None, err).await;
137+
} else {
138+
return Err(err);
139+
}
135140
}
136141
};
137142

@@ -151,7 +156,7 @@ impl Client {
151156
None => None,
152157
};
153158

154-
let first_error = match self
159+
match self
155160
.execute_operation_on_connection(
156161
&op,
157162
&mut conn,
@@ -161,9 +166,7 @@ impl Client {
161166
)
162167
.await
163168
{
164-
Ok(result) => {
165-
return Ok(result);
166-
}
169+
Ok(result) => Ok(result),
167170
Err(mut err) => {
168171
// Retryable writes are only supported by storage engines with document-level
169172
// locking, so users need to disable retryable writes if using mmapv1.
@@ -194,13 +197,22 @@ impl Client {
194197
if retryability == Retryability::Read && err.is_read_retryable()
195198
|| retryability == Retryability::Write && err.is_write_retryable()
196199
{
197-
err
200+
self.execute_retry(&mut op, &mut session, txn_number, err)
201+
.await
198202
} else {
199-
return Err(err);
203+
Err(err)
200204
}
201205
}
202-
};
206+
}
207+
}
203208

209+
async fn execute_retry<T: Operation>(
210+
&self,
211+
op: &mut T,
212+
session: &mut Option<&mut ClientSession>,
213+
txn_number: Option<u64>,
214+
first_error: Error,
215+
) -> Result<T::O> {
204216
let server = match self.select_server(op.selection_criteria()).await {
205217
Ok(server) => server,
206218
Err(_) => {
@@ -213,21 +225,15 @@ impl Client {
213225
Err(_) => return Err(first_error),
214226
};
215227

216-
let retryability = self.get_retryability(&conn, &op, &session).await?;
228+
let retryability = self.get_retryability(&conn, op, &session).await?;
217229
if retryability == Retryability::None {
218230
return Err(first_error);
219231
}
220232

221233
op.update_for_retry();
222234

223235
match self
224-
.execute_operation_on_connection(
225-
&op,
226-
&mut conn,
227-
&mut session,
228-
txn_number,
229-
&retryability,
230-
)
236+
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
231237
.await
232238
{
233239
Ok(result) => Ok(result),

src/cmap/connection_requester.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use tokio::sync::{mpsc, oneshot};
22

33
use super::{worker::PoolWorkerHandle, Connection};
4-
use crate::{error::Result, options::ServerAddress, runtime::AsyncJoinHandle};
4+
use crate::{
5+
error::{Error, Result},
6+
options::ServerAddress,
7+
runtime::AsyncJoinHandle,
8+
};
59

610
/// Returns a new requester/receiver pair.
711
pub(super) fn channel(
@@ -86,8 +90,8 @@ pub(super) enum ConnectionRequestResult {
8690
Establishing(AsyncJoinHandle<Result<Connection>>),
8791

8892
/// The request was rejected because the pool was cleared before it could
89-
/// be fulfilled.
90-
PoolCleared,
93+
/// be fulfilled. The error that caused the pool to be cleared is returned.
94+
PoolCleared(Error),
9195
}
9296

9397
impl ConnectionRequestResult {

src/cmap/manager.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use tokio::sync::mpsc;
22

33
use super::Connection;
4-
use crate::runtime::AcknowledgedMessage;
4+
use crate::{error::Error, runtime::AcknowledgedMessage};
55

66
pub(super) fn channel() -> (PoolManager, ManagementRequestReceiver) {
77
let (sender, receiver) = mpsc::unbounded_channel();
@@ -20,11 +20,14 @@ pub(super) struct PoolManager {
2020

2121
impl PoolManager {
2222
/// Lazily clear the pool.
23-
pub(super) async fn clear(&self) {
23+
pub(super) async fn clear(&self, cause: Error) {
2424
let (message, acknowledgment_receiver) = AcknowledgedMessage::package(());
2525
if self
2626
.sender
27-
.send(PoolManagementRequest::Clear(message))
27+
.send(PoolManagementRequest::Clear {
28+
completion_handler: message,
29+
cause,
30+
})
2831
.is_ok()
2932
{
3033
acknowledgment_receiver.wait_for_acknowledgment().await;
@@ -84,7 +87,10 @@ impl ManagementRequestReceiver {
8487
#[derive(Debug)]
8588
pub(super) enum PoolManagementRequest {
8689
/// Clear the pool, transitioning it to Paused.
87-
Clear(AcknowledgedMessage<()>),
90+
Clear {
91+
completion_handler: AcknowledgedMessage<()>,
92+
cause: Error,
93+
},
8894

8995
/// Mark the pool as Ready, allowing connections to be created and checked out.
9096
MarkAsReady {

src/cmap/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ impl ConnectionPool {
130130
let conn = match response {
131131
ConnectionRequestResult::Pooled(c) => Ok(c),
132132
ConnectionRequestResult::Establishing(task) => task.await,
133-
ConnectionRequestResult::PoolCleared => Err(Error::pool_cleared_error(&self.address)),
133+
ConnectionRequestResult::PoolCleared(e) => {
134+
Err(Error::pool_cleared_error(&self.address, &e))
135+
}
134136
};
135137

136138
match conn {
@@ -154,8 +156,8 @@ impl ConnectionPool {
154156

155157
/// Increments the generation of the pool. Rather than eagerly removing stale connections from
156158
/// the pool, they are left for the background thread to clean up.
157-
pub(crate) async fn clear(&self) {
158-
self.manager.clear().await
159+
pub(crate) async fn clear(&self, cause: Error) {
160+
self.manager.clear(cause).await
159161
}
160162

161163
/// Mark the pool as "ready", allowing connections to be created and checked out.

src/cmap/test/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use self::{
1313

1414
use crate::{
1515
cmap::{Connection, ConnectionPool, ConnectionPoolOptions},
16-
error::{Error, Result},
16+
error::{Error, ErrorKind, Result},
1717
event::cmap::ConnectionPoolOptions as EventOptions,
1818
options::TlsOptions,
1919
runtime::AsyncJoinHandle,
@@ -168,7 +168,7 @@ impl Executor {
168168
RUNTIME.execute(async move {
169169
while let Some(update) = update_receiver.recv().await {
170170
match update.into_message() {
171-
ServerUpdate::Error { .. } => manager.clear().await,
171+
ServerUpdate::Error { error, .. } => manager.clear(error).await,
172172
}
173173
}
174174
});
@@ -285,7 +285,13 @@ impl Operation {
285285
}
286286
Operation::Clear => {
287287
if let Some(pool) = state.pool.read().await.as_ref() {
288-
pool.clear().await;
288+
pool.clear(
289+
ErrorKind::Internal {
290+
message: "test error".to_string(),
291+
}
292+
.into(),
293+
)
294+
.await;
289295
}
290296
}
291297
Operation::Ready => {

src/cmap/worker.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use super::{
1919
DEFAULT_MAX_POOL_SIZE,
2020
};
2121
use crate::{
22-
error::Result,
22+
error::{Error, ErrorKind, Result},
2323
event::cmap::{
2424
CmapEventHandler,
2525
ConnectionClosedEvent,
@@ -172,7 +172,7 @@ impl ConnectionPoolWorker {
172172
{
173173
PoolState::Ready
174174
} else {
175-
PoolState::Paused
175+
PoolState::New
176176
};
177177
#[cfg(test)]
178178
let maintenance_frequency = options
@@ -181,7 +181,7 @@ impl ConnectionPoolWorker {
181181
.unwrap_or(MAINTENACE_FREQUENCY);
182182

183183
#[cfg(not(test))]
184-
let (state, maintenance_frequency) = (PoolState::Paused, MAINTENACE_FREQUENCY);
184+
let (state, maintenance_frequency) = (PoolState::New, MAINTENACE_FREQUENCY);
185185

186186
let worker = ConnectionPoolWorker {
187187
address,
@@ -244,16 +244,27 @@ impl ConnectionPoolWorker {
244244
PoolState::Ready => {
245245
self.wait_queue.push_back(request);
246246
}
247-
PoolState::Paused => {
247+
PoolState::Paused(ref e) => {
248248
// if receiver doesn't listen to error that's ok.
249-
let _ = request.fulfill(ConnectionRequestResult::PoolCleared);
249+
let _ = request.fulfill(ConnectionRequestResult::PoolCleared(e.clone()));
250+
}
251+
PoolState::New => {
252+
let _ = request.fulfill(ConnectionRequestResult::PoolCleared(
253+
ErrorKind::Internal {
254+
message: "check out attempted from new pool".to_string(),
255+
}
256+
.into(),
257+
));
250258
}
251259
},
252260
PoolTask::HandleManagementRequest(PoolManagementRequest::CheckIn(connection)) => {
253261
self.check_in(connection)
254262
}
255-
PoolTask::HandleManagementRequest(PoolManagementRequest::Clear(_message)) => {
256-
self.clear();
263+
PoolTask::HandleManagementRequest(PoolManagementRequest::Clear {
264+
completion_handler: _,
265+
cause,
266+
}) => {
267+
self.clear(cause);
257268
}
258269
PoolTask::HandleManagementRequest(PoolManagementRequest::MarkAsReady {
259270
completion_handler: _handler,
@@ -428,12 +439,12 @@ impl ConnectionPoolWorker {
428439
}
429440
}
430441

431-
fn clear(&mut self) {
442+
fn clear(&mut self, cause: Error) {
432443
self.generation += 1;
433-
let previous_state = std::mem::replace(&mut self.state, PoolState::Paused);
444+
let previous_state = std::mem::replace(&mut self.state, PoolState::Paused(cause.clone()));
434445
self.generation_publisher.publish(self.generation);
435446

436-
if !matches!(previous_state, PoolState::Paused) {
447+
if matches!(previous_state, PoolState::Ready) {
437448
self.emit_event(|handler| {
438449
let event = PoolClearedEvent {
439450
address: self.address.clone(),
@@ -446,7 +457,7 @@ impl ConnectionPoolWorker {
446457
// an error means the other end hung up already, which is okay because we were
447458
// returning an error anyways
448459
let _: std::result::Result<_, _> =
449-
request.fulfill(ConnectionRequestResult::PoolCleared);
460+
request.fulfill(ConnectionRequestResult::PoolCleared(cause.clone()));
450461
}
451462
}
452463
}
@@ -583,8 +594,11 @@ async fn establish_connection(
583594
/// once it goes out of scope and cannot be manually closed before then.
584595
#[derive(Debug)]
585596
enum PoolState {
597+
/// Same as Paused, but only for a new pool, not one that has been cleared due to an error.
598+
New,
599+
586600
/// Connections may not be checked out nor created in the background to satisfy minPoolSize.
587-
Paused,
601+
Paused(Error),
588602

589603
/// Pool is operational.
590604
Ready,

src/error.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,12 @@ impl Error {
5757
}
5858
}
5959

60-
pub(crate) fn pool_cleared_error(address: &ServerAddress) -> Self {
60+
pub(crate) fn pool_cleared_error(address: &ServerAddress, cause: &Error) -> Self {
6161
ErrorKind::ConnectionPoolCleared {
6262
message: format!(
63-
"Connection pool for {} cleared during operation execution",
64-
address
63+
"Connection pool for {} cleared because another operation failed with: {}",
64+
address,
65+
cause
6566
),
6667
}
6768
.into()
@@ -292,6 +293,10 @@ impl Error {
292293
.map(|code| SHUTTING_DOWN_CODES.contains(&code))
293294
.unwrap_or(false)
294295
}
296+
297+
pub(crate) fn is_pool_cleared(&self) -> bool {
298+
matches!(self.kind.as_ref(), ErrorKind::ConnectionPoolCleared { .. })
299+
}
295300
}
296301

297302
impl<E> From<E> for Error

src/sdam/state/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ impl Topology {
257257
.await;
258258

259259
if updated && (error.is_shutting_down() || handshake.wire_version().unwrap_or(0) < 8) {
260-
server.pool.clear().await;
260+
server.pool.clear(error).await;
261261
}
262262
self.request_topology_check();
263263

@@ -272,7 +272,7 @@ impl Topology {
272272
.mark_server_as_unknown(error.to_string(), server, state_lock)
273273
.await;
274274
if updated {
275-
server.pool.clear().await;
275+
server.pool.clear(error).await;
276276
}
277277
updated
278278
} else {
@@ -286,7 +286,7 @@ impl Topology {
286286
.mark_server_as_unknown(error.to_string(), server, state_lock)
287287
.await;
288288
if updated {
289-
server.pool.clear().await;
289+
server.pool.clear(error).await;
290290
}
291291
updated
292292
}

0 commit comments

Comments
 (0)