Skip to content

Commit bd54750

Browse files
authored
RUST-1328 Only emit SDAM changed events when servers change (#679)
This also adds a test for the fix for RUST-1274.
1 parent 63f9e36 commit bd54750

File tree

7 files changed

+193
-41
lines changed

7 files changed

+193
-41
lines changed

src/sdam/description/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ impl PartialEq for ServerDescription {
122122

123123
self_response == other_response
124124
}
125+
(Err(self_err), Err(other_err)) => self_err == other_err,
125126
_ => false,
126127
}
127128
}

src/sdam/description/topology/test/sdam.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ async fn heartbeat_events() {
646646
.await
647647
.expect("should see server heartbeat succeeded event");
648648

649-
if !client.supports_fail_command_appname() {
649+
if !client.supports_fail_command_appname_initial_handshake() {
650650
return;
651651
}
652652

src/sdam/test.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,10 @@ async fn min_heartbeat_frequency() {
4343

4444
let setup_client = TestClient::with_options(Some(setup_client_options.clone())).await;
4545

46-
if !setup_client.supports_fail_command() {
46+
if !setup_client.supports_fail_command_appname_initial_handshake() {
4747
log_uncaptured(
48-
"skipping min_heartbeat_frequency test due to server not supporting fail points",
49-
);
50-
return;
51-
}
52-
53-
if setup_client.server_version_lt(4, 9) {
54-
log_uncaptured(
55-
"skipping min_heartbeat_frequency test due to server version being less than 4.9",
48+
"skipping min_heartbeat_frequency test due to server not supporting failcommand \
49+
appname",
5650
);
5751
return;
5852
}

src/sdam/topology.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -477,23 +477,25 @@ impl TopologyWorker {
477477
let old_description = latest_state.description.clone();
478478

479479
if let Some(expected_name) = &self.options.repl_set_name {
480-
let got_name = sd.set_name();
481-
if latest_state.description.topology_type() == TopologyType::Single
482-
&& got_name.as_ref().map(|opt| opt.as_ref()) != Ok(Some(expected_name))
483-
{
484-
let got_display = match got_name {
485-
Ok(Some(s)) => format!("{:?}", s),
486-
Ok(None) => "<none>".to_string(),
487-
Err(s) => format!("<error: {}>", s),
488-
};
489-
// Mark server as unknown.
490-
sd = ServerDescription::new(
491-
sd.address,
492-
Some(Err(format!(
493-
"Connection string replicaSet name {:?} does not match actual name {}",
494-
expected_name, got_display,
495-
))),
496-
);
480+
if sd.is_available() {
481+
let got_name = sd.set_name();
482+
if latest_state.description.topology_type() == TopologyType::Single
483+
&& got_name.as_ref().map(|opt| opt.as_ref()) != Ok(Some(expected_name))
484+
{
485+
let got_display = match got_name {
486+
Ok(Some(s)) => format!("{:?}", s),
487+
Ok(None) => "<none>".to_string(),
488+
Err(s) => format!("<error: {}>", s),
489+
};
490+
// Mark server as unknown.
491+
sd = ServerDescription::new(
492+
sd.address,
493+
Some(Err(format!(
494+
"Connection string replicaSet name {:?} does not match actual name {}",
495+
expected_name, got_display,
496+
))),
497+
);
498+
}
497499
}
498500
}
499501

@@ -513,18 +515,18 @@ impl TopologyWorker {
513515
let topology_changed =
514516
self.process_topology_diff(&old_description, &latest_state.description);
515517

516-
if topology_changed {
517-
if server_type.is_data_bearing()
518+
if topology_changed
519+
&& (server_type.is_data_bearing()
518520
|| (server_type != ServerType::Unknown
519-
&& latest_state.description.topology_type() == TopologyType::Single)
520-
{
521-
if let Some(s) = latest_state.servers.get(&server_address) {
522-
s.pool.mark_as_ready().await;
523-
}
521+
&& latest_state.description.topology_type() == TopologyType::Single))
522+
{
523+
if let Some(s) = latest_state.servers.get(&server_address) {
524+
s.pool.mark_as_ready().await;
524525
}
525-
self.publisher.publish_new_state(latest_state)
526526
}
527527

528+
self.publisher.publish_new_state(latest_state);
529+
528530
topology_changed
529531
}
530532

src/test/client.rs

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{borrow::Cow, collections::HashMap, time::Duration};
1+
use std::{borrow::Cow, collections::HashMap, sync::Arc, time::Duration};
22

33
use bson::Document;
44
use serde::Deserialize;
@@ -7,11 +7,25 @@ use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
77
use crate::{
88
bson::{doc, Bson},
99
error::{CommandError, Error, ErrorKind},
10+
hello::LEGACY_HELLO_COMMAND_NAME,
1011
options::{AuthMechanism, ClientOptions, Credential, ListDatabasesOptions, ServerAddress},
1112
runtime,
1213
selection_criteria::{ReadPreference, ReadPreferenceOptions, SelectionCriteria},
13-
test::{log_uncaptured, util::TestClient, CLIENT_OPTIONS, LOCK},
14+
test::{
15+
log_uncaptured,
16+
util::TestClient,
17+
CmapEvent,
18+
Event,
19+
EventHandler,
20+
FailCommandOptions,
21+
FailPoint,
22+
FailPointMode,
23+
SdamEvent,
24+
CLIENT_OPTIONS,
25+
LOCK,
26+
},
1427
Client,
28+
ServerType,
1529
};
1630

1731
#[derive(Debug, Deserialize)]
@@ -691,3 +705,134 @@ async fn plain_auth() {
691705
}
692706
);
693707
}
708+
709+
/// Test verifies that retrying a commitTransaction operation after a checkOut
710+
/// failure works.
711+
#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
712+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
713+
async fn retry_commit_txn_check_out() {
714+
let _guard: RwLockWriteGuard<_> = LOCK.run_exclusively().await;
715+
716+
let setup_client = TestClient::new().await;
717+
if !setup_client.is_replica_set() {
718+
log_uncaptured("skipping retry_commit_txn_check_out due to non-replicaset topology");
719+
return;
720+
}
721+
722+
if !setup_client.supports_transactions() {
723+
log_uncaptured("skipping retry_commit_txn_check_out due to lack of transaction support");
724+
return;
725+
}
726+
727+
if !setup_client.supports_fail_command_appname_initial_handshake() {
728+
log_uncaptured(
729+
"skipping retry_commit_txn_check_out due to insufficient failCommand support",
730+
);
731+
return;
732+
}
733+
734+
// ensure namespace exists
735+
setup_client
736+
.database("retry_commit_txn_check_out")
737+
.collection("retry_commit_txn_check_out")
738+
.insert_one(doc! {}, None)
739+
.await
740+
.unwrap();
741+
742+
let mut options = CLIENT_OPTIONS.get().await.clone();
743+
let handler = Arc::new(EventHandler::new());
744+
options.cmap_event_handler = Some(handler.clone());
745+
options.sdam_event_handler = Some(handler.clone());
746+
options.heartbeat_freq = Some(Duration::from_secs(120));
747+
options.app_name = Some("retry_commit_txn_check_out".to_string());
748+
let client = Client::with_options(options).unwrap();
749+
750+
let mut session = client.start_session(None).await.unwrap();
751+
session.start_transaction(None).await.unwrap();
752+
// transition transaction to "in progress" so that the commit
753+
// actually executes an operation.
754+
client
755+
.database("retry_commit_txn_check_out")
756+
.collection("retry_commit_txn_check_out")
757+
.insert_one_with_session(doc! {}, None, &mut session)
758+
.await
759+
.unwrap();
760+
761+
// enable a fail point that clears the connection pools so that
762+
// commitTransaction will create a new connection during check out.
763+
let fp = FailPoint::fail_command(
764+
&["ping"],
765+
FailPointMode::Times(1),
766+
FailCommandOptions::builder().error_code(11600).build(),
767+
);
768+
let _guard = setup_client.enable_failpoint(fp, None).await.unwrap();
769+
770+
let mut subscriber = handler.subscribe();
771+
client
772+
.database("foo")
773+
.run_command(doc! { "ping": 1 }, None)
774+
.await
775+
.unwrap_err();
776+
777+
// failing with a state change error will request an immediate check
778+
// wait for the mark unknown and subsequent succeeded heartbeat
779+
let mut primary = None;
780+
subscriber
781+
.wait_for_event(Duration::from_secs(1), |e| {
782+
if let Event::Sdam(SdamEvent::ServerDescriptionChanged(event)) = e {
783+
if event.is_marked_unknown_event() {
784+
primary = Some(event.address.clone());
785+
return true;
786+
}
787+
}
788+
false
789+
})
790+
.await
791+
.expect("should see marked unknown event");
792+
793+
subscriber
794+
.wait_for_event(Duration::from_secs(1), |e| {
795+
if let Event::Sdam(SdamEvent::ServerDescriptionChanged(event)) = e {
796+
if &event.address == primary.as_ref().unwrap()
797+
&& event.previous_description.server_type() == ServerType::Unknown
798+
{
799+
return true;
800+
}
801+
}
802+
false
803+
})
804+
.await
805+
.expect("should see mark available event");
806+
807+
// enable a failpoint on the handshake to cause check_out
808+
// to fail with a retryable error
809+
let fp = FailPoint::fail_command(
810+
&[LEGACY_HELLO_COMMAND_NAME, "hello"],
811+
FailPointMode::Times(1),
812+
FailCommandOptions::builder()
813+
.error_code(11600)
814+
.app_name("retry_commit_txn_check_out".to_string())
815+
.build(),
816+
);
817+
let _guard2 = setup_client.enable_failpoint(fp, None).await.unwrap();
818+
819+
// finally, attempt the commit.
820+
// this should succeed due to retry
821+
session.commit_transaction().await.unwrap();
822+
823+
// ensure the first check out attempt fails
824+
subscriber
825+
.wait_for_event(Duration::from_secs(1), |e| {
826+
matches!(e, Event::Cmap(CmapEvent::ConnectionCheckOutFailed(_)))
827+
})
828+
.await
829+
.expect("should see check out failed event");
830+
831+
// ensure the second one succeeds
832+
subscriber
833+
.wait_for_event(Duration::from_secs(1), |e| {
834+
matches!(e, Event::Cmap(CmapEvent::ConnectionCheckedOut(_)))
835+
})
836+
.await
837+
.expect("should see checked out event");
838+
}

src/test/util/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ pub struct EventSubscriber<'a> {
394394
receiver: tokio::sync::broadcast::Receiver<Event>,
395395
}
396396

397-
impl<'a> EventSubscriber<'a> {
397+
impl EventSubscriber<'_> {
398398
pub async fn wait_for_event<F>(&mut self, timeout: Duration, mut filter: F) -> Option<Event>
399399
where
400400
F: FnMut(&Event) -> bool,

src/test/util/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,19 @@ impl TestClient {
244244
version.matches(&self.server_version)
245245
}
246246

247-
pub fn supports_fail_command_appname(&self) -> bool {
248-
let version = VersionReq::parse(">= 4.2.9").unwrap();
249-
version.matches(&self.server_version)
247+
/// Whether the deployment supports failing the initial handshake
248+
/// only when it uses a specified appName.
249+
///
250+
/// See SERVER-49336 for more info.
251+
pub fn supports_fail_command_appname_initial_handshake(&self) -> bool {
252+
let requirements = [
253+
VersionReq::parse(">= 4.2.15, < 4.3.0").unwrap(),
254+
VersionReq::parse(">= 4.4.7, < 4.5.0").unwrap(),
255+
VersionReq::parse(">= 4.9.0").unwrap(),
256+
];
257+
requirements
258+
.iter()
259+
.any(|req| req.matches(&self.server_version))
250260
}
251261

252262
pub fn supports_transactions(&self) -> bool {

0 commit comments

Comments
 (0)