Skip to content

Commit 02a8b13

Browse files
committed
plumbing
1 parent 6271807 commit 02a8b13

File tree

8 files changed

+49
-41
lines changed

8 files changed

+49
-41
lines changed

src/client.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ impl Client {
446446
&self,
447447
criteria: Option<&SelectionCriteria>,
448448
) -> Result<ServerAddress> {
449-
let server = self
449+
let (server, _) = self
450450
.select_server(criteria, "Test select server", None)
451451
.await?;
452452
Ok(server.address.clone())
@@ -460,7 +460,7 @@ impl Client {
460460
#[allow(unused_variables)] // we only use the operation_name for tracing.
461461
operation_name: &str,
462462
deprioritized: Option<&ServerAddress>,
463-
) -> Result<SelectedServer> {
463+
) -> Result<(SelectedServer, SelectionCriteria)> {
464464
let criteria =
465465
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
466466

@@ -488,9 +488,12 @@ impl Client {
488488
let mut watcher = self.inner.topology.watch();
489489
loop {
490490
let state = watcher.observe_latest();
491-
491+
for server in state.description.servers.values() {
492+
eprintln!("at selection: {:?}", server.hello_response());
493+
}
494+
let effective_criteria = criteria; // TODO
492495
let result = server_selection::attempt_to_select_server(
493-
criteria,
496+
effective_criteria,
494497
&state.description,
495498
&state.servers(),
496499
deprioritized,
@@ -507,7 +510,7 @@ impl Client {
507510
#[cfg(feature = "tracing-unstable")]
508511
event_emitter.emit_succeeded_event(&state.description, &server);
509512

510-
return Ok(server);
513+
return Ok((server, effective_criteria.clone()));
511514
} else {
512515
#[cfg(feature = "tracing-unstable")]
513516
if !emitted_waiting_message {

src/client/executor.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
wire::{next_request_id, Message},
3030
PinnedConnectionHandle,
3131
},
32+
Command,
3233
ConnectionPool,
3334
RawCommandResponse,
3435
StreamDescription,
@@ -59,7 +60,7 @@ use crate::{
5960
Retryability,
6061
},
6162
options::{ChangeStreamOptions, SelectionCriteria},
62-
sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus},
63+
sdam::{HandshakePhase, ServerType, TopologyType, TransactionSupportStatus},
6364
selection_criteria::ReadPreference,
6465
tracking_arc::TrackingArc,
6566
ClusterTime,
@@ -309,7 +310,7 @@ impl Client {
309310
let mut retry: Option<ExecutionRetry> = None;
310311
let mut implicit_session: Option<ClientSession> = None;
311312
loop {
312-
op.update_for_topology(&self.inner.topology.description());
313+
//op.update_for_topology(&self.inner.topology.description());
313314

314315
if retry.is_some() {
315316
op.update_for_retry();
@@ -320,15 +321,15 @@ impl Client {
320321
.and_then(|s| s.transaction.pinned_mongos())
321322
.or_else(|| op.selection_criteria());
322323

323-
let server = match self
324+
let (server, effective_criteria) = match self
324325
.select_server(
325326
selection_criteria,
326327
op.name(),
327328
retry.as_ref().map(|r| &r.first_server),
328329
)
329330
.await
330331
{
331-
Ok(server) => server,
332+
Ok(out) => out,
332333
Err(mut err) => {
333334
retry.first_error()?;
334335

@@ -394,13 +395,7 @@ impl Client {
394395
};
395396

396397
let details = match self
397-
.execute_operation_on_connection(
398-
op,
399-
&mut conn,
400-
&mut session,
401-
txn_number,
402-
retryability,
403-
)
398+
.execute_command_on_connection(cmd, op, &mut conn, &mut session, retryability)
404399
.await
405400
{
406401
Ok(output) => ExecutionDetails {
@@ -866,7 +861,7 @@ impl Client {
866861
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
867862
|| server_type.is_data_bearing()
868863
}));
869-
let _: SelectedServer = self
864+
let _ = self
870865
.select_server(Some(&criteria), operation_name, None)
871866
.await?;
872867
Ok(())

src/operation.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use crate::{
5252
WriteFailure,
5353
},
5454
options::WriteConcern,
55-
sdam::TopologyDescription,
5655
selection_criteria::SelectionCriteria,
5756
BoxFuture,
5857
ClientSession,
@@ -77,6 +76,7 @@ pub(crate) use update::{Update, UpdateOrReplace};
7776

7877
const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
7978
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
79+
const _SERVER_5_0_0_WIRE_VERSION: i32 = 13;
8080
const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
8181
// The maximum number of bytes that may be included in a write payload when auto-encryption is
8282
// enabled.
@@ -149,9 +149,6 @@ pub(crate) trait Operation {
149149
/// Updates this operation as needed for a retry.
150150
fn update_for_retry(&mut self);
151151

152-
/// Updates this operation based on server topology.
153-
fn update_for_topology(&mut self, _topology: &TopologyDescription);
154-
155152
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
156153

157154
fn name(&self) -> &str;
@@ -239,9 +236,6 @@ pub(crate) trait OperationWithDefaults: Send + Sync {
239236
/// Updates this operation as needed for a retry.
240237
fn update_for_retry(&mut self) {}
241238

242-
/// Updates this operation based on server topology.
243-
fn update_for_topology(&mut self, _topology: &TopologyDescription) {}
244-
245239
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
246240
None
247241
}
@@ -294,9 +288,6 @@ where
294288
fn update_for_retry(&mut self) {
295289
self.update_for_retry()
296290
}
297-
fn update_for_topology(&mut self, topology: &TopologyDescription) {
298-
self.update_for_topology(topology)
299-
}
300291
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
301292
self.pinned_connection()
302293
}

src/operation/aggregate.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
cursor::CursorSpecification,
88
error::Result,
99
operation::{append_options, remove_empty_write_concern, Retryability},
10-
options::{AggregateOptions, ReadPreference, SelectionCriteria, WriteConcern},
10+
options::{AggregateOptions, SelectionCriteria, WriteConcern},
1111
Namespace,
1212
};
1313

@@ -109,13 +109,32 @@ impl OperationWithDefaults for Aggregate {
109109
))
110110
}
111111

112-
fn update_for_topology(&mut self, _topology: &crate::sdam::TopologyDescription) {
113-
match self.selection_criteria() {
114-
None => return,
115-
Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)) => return,
116-
_ => (),
112+
/*
113+
fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) {
114+
eprintln!("aggregate: update_for_topology");
115+
if !self.is_out_or_merge()
116+
|| matches!(
117+
self.selection_criteria(),
118+
None | Some(SelectionCriteria::ReadPreference(ReadPreference::Primary))
119+
)
120+
|| topology.topology_type() == TopologyType::LoadBalanced
121+
{
122+
eprintln!("aggregate: skipping topology update");
123+
return;
124+
}
125+
for server in topology.servers.values() {
126+
let _ = dbg!(server.hello_response());
127+
if let Ok(Some(v)) = server.max_wire_version() {
128+
if v < SERVER_5_0_0_WIRE_VERSION {
129+
eprintln!("aggregate: updating topology");
130+
self.options.get_or_insert_default().selection_criteria =
131+
Some(SelectionCriteria::ReadPreference(ReadPreference::Primary));
132+
break;
133+
}
134+
}
117135
}
118136
}
137+
*/
119138

120139
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
121140
self.options

src/operation/raw_output.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ impl<Op: Operation> Operation for RawOutput<Op> {
6868
self.0.update_for_retry()
6969
}
7070

71-
fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) {
72-
self.0.update_for_topology(topology)
73-
}
74-
7571
fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> {
7672
self.0.pinned_connection()
7773
}

src/operation/run_cursor_command.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,6 @@ impl Operation for RunCursorCommand<'_> {
7979
self.run_command.update_for_retry()
8080
}
8181

82-
fn update_for_topology(&mut self, topology: &crate::sdam::TopologyDescription) {
83-
self.run_command.update_for_topology(topology)
84-
}
85-
8682
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
8783
self.run_command.pinned_connection()
8884
}

src/sdam/description/server.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,13 @@ impl ServerDescription {
386386
Ok(me)
387387
}
388388

389+
pub(crate) fn hello_response(&self) -> Result<Option<&HelloCommandResponse>> {
390+
self.reply
391+
.as_ref()
392+
.map_err(Clone::clone)
393+
.map(|o| o.as_ref().map(|r| &r.command_response))
394+
}
395+
389396
pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>> {
390397
match self.reply {
391398
Ok(None) => Ok(None),

src/sdam/topology.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ impl Topology {
229229
self.watcher.peek_latest().servers()
230230
}
231231

232+
#[cfg(test)]
232233
pub(crate) fn description(&self) -> TopologyDescription {
233234
self.watcher.peek_latest().description.clone()
234235
}

0 commit comments

Comments
 (0)