Skip to content

Commit 0de2f2b

Browse files
authored
RUST-952 Emit SDAM events for load balanced topologies (#470)
1 parent 9c5eede commit 0de2f2b

File tree

3 files changed

+27
-29
lines changed

3 files changed

+27
-29
lines changed

src/sdam/description/server.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -177,20 +177,6 @@ impl ServerDescription {
177177
description
178178
}
179179

180-
pub(crate) fn new_load_balancer(mut address: ServerAddress) -> Self {
181-
address = ServerAddress::Tcp {
182-
host: address.host().to_lowercase(),
183-
port: address.port(),
184-
};
185-
Self {
186-
address,
187-
server_type: ServerType::LoadBalancer,
188-
last_update_time: None,
189-
reply: Ok(None),
190-
average_round_trip_time: None,
191-
}
192-
}
193-
194180
/// Whether this server is "available" as per the definition in the server selection spec.
195181
pub(crate) fn is_available(&self) -> bool {
196182
self.server_type.is_available()

src/sdam/description/topology/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,7 @@ impl TopologyDescription {
140140
let servers: HashMap<_, _> = options
141141
.hosts
142142
.into_iter()
143-
.map(|address| {
144-
let description = if topology_type == TopologyType::LoadBalanced {
145-
ServerDescription::new_load_balancer(address.clone())
146-
} else {
147-
ServerDescription::new(address.clone(), None)
148-
};
149-
150-
(address, description)
151-
})
143+
.map(|address| (address.clone(), ServerDescription::new(address, None)))
152144
.collect();
153145

154146
let session_support_status = if topology_type == TopologyType::LoadBalanced {

src/sdam/state/mod.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,6 @@ impl Topology {
204204
topology_state.add_new_server(address, options.clone(), &topology.downgrade());
205205
}
206206

207-
// When the client is in load balanced mode, it doesn't poll for changes in the SRV record.
208-
if !is_load_balanced {
209-
SrvPollingMonitor::start(topology.downgrade());
210-
}
211-
212207
if let Some(ref handler) = options.sdam_event_handler {
213208
let event = TopologyDescriptionChangedEvent {
214209
topology_id: id,
@@ -226,7 +221,23 @@ impl Topology {
226221
}
227222
}
228223

229-
SrvPollingMonitor::start(topology.downgrade());
224+
if is_load_balanced {
225+
for server_address in &options.hosts {
226+
// Load-balanced clients don't have a heartbeat monitor, so we synthesize
227+
// updating the server to `ServerType::LoadBalancer`.
228+
let new_desc = ServerDescription {
229+
server_type: ServerType::LoadBalancer,
230+
..ServerDescription::new(server_address.clone(), None)
231+
};
232+
topology_state
233+
.update(new_desc, &options, topology.downgrade())
234+
.map_err(Error::internal)?;
235+
}
236+
} else {
237+
// When the client is in load balanced mode, it doesn't poll for changes in the SRV
238+
// record.
239+
SrvPollingMonitor::start(topology.downgrade());
240+
}
230241

231242
drop(topology_state);
232243
Ok(topology)
@@ -235,6 +246,15 @@ impl Topology {
235246
pub(crate) fn close(&self) {
236247
self.common.is_alive.store(false, Ordering::SeqCst);
237248
if let Some(ref handler) = self.common.options.sdam_event_handler {
249+
if self.common.options.load_balanced.unwrap_or(false) {
250+
for host in &self.common.options.hosts {
251+
let event = ServerClosedEvent {
252+
address: host.clone(),
253+
topology_id: self.common.id,
254+
};
255+
handler.handle_server_closed_event(event);
256+
}
257+
}
238258
let event = TopologyClosedEvent {
239259
topology_id: self.common.id,
240260
};

0 commit comments

Comments
 (0)