Skip to content

Commit 6c1f687

Browse files
authored
RUST-1047 Ensure TopologyClosedEvent is the last SDAM event emitted (#485)
1 parent 4d78548 commit 6c1f687

File tree

7 files changed

+44
-34
lines changed

7 files changed

+44
-34
lines changed

.evergreen/check-rustfmt.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ set -o errexit
44

55
. ~/.cargo/env
66
rustfmt +nightly --unstable-features --check src/**/*.rs
7+
rustfmt +nightly --unstable-features --check src/*.rs

rustfmt.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ use_try_shorthand = true
88
wrap_comments = true
99
imports_layout = "HorizontalVertical"
1010
imports_granularity = "Crate"
11+
ignore = ["src/lib.rs"]

src/cmap/establish/handshake/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
event::sdam::SdamEventHandler,
1616
is_master::{is_master_command, run_is_master, IsMasterReply},
1717
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
18-
sdam::WeakTopology,
18+
sdam::Topology,
1919
};
2020

2121
#[cfg(feature = "tokio-runtime")]
@@ -227,7 +227,7 @@ impl Handshaker {
227227
pub(crate) async fn handshake(
228228
&self,
229229
conn: &mut Connection,
230-
topology: Option<&WeakTopology>,
230+
topology: Option<&Topology>,
231231
handler: &Option<Arc<dyn SdamEventHandler>>,
232232
) -> Result<HandshakeResult> {
233233
let mut command = self.command.clone();

src/is_master.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
ServerHeartbeatStartedEvent,
2020
ServerHeartbeatSucceededEvent,
2121
},
22-
sdam::{ServerType, WeakTopology},
22+
sdam::{ServerType, Topology},
2323
selection_criteria::TagSet,
2424
};
2525

@@ -37,10 +37,15 @@ pub(crate) fn is_master_command(api: Option<&ServerApi>) -> Command {
3737
command
3838
}
3939

40+
/// Execute an isMaster command, emiting events if a reference to the topology and a handler are
41+
/// provided.
42+
///
43+
/// A strong reference to the topology is used here to ensure it is still in scope and has not yet
44+
/// emitted a `TopologyClosedEvent`.
4045
pub(crate) async fn run_is_master(
4146
conn: &mut Connection,
4247
command: Command,
43-
topology: Option<&WeakTopology>,
48+
topology: Option<&Topology>,
4449
handler: &Option<Arc<dyn SdamEventHandler>>,
4550
) -> Result<IsMasterReply> {
4651
emit_event(topology, handler, |handler| {
@@ -91,18 +96,13 @@ pub(crate) async fn run_is_master(
9196
}
9297
}
9398

94-
fn emit_event<F>(
95-
topology: Option<&WeakTopology>,
96-
handler: &Option<Arc<dyn SdamEventHandler>>,
97-
emit: F,
98-
) where
99+
fn emit_event<F>(topology: Option<&Topology>, handler: &Option<Arc<dyn SdamEventHandler>>, emit: F)
100+
where
99101
F: FnOnce(&Arc<dyn SdamEventHandler>),
100102
{
101103
if let Some(handler) = handler {
102-
if let Some(topology) = topology {
103-
if topology.is_alive() {
104-
emit(handler);
105-
}
104+
if topology.is_some() {
105+
emit(handler);
106106
}
107107
}
108108
}

src/sdam/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,5 @@ pub(crate) use self::{
2626
server::{Server, ServerUpdate, ServerUpdateReceiver, ServerUpdateSender},
2727
HandshakePhase,
2828
Topology,
29-
WeakTopology,
3029
},
3130
};

src/sdam/monitor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl HeartbeatMonitor {
160160
/// Returns true if the topology has changed and false otherwise.
161161
async fn check_server(&mut self, topology: &Topology, server: &Server) -> bool {
162162
let mut retried = false;
163-
let check_result = match self.perform_is_master().await {
163+
let check_result = match self.perform_is_master(topology).await {
164164
Ok(reply) => Ok(reply),
165165
Err(e) => {
166166
let previous_description = topology.get_server_description(&server.address).await;
@@ -171,7 +171,7 @@ impl HeartbeatMonitor {
171171
{
172172
self.handle_error(e, topology, server).await;
173173
retried = true;
174-
self.perform_is_master().await
174+
self.perform_is_master(topology).await
175175
} else {
176176
Err(e)
177177
}
@@ -188,14 +188,14 @@ impl HeartbeatMonitor {
188188
}
189189
}
190190

191-
async fn perform_is_master(&mut self) -> Result<IsMasterReply> {
191+
async fn perform_is_master(&mut self, topology: &Topology) -> Result<IsMasterReply> {
192192
let result = match self.connection {
193193
Some(ref mut conn) => {
194194
let command = is_master_command(self.client_options.server_api.as_ref());
195195
run_is_master(
196196
conn,
197197
command,
198-
Some(&self.topology),
198+
Some(topology),
199199
&self.client_options.sdam_event_handler,
200200
)
201201
.await
@@ -212,7 +212,7 @@ impl HeartbeatMonitor {
212212
.handshaker
213213
.handshake(
214214
&mut connection,
215-
Some(&self.topology),
215+
Some(topology),
216216
&self.client_options.sdam_event_handler,
217217
)
218218
.await

src/sdam/state/mod.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ struct TopologyState {
8181
http_client: HttpClient,
8282
description: TopologyDescription,
8383
servers: HashMap<ServerAddress, Arc<Server>>,
84+
options: ClientOptions,
85+
id: ObjectId,
8486
}
8587

8688
impl Topology {
@@ -108,6 +110,8 @@ impl Topology {
108110
description,
109111
servers: Default::default(),
110112
http_client,
113+
options: options.clone(),
114+
id,
111115
};
112116

113117
let state = Arc::new(RwLock::new(topology_state));
@@ -169,21 +173,6 @@ impl Topology {
169173

170174
pub(crate) fn close(&self) {
171175
self.common.is_alive.store(false, Ordering::SeqCst);
172-
if let Some(ref handler) = self.common.options.sdam_event_handler {
173-
if self.common.options.load_balanced.unwrap_or(false) {
174-
for host in &self.common.options.hosts {
175-
let event = ServerClosedEvent {
176-
address: host.clone(),
177-
topology_id: self.common.id,
178-
};
179-
handler.handle_server_closed_event(event);
180-
}
181-
}
182-
let event = TopologyClosedEvent {
183-
topology_id: self.common.id,
184-
};
185-
handler.handle_topology_closed_event(event);
186-
}
187176
}
188177

189178
/// Gets the addresses of the servers in the cluster.
@@ -485,6 +474,26 @@ impl Topology {
485474
}
486475
}
487476

477+
impl Drop for TopologyState {
478+
fn drop(&mut self) {
479+
if let Some(ref handler) = self.options.sdam_event_handler {
480+
if matches!(self.description.topology_type, TopologyType::LoadBalanced) {
481+
for host in self.servers.keys() {
482+
let event = ServerClosedEvent {
483+
address: host.clone(),
484+
topology_id: self.id,
485+
};
486+
handler.handle_server_closed_event(event);
487+
}
488+
}
489+
let event = TopologyClosedEvent {
490+
topology_id: self.id,
491+
};
492+
handler.handle_topology_closed_event(event);
493+
}
494+
}
495+
}
496+
488497
impl WeakTopology {
489498
/// Attempts to convert the WeakTopology to a strong reference.
490499
pub(crate) fn upgrade(&self) -> Option<Topology> {

0 commit comments

Comments
 (0)