Skip to content

Commit 2f0e1fb

Browse files
committed
split topology update from retry
1 parent b25aa09 commit 2f0e1fb

File tree

6 files changed

+29
-2
lines changed

6 files changed

+29
-2
lines changed

src/client/executor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ impl Client {
309309
let mut retry: Option<ExecutionRetry> = None;
310310
let mut implicit_session: Option<ClientSession> = None;
311311
loop {
312+
op.update_for_topology(&self.inner.topology.description());
313+
312314
if retry.is_some() {
313315
op.update_for_retry();
314316
}

src/operation.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use crate::{
5252
WriteFailure,
5353
},
5454
options::WriteConcern,
55+
sdam::TopologyDescription,
5556
selection_criteria::SelectionCriteria,
5657
BoxFuture,
5758
ClientSession,
@@ -148,6 +149,9 @@ pub(crate) trait Operation {
148149
/// Updates this operation as needed for a retry.
149150
fn update_for_retry(&mut self);
150151

152+
/// Updates this operation based on server topology.
153+
fn update_for_topology(&mut self, _topology: &TopologyDescription);
154+
151155
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
152156

153157
fn name(&self) -> &str;
@@ -235,6 +239,9 @@ pub(crate) trait OperationWithDefaults: Send + Sync {
235239
/// Updates this operation as needed for a retry.
236240
fn update_for_retry(&mut self) {}
237241

242+
/// Updates this operation based on server topology.
243+
fn update_for_topology(&mut self, _topology: &TopologyDescription) {}
244+
238245
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
239246
None
240247
}
@@ -287,6 +294,9 @@ where
287294
fn update_for_retry(&mut self) {
288295
self.update_for_retry()
289296
}
297+
fn update_for_topology(&mut self, topology: &TopologyDescription) {
298+
self.update_for_topology(topology)
299+
}
290300
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
291301
self.pinned_connection()
292302
}

src/operation/aggregate.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
cursor::CursorSpecification,
88
error::Result,
99
operation::{append_options, remove_empty_write_concern, Retryability},
10-
options::{AggregateOptions, SelectionCriteria, WriteConcern},
10+
options::{AggregateOptions, ReadPreference, SelectionCriteria, WriteConcern},
1111
Namespace,
1212
};
1313

@@ -109,6 +109,14 @@ impl OperationWithDefaults for Aggregate {
109109
))
110110
}
111111

112+
fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) {
113+
match self.selection_criteria() {
114+
None => return,
115+
Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)) => return,
116+
_ => (),
117+
}
118+
}
119+
112120
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
113121
self.options
114122
.as_ref()

src/operation/raw_output.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ impl<Op: Operation> Operation for RawOutput<Op> {
6868
self.0.update_for_retry()
6969
}
7070

71+
fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) {
72+
self.0.update_for_topology(topology)
73+
}
74+
7175
fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> {
7276
self.0.pinned_connection()
7377
}

src/operation/run_cursor_command.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ impl Operation for RunCursorCommand<'_> {
7979
self.run_command.update_for_retry()
8080
}
8181

82+
fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) {
83+
self.run_command.update_for_topology(topology)
84+
}
85+
8286
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
8387
self.run_command.pinned_connection()
8488
}

src/sdam/topology.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,6 @@ impl Topology {
229229
self.watcher.peek_latest().servers()
230230
}
231231

232-
#[cfg(test)]
233232
pub(crate) fn description(&self) -> TopologyDescription {
234233
self.watcher.peek_latest().description.clone()
235234
}

0 commit comments

Comments
 (0)