Skip to content

Commit 8fd04d2

Browse files
authored
RUST-655 / RUST-691 Simplification of SDAM error handling (#312)
1 parent de58ec2 commit 8fd04d2

File tree

186 files changed

+16310
-256
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

186 files changed

+16310
-256
lines changed

src/client/executor.rs

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
1313
operation::{Operation, Retryability},
1414
options::SelectionCriteria,
15-
sdam::{SelectedServer, SessionSupportStatus},
15+
sdam::{HandshakePhase, SelectedServer, SessionSupportStatus},
1616
};
1717

1818
lazy_static! {
@@ -100,30 +100,36 @@ impl Client {
100100
return Ok(result);
101101
}
102102
Err(err) => {
103-
self.inner
104-
.topology
105-
.handle_post_handshake_error(&err, &conn, server)
106-
.await;
103+
// For a pre-4.4 connection, an error label should be added to any write-retryable
104+
// error as long as the retry_writes client option is not set to false. For a 4.4+
105+
// connection, a label should be added only to network errors.
106+
let mut err = match retryability {
107+
Retryability::Write => get_error_with_retryable_write_label(&conn, err).await?,
108+
_ => err,
109+
};
107110

108111
// Retryable writes are only supported by storage engines with document-level
109112
// locking, so users need to disable retryable writes if using mmapv1.
110-
if let ErrorKind::CommandError(ref err) = err.kind {
111-
if err.code == 20 && err.message.starts_with("Transaction numbers") {
112-
let mut err = err.clone();
113-
err.message = "This MongoDB deployment does not support retryable writes. \
114-
Please add retryWrites=false to your connection string."
113+
if let ErrorKind::CommandError(ref mut command_error) = err.kind {
114+
if command_error.code == 20
115+
&& command_error.message.starts_with("Transaction numbers")
116+
{
117+
command_error.message = "This MongoDB deployment does not support \
118+
retryable writes. Please add retryWrites=false \
119+
to your connection string."
115120
.to_string();
116-
return Err(ErrorKind::CommandError(err).into());
117121
}
118122
}
119123

120-
// For a pre-4.4 connection, an error label should be added to any write-retryable
121-
// error as long as the retry_writes client option is not set to false. For a 4.4+
122-
// connection, a label should be added only to network errors.
123-
let err = match retryability {
124-
Retryability::Write => get_error_with_retryable_write_label(&conn, err).await?,
125-
_ => err,
126-
};
124+
self.inner
125+
.topology
126+
.handle_application_error(
127+
err.clone(),
128+
HandshakePhase::after_completion(conn),
129+
&server,
130+
)
131+
.await;
132+
drop(server);
127133

128134
// TODO RUST-90: Do not retry read if session is in a transaction
129135
if retryability == Retryability::Read && err.is_read_retryable()
@@ -159,15 +165,19 @@ impl Client {
159165
{
160166
Ok(result) => Ok(result),
161167
Err(err) => {
162-
self.inner
163-
.topology
164-
.handle_post_handshake_error(&err, &conn, server)
165-
.await;
166-
167168
let err = match retryability {
168169
Retryability::Write => get_error_with_retryable_write_label(&conn, err).await?,
169170
_ => err,
170171
};
172+
self.inner
173+
.topology
174+
.handle_application_error(
175+
err.clone(),
176+
HandshakePhase::after_completion(conn),
177+
&server,
178+
)
179+
.await;
180+
drop(server);
171181

172182
if err.is_server_error() || err.is_read_retryable() || err.is_write_retryable() {
173183
Err(err)

src/cmap/conn/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub struct ConnectionInfo {
4646
pub(crate) struct Connection {
4747
pub(super) id: u32,
4848
pub(super) address: StreamAddress,
49-
pub(super) generation: u32,
49+
pub(crate) generation: u32,
5050

5151
/// The cached StreamDescription from the connection's handshake.
5252
pub(super) stream_description: Option<StreamDescription>,

src/cmap/manager.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,15 @@ pub(super) struct PoolManager {
2020

2121
impl PoolManager {
2222
/// Lazily clear the pool.
23-
pub(super) fn clear(&self) {
24-
let _ = self.sender.send(PoolManagementRequest::Clear);
23+
pub(super) async fn clear(&self) {
24+
let (message, acknowledgment_receiver) = AcknowledgedMessage::package(());
25+
if self
26+
.sender
27+
.send(PoolManagementRequest::Clear(message))
28+
.is_ok()
29+
{
30+
acknowledgment_receiver.wait_for_acknowledgment().await;
31+
}
2532
}
2633

2734
/// Mark the pool as "ready" as per the CMAP specification.
@@ -77,7 +84,7 @@ impl ManagementRequestReceiver {
7784
#[derive(Debug)]
7885
pub(super) enum PoolManagementRequest {
7986
/// Clear the pool, transitioning it to Paused.
80-
Clear,
87+
Clear(AcknowledgedMessage<()>),
8188

8289
/// Mark the pool as Ready, allowing connections to be created and checked out.
8390
MarkAsReady {

src/cmap/mod.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,20 +176,16 @@ impl ConnectionPool {
176176

177177
/// Increments the generation of the pool. Rather than eagerly removing stale connections from
178178
/// the pool, they are left for the background thread to clean up.
179-
pub(crate) fn clear(&self) {
180-
self.manager.clear();
179+
pub(crate) async fn clear(&self) {
180+
self.manager.clear().await
181181
}
182182

183183
/// Mark the pool as "ready", allowing connections to be created and checked out.
184184
pub(crate) async fn mark_as_ready(&self) {
185185
self.manager.mark_as_ready().await;
186186
}
187187

188-
/// Subscribe to updates to the pool's generation.
189-
///
190-
/// This can be used to listen for errors that occur during connection
191-
/// establishment or to get the current generation of the pool.
192-
pub(crate) fn subscribe_to_generation_updates(&self) -> PoolGenerationSubscriber {
193-
self.generation_subscriber.clone()
188+
pub(crate) fn generation(&self) -> u32 {
189+
self.generation_subscriber.generation()
194190
}
195191
}

src/cmap/status.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,4 @@ impl PoolGenerationSubscriber {
4949
pub(crate) fn generation(&self) -> u32 {
5050
self.receiver.borrow().generation
5151
}
52-
53-
#[cfg(test)]
54-
pub(crate) async fn wait_for_generation_change(
55-
&mut self,
56-
timeout: std::time::Duration,
57-
) -> Option<u32> {
58-
crate::RUNTIME
59-
.timeout(timeout, self.receiver.changed())
60-
.await
61-
.ok()?
62-
.ok()?;
63-
64-
Some(self.receiver.borrow().generation)
65-
}
6652
}

src/cmap/test/mod.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl Executor {
164164
RUNTIME.execute(async move {
165165
while let Some(update) = update_receiver.recv().await {
166166
match update.into_message() {
167-
ServerUpdate::Error { .. } => manager.clear(),
167+
ServerUpdate::Error { .. } => manager.clear().await,
168168
}
169169
}
170170
});
@@ -284,13 +284,7 @@ impl Operation {
284284
}
285285
Operation::Clear => {
286286
if let Some(pool) = state.pool.read().await.as_ref() {
287-
let mut subscriber = pool.subscribe_to_generation_updates();
288-
pool.clear();
289-
290-
subscriber
291-
.wait_for_generation_change(EVENT_TIMEOUT)
292-
.await
293-
.expect("generation did not change after clearing pool");
287+
pool.clear().await;
294288
}
295289
}
296290
Operation::Ready => {

src/cmap/worker.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,9 @@ impl ConnectionPoolWorker {
252252
PoolTask::HandleManagementRequest(PoolManagementRequest::CheckIn(connection)) => {
253253
self.check_in(connection)
254254
}
255-
PoolTask::HandleManagementRequest(PoolManagementRequest::Clear) => self.clear(),
255+
PoolTask::HandleManagementRequest(PoolManagementRequest::Clear(_message)) => {
256+
self.clear();
257+
}
256258
PoolTask::HandleManagementRequest(PoolManagementRequest::MarkAsReady {
257259
completion_handler: _handler,
258260
}) => {
@@ -562,7 +564,7 @@ async fn establish_connection(
562564
};
563565
handler.handle_connection_closed_event(event);
564566
}
565-
server_updater.handle_error(&e, generation).await;
567+
server_updater.handle_error(e.clone(), generation).await;
566568
manager.handle_connection_failed();
567569
}
568570
Ok(ref mut connection) => {

src/error.rs

Lines changed: 70 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use thiserror::Error;
88
use crate::{bson::Document, options::StreamAddress};
99

1010
const RECOVERING_CODES: [i32; 5] = [11600, 11602, 13436, 189, 91];
11-
const NOTMASTER_CODES: [i32; 2] = [10107, 13435];
11+
const NOTMASTER_CODES: [i32; 3] = [10107, 13435, 10058];
1212
const SHUTTING_DOWN_CODES: [i32; 2] = [11600, 91];
1313
const RETRYABLE_READ_CODES: [i32; 11] =
1414
[11600, 11602, 10107, 13435, 13436, 189, 91, 7, 6, 89, 9001];
@@ -61,6 +61,22 @@ impl Error {
6161
Error::authentication_error(mechanism_name, "invalid server response")
6262
}
6363

64+
pub(crate) fn is_state_change_error(&self) -> bool {
65+
self.is_recovering() || self.is_not_master()
66+
}
67+
68+
pub(crate) fn is_auth_error(&self) -> bool {
69+
matches!(self.kind, ErrorKind::AuthenticationError { .. })
70+
}
71+
72+
pub(crate) fn is_command_error(&self) -> bool {
73+
matches!(self.kind, ErrorKind::CommandError(_))
74+
}
75+
76+
pub(crate) fn is_network_timeout(&self) -> bool {
77+
matches!(self.kind, ErrorKind::Io(ref io_err) if io_err.kind() == std::io::ErrorKind::TimedOut)
78+
}
79+
6480
/// Whether this error is an "ns not found" error or not.
6581
pub(crate) fn is_ns_not_found(&self) -> bool {
6682
matches!(self.kind, ErrorKind::CommandError(ref err) if err.code == 26)
@@ -71,15 +87,9 @@ impl Error {
7187
if self.is_network_error() {
7288
return true;
7389
}
74-
match &self.kind.code_and_message() {
75-
Some((code, message)) => {
76-
if RETRYABLE_READ_CODES.contains(&code) {
77-
return true;
78-
}
79-
if is_not_master(*code, message) || is_recovering(*code, message) {
80-
return true;
81-
}
82-
false
90+
match &self.kind.code() {
91+
Some(code) => {
92+
RETRYABLE_READ_CODES.contains(&code)
8393
}
8494
None => false,
8595
}
@@ -100,8 +110,8 @@ impl Error {
100110
if self.is_network_error() {
101111
return true;
102112
}
103-
match &self.kind.code_and_message() {
104-
Some((code, _)) => RETRYABLE_WRITE_CODES.contains(&code),
113+
match &self.kind.code() {
114+
Some(code) => RETRYABLE_WRITE_CODES.contains(&code),
105115
None => false,
106116
}
107117
}
@@ -363,19 +373,48 @@ impl ErrorKind {
363373
)
364374
}
365375

366-
/// Gets the code/message tuple from this error, if applicable. In the case of write errors, the
367-
/// code and message are taken from the write concern error, if there is one.
368-
pub(crate) fn code_and_message(&self) -> Option<(i32, &str)> {
376+
/// Gets the code from this error for performing SDAM updates, if applicable.
377+
/// Any codes contained in WriteErrors are ignored.
378+
pub(crate) fn code(&self) -> Option<i32> {
369379
match self {
370-
ErrorKind::CommandError(ref cmd_err) => Some((cmd_err.code, cmd_err.message.as_str())),
371-
ErrorKind::WriteError(WriteFailure::WriteConcernError(ref wc_err)) => {
372-
Some((wc_err.code, wc_err.message.as_str()))
380+
ErrorKind::CommandError(command_error) => {
381+
Some(command_error.code)
382+
},
383+
// According to SDAM spec, write concern error codes MUST also be checked, and writeError codes
384+
// MUST NOT be checked.
385+
ErrorKind::BulkWriteError(BulkWriteFailure { write_concern_error: Some(wc_error), .. }) => {
386+
Some(wc_error.code)
373387
}
374-
ErrorKind::BulkWriteError(ref bwe) => bwe
375-
.write_concern_error
376-
.as_ref()
377-
.map(|wc_err| (wc_err.code, wc_err.message.as_str())),
378-
_ => None,
388+
ErrorKind::WriteError(WriteFailure::WriteConcernError(wc_error)) => Some(wc_error.code),
389+
_ => None
390+
}
391+
}
392+
393+
/// Gets the server's message for this error, if applicable, for use in testing.
394+
/// If this error is a BulkWriteError, the messages are concatenated.
395+
#[cfg(test)]
396+
pub(crate) fn server_message(&self) -> Option<String> {
397+
match self {
398+
ErrorKind::CommandError(command_error) => {
399+
Some(command_error.message.clone())
400+
},
401+
// since this is used primarily for errorMessageContains assertions in the unified runner, we just
402+
// concatenate all the relevant server messages into one for bulk errors.
403+
ErrorKind::BulkWriteError(BulkWriteFailure { write_concern_error, write_errors }) => {
404+
let mut msg = "".to_string();
405+
if let Some(wc_error) = write_concern_error {
406+
msg.push_str(wc_error.message.as_str());
407+
}
408+
if let Some(write_errors) = write_errors {
409+
for we in write_errors {
410+
msg.push_str(we.message.as_str());
411+
}
412+
}
413+
Some(msg)
414+
}
415+
ErrorKind::WriteError(WriteFailure::WriteConcernError(wc_error)) => Some(wc_error.message.clone()),
416+
ErrorKind::WriteError(WriteFailure::WriteError(write_error)) => Some(write_error.message.clone()),
417+
_ => None
379418
}
380419
}
381420

@@ -398,42 +437,24 @@ impl ErrorKind {
398437

399438
/// If this error corresponds to a "not master" error as per the SDAM spec.
400439
pub(crate) fn is_not_master(&self) -> bool {
401-
self.code_and_message()
402-
.map(|(code, msg)| is_not_master(code, msg))
403-
.unwrap_or(false)
440+
self.code().map(|code| NOTMASTER_CODES.contains(&code)).unwrap_or(false)
404441
}
405442

406443
/// If this error corresponds to a "node is recovering" error as per the SDAM spec.
407444
pub(crate) fn is_recovering(&self) -> bool {
408-
self.code_and_message()
409-
.map(|(code, msg)| is_recovering(code, msg))
445+
self.code()
446+
.map(|code| RECOVERING_CODES.contains(&code))
410447
.unwrap_or(false)
411448
}
412449

413450
/// If this error corresponds to a "node is shutting down" error as per the SDAM spec.
414451
pub(crate) fn is_shutting_down(&self) -> bool {
415-
self.code_and_message()
416-
.map(|(code, _)| SHUTTING_DOWN_CODES.contains(&code))
452+
self.code()
453+
.map(|code| SHUTTING_DOWN_CODES.contains(&code))
417454
.unwrap_or(false)
418455
}
419456
}
420457

421-
fn is_not_master(code: i32, message: &str) -> bool {
422-
if NOTMASTER_CODES.contains(&code) {
423-
return true;
424-
} else if is_recovering(code, message) {
425-
return false;
426-
}
427-
message.contains("not master")
428-
}
429-
430-
fn is_recovering(code: i32, message: &str) -> bool {
431-
if RECOVERING_CODES.contains(&code) {
432-
return true;
433-
}
434-
message.contains("not master or secondary") || message.contains("node is recovering")
435-
}
436-
437458
/// An error that occurred due to a database command failing.
438459
#[derive(Clone, Debug, Deserialize)]
439460
#[non_exhaustive]
@@ -508,6 +529,7 @@ pub struct WriteError {
508529
#[non_exhaustive]
509530
pub struct BulkWriteError {
510531
/// Index into the list of operations that this error corresponds to.
532+
#[serde(default)]
511533
pub index: usize,
512534

513535
/// Identifies the type of write concern error.
@@ -526,7 +548,8 @@ pub struct BulkWriteError {
526548
}
527549

528550
/// The set of errors that occurred during a write operation.
529-
#[derive(Clone, Debug)]
551+
#[derive(Clone, Debug, Deserialize)]
552+
#[serde(rename_all = "camelCase")]
530553
#[non_exhaustive]
531554
pub struct BulkWriteFailure {
532555
/// The error(s) that occurred on account of a non write concern failure.

0 commit comments

Comments
 (0)