Commit 9ee4f6a: RUST-935 direct retries to different mongos (#968)
Parent: ed2c8d9

6 files changed: +285 / -15 lines

The change threads a `deprioritized` server address through server selection: when a retryable operation fails on a sharded cluster, the retry is directed to a different mongos if more than one suitable candidate is in the latency window, and falls back to the original mongos otherwise.

src/client.rs (7 additions & 4 deletions)

```diff
@@ -19,8 +19,6 @@ use derivative::Derivative;
 use futures_core::{future::BoxFuture, Future};
 use futures_util::{future::join_all, FutureExt};
 
-#[cfg(test)]
-use crate::options::ServerAddress;
 #[cfg(feature = "tracing-unstable")]
 use crate::trace::{
     command::CommandTracingEventEmitter,
@@ -49,6 +47,7 @@ use crate::{
         ListDatabasesOptions,
         ReadPreference,
         SelectionCriteria,
+        ServerAddress,
         SessionOptions,
     },
     results::DatabaseSpecification,
@@ -654,17 +653,20 @@ impl Client {
         &self,
         criteria: Option<&SelectionCriteria>,
     ) -> Result<ServerAddress> {
-        let server = self.select_server(criteria, "Test select server").await?;
+        let server = self
+            .select_server(criteria, "Test select server", None)
+            .await?;
         Ok(server.address.clone())
     }
 
     /// Select a server using the provided criteria. If none is provided, a primary read preference
     /// will be used instead.
-    #[allow(unused_variables)] // we only use the operation_name for tracing.
     async fn select_server(
         &self,
         criteria: Option<&SelectionCriteria>,
+        #[allow(unused_variables)] // we only use the operation_name for tracing.
        operation_name: &str,
+        deprioritized: Option<&ServerAddress>,
    ) -> Result<SelectedServer> {
        let criteria =
            criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
@@ -698,6 +700,7 @@ impl Client {
                 criteria,
                 &state.description,
                 &state.servers(),
+                deprioritized,
             );
             match result {
                 Err(error) => {
```
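A note on the attribute move in the hunk above: Rust permits lint attributes on individual function parameters, so `#[allow(unused_variables)]` now silences the lint for `operation_name` alone (it is only read when the `tracing-unstable` feature is enabled) instead of for the whole function. A minimal standalone sketch of the same pattern, with hypothetical names:

```rust
// Sketch only: the lint attribute applies to this one parameter, so other
// unused bindings in the function would still warn as usual.
fn select(#[allow(unused_variables)] operation_name: &str) -> u32 {
    // Imagine `operation_name` is only read under a tracing feature flag.
    42
}

fn main() {
    assert_eq!(select("find"), 42);
}
```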

src/client/executor.rs (17 additions & 4 deletions)

```diff
@@ -12,7 +12,7 @@ use std::{
     time::Instant,
 };
 
-use super::{session::TransactionState, Client, ClientSession};
+use super::{options::ServerAddress, session::TransactionState, Client, ClientSession};
 use crate::{
     bson::Document,
     change_stream::{
@@ -313,7 +313,14 @@ impl Client {
             .and_then(|s| s.transaction.pinned_mongos())
             .or_else(|| op.selection_criteria());
 
-        let server = match self.select_server(selection_criteria, op.name()).await {
+        let server = match self
+            .select_server(
+                selection_criteria,
+                op.name(),
+                retry.as_ref().map(|r| &r.first_server),
+            )
+            .await
+        {
             Ok(server) => server,
             Err(mut err) => {
                 retry.first_error()?;
@@ -322,6 +329,7 @@ impl Client {
                 return Err(err);
             }
         };
+        let server_addr = server.address.clone();
 
         let mut conn = match get_connection(&session, &op, &server.pool).await {
             Ok(c) => c,
@@ -342,6 +350,7 @@ impl Client {
                     retry = Some(ExecutionRetry {
                         prior_txn_number: None,
                         first_error: err,
+                        first_server: server_addr.clone(),
                     });
                     continue;
                 } else {
@@ -407,7 +416,7 @@ impl Client {
                 self.inner
                     .topology
                     .handle_application_error(
-                        server.address.clone(),
+                        server_addr.clone(),
                         err.clone(),
                         HandshakePhase::after_completion(&conn),
                     )
@@ -433,6 +442,7 @@ impl Client {
                     retry = Some(ExecutionRetry {
                         prior_txn_number: txn_number,
                         first_error: err,
+                        first_server: server_addr.clone(),
                     });
                     continue;
                 } else {
@@ -806,7 +816,9 @@ impl Client {
             (matches!(topology_type, TopologyType::Single) && server_type.is_available())
                 || server_type.is_data_bearing()
         }));
-        let _: SelectedServer = self.select_server(Some(&criteria), operation_name).await?;
+        let _: SelectedServer = self
+            .select_server(Some(&criteria), operation_name, None)
+            .await?;
         Ok(())
     }
 
@@ -1019,6 +1031,7 @@ struct ExecutionDetails<T: Operation> {
 struct ExecutionRetry {
     prior_txn_number: Option<i64>,
     first_error: Error,
+    first_server: ServerAddress,
 }
 
 trait RetryHelper {
```
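To make the control flow above easier to follow, here is a schematic, self-contained sketch of how the retry loop records the first attempt's server and feeds it back into selection as the deprioritized address. The types and functions are simplified stand-ins, not the driver's API, and the real loop also consults retryability predicates and transaction numbers; the sketch keeps only the deprioritization hand-off.

```rust
// Schematic sketch of the retry flow; ServerAddress, select_server, and
// execute are simplified stand-ins, not the driver's real definitions.
#[derive(Clone, Debug, PartialEq)]
struct ServerAddress(String);

// Mirrors the ExecutionRetry struct in the diff (minus the error/txn fields).
struct ExecutionRetry {
    first_server: ServerAddress,
}

// Prefer any server other than the deprioritized one; if none exists,
// fall back to the deprioritized server rather than failing selection.
fn select_server(pool: &[ServerAddress], deprioritized: Option<&ServerAddress>) -> ServerAddress {
    pool.iter()
        .find(|a| Some(*a) != deprioritized)
        .or_else(|| pool.first())
        .cloned()
        .expect("pool is non-empty")
}

// Fails the first attempt to force a retry.
fn execute(addr: &ServerAddress, attempts: &mut u32) -> Result<(), String> {
    *attempts += 1;
    if *attempts == 1 {
        Err(format!("transient error on {:?}", addr))
    } else {
        Ok(())
    }
}

fn main() {
    let pool = vec![
        ServerAddress("mongos-a:27017".into()),
        ServerAddress("mongos-b:27017".into()),
    ];
    let mut retry: Option<ExecutionRetry> = None;
    let mut attempts = 0;
    loop {
        // On the retry pass the first attempt's address is deprioritized,
        // matching `retry.as_ref().map(|r| &r.first_server)` in the diff.
        let server = select_server(&pool, retry.as_ref().map(|r| &r.first_server));
        match execute(&server, &mut attempts) {
            Ok(()) => {
                println!("succeeded on {:?} after {} attempt(s)", server, attempts);
                break;
            }
            Err(_) if retry.is_none() => {
                retry = Some(ExecutionRetry { first_server: server });
            }
            Err(err) => panic!("retry also failed: {}", err),
        }
    }
    assert_eq!(attempts, 2);
}
```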

src/sdam/description/topology/server_selection.rs (9 additions & 2 deletions)

```diff
@@ -61,13 +61,20 @@ pub(crate) fn attempt_to_select_server<'a>(
     criteria: &'a SelectionCriteria,
     topology_description: &'a TopologyDescription,
     servers: &'a HashMap<ServerAddress, Arc<Server>>,
+    deprioritized: Option<&ServerAddress>,
 ) -> Result<Option<SelectedServer>> {
-    let in_window = topology_description.suitable_servers_in_latency_window(criteria)?;
+    let mut in_window = topology_description.suitable_servers_in_latency_window(criteria)?;
+    if let Some(addr) = deprioritized {
+        if in_window.len() > 1 {
+            in_window.retain(|d| &d.address != addr);
+        }
+    }
     let in_window_servers = in_window
         .into_iter()
         .flat_map(|desc| servers.get(&desc.address))
         .collect();
-    Ok(select_server_in_latency_window(in_window_servers).map(SelectedServer::new))
+    let selected = select_server_in_latency_window(in_window_servers);
+    Ok(selected.map(SelectedServer::new))
 }
 
 /// Choose a server from several suitable choices within the latency window according to
```
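The heart of the change is the `retain` filter above: the previously tried mongos is dropped from the latency window only when another candidate remains, so selection never comes up empty just because of deprioritization. A standalone sketch of that rule, using plain strings in place of the driver's server descriptions (names are illustrative):

```rust
// Illustrative sketch of the deprioritization rule; `String` stands in for
// the driver's ServerAddress/ServerDescription types.
fn filter_deprioritized(mut in_window: Vec<String>, deprioritized: Option<&str>) -> Vec<String> {
    if let Some(addr) = deprioritized {
        // Only filter when more than one candidate is in the window:
        // retrying on the same mongos beats failing to select at all.
        if in_window.len() > 1 {
            in_window.retain(|a| a != addr);
        }
    }
    in_window
}

fn main() {
    let two = vec!["mongos-a:27017".to_string(), "mongos-b:27017".to_string()];
    // With two candidates, the failed mongos is dropped from the window.
    assert_eq!(
        filter_deprioritized(two, Some("mongos-a:27017")),
        vec!["mongos-b:27017".to_string()]
    );
    // With a single candidate, it is kept so the retry can still proceed.
    let one = vec!["mongos-a:27017".to_string()];
    assert_eq!(filter_deprioritized(one.clone(), Some("mongos-a:27017")), one);
}
```

This matches the behavior the new tests pin down: retry on a different mongos if one is available, otherwise retry on the same one.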

src/sdam/description/topology/server_selection/test/in_window.rs (8 additions & 4 deletions)

```diff
@@ -77,10 +77,14 @@ async fn run_test(test_file: TestFile) {
         .into();
 
     for _ in 0..test_file.iterations {
-        let selection =
-            server_selection::attempt_to_select_server(&read_pref, &topology_description, &servers)
-                .expect("selection should not fail")
-                .expect("a server should have been selected");
+        let selection = server_selection::attempt_to_select_server(
+            &read_pref,
+            &topology_description,
+            &servers,
+            None,
+        )
+        .expect("selection should not fail")
+        .expect("a server should have been selected");
         *tallies.entry(selection.address.clone()).or_insert(0) += 1;
     }
 
```

src/test/spec/retryable_reads.rs (122 additions & 1 deletion)

```diff
@@ -6,7 +6,7 @@ use crate::{
     error::Result,
     event::{
         cmap::{CmapEvent, CmapEventHandler, ConnectionCheckoutFailedReason},
-        command::CommandEventHandler,
+        command::{CommandEvent, CommandEventHandler},
     },
     runtime,
     runtime::AsyncJoinHandle,
@@ -21,6 +21,7 @@ use crate::{
         TestClient,
         CLIENT_OPTIONS,
     },
+    Client,
 };
 
 #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
@@ -159,3 +160,123 @@ async fn retry_read_pool_cleared() {
 
     assert_eq!(handler.get_command_started_events(&["find"]).len(), 3);
 }
+
+// Retryable Reads Are Retried on a Different mongos if One is Available
+#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
+#[cfg_attr(feature = "async-std-runtime", async_std::test)]
+async fn retry_read_different_mongos() {
+    let mut client_options = CLIENT_OPTIONS.get().await.clone();
+    if client_options.repl_set_name.is_some() || client_options.hosts.len() < 2 {
+        log_uncaptured(
+            "skipping retry_read_different_mongos: requires sharded cluster with at least two \
+             hosts",
+        );
+        return;
+    }
+    client_options.hosts.drain(2..);
+    client_options.retry_reads = Some(true);
+
+    let mut guards = vec![];
+    for ix in [0, 1] {
+        let mut opts = client_options.clone();
+        opts.hosts.remove(ix);
+        opts.direct_connection = Some(true);
+        let client = Client::test_builder().options(opts).build().await;
+        if !client.supports_fail_command() {
+            log_uncaptured("skipping retry_read_different_mongos: requires failCommand");
+            return;
+        }
+        let fail_opts = FailCommandOptions::builder()
+            .error_code(6)
+            .close_connection(true)
+            .build();
+        let fp = FailPoint::fail_command(&["find"], FailPointMode::Times(1), Some(fail_opts));
+        guards.push(client.enable_failpoint(fp, None).await.unwrap());
+    }
+
+    let client = Client::test_builder()
+        .options(client_options)
+        .event_client()
+        .build()
+        .await;
+    let result = client
+        .database("test")
+        .collection::<bson::Document>("retry_read_different_mongos")
+        .find(doc! {}, None)
+        .await;
+    assert!(result.is_err());
+    let events = client.get_command_events(&["find"]);
+    assert!(
+        matches!(
+            &events[..],
+            &[
+                CommandEvent::Started(_),
+                CommandEvent::Failed(_),
+                CommandEvent::Started(_),
+                CommandEvent::Failed(_),
+            ]
+        ),
+        "unexpected events: {:#?}",
+        events,
+    );
+
+    drop(guards); // enforce lifetime
+}
+
+// Retryable Reads Are Retried on the Same mongos if No Others are Available
+#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
+#[cfg_attr(feature = "async-std-runtime", async_std::test)]
+async fn retry_read_same_mongos() {
+    let init_client = Client::test_builder().build().await;
+    if !init_client.supports_fail_command() {
+        log_uncaptured("skipping retry_read_same_mongos: requires failCommand");
+        return;
+    }
+    if !init_client.is_sharded() {
+        log_uncaptured("skipping retry_read_same_mongos: requires sharded cluster");
+        return;
+    }
+
+    let mut client_options = CLIENT_OPTIONS.get().await.clone();
+    client_options.hosts.drain(1..);
+    client_options.retry_reads = Some(true);
+    let fp_guard = {
+        let mut client_options = client_options.clone();
+        client_options.direct_connection = Some(true);
+        let client = Client::test_builder().options(client_options).build().await;
+        let fail_opts = FailCommandOptions::builder()
+            .error_code(6)
+            .close_connection(true)
+            .build();
+        let fp = FailPoint::fail_command(&["find"], FailPointMode::Times(1), Some(fail_opts));
+        client.enable_failpoint(fp, None).await.unwrap()
+    };
+
+    let client = Client::test_builder()
+        .options(client_options)
+        .event_client()
+        .build()
+        .await;
+    let result = client
+        .database("test")
+        .collection::<bson::Document>("retry_read_same_mongos")
+        .find(doc! {}, None)
+        .await;
+    assert!(result.is_ok(), "{:?}", result);
+    let events = client.get_command_events(&["find"]);
+    assert!(
+        matches!(
+            &events[..],
+            &[
+                CommandEvent::Started(_),
+                CommandEvent::Failed(_),
+                CommandEvent::Started(_),
+                CommandEvent::Succeeded(_),
+            ]
+        ),
+        "unexpected events: {:#?}",
+        events,
+    );
+
+    drop(fp_guard); // enforce lifetime
+}
```
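For context on the tests above: they lean on the server's `failCommand` fail point to make the first `find` on a given mongos fail with a retryable error (code 6, HostUnreachable). A sketch of the command document a helper like `FailPoint::fail_command` would send to the server, built with the `bson` crate's `doc!` macro (the crate dependency is assumed; the helper's internals are not shown in this diff):

```rust
use bson::doc;

fn main() {
    // The shape of a `configureFailPoint: failCommand` command as documented
    // for MongoDB's test fail points; field values mirror the tests above.
    let fail_point = doc! {
        "configureFailPoint": "failCommand",
        "mode": { "times": 1 },
        "data": {
            "failCommands": ["find"],
            "errorCode": 6, // HostUnreachable, a retryable error
            "closeConnection": true,
        },
    };
    println!("{}", fail_point);
}
```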
