Skip to content

Commit b724848

Browse files
RUST-2203 Allow client metadata to be appended post-construction (#1471)
Co-authored-by: Isabel Atkinson <[email protected]>
1 parent 22fdb01 commit b724848

File tree

23 files changed

+1395
-327
lines changed

23 files changed

+1395
-327
lines changed

src/client.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,14 @@ use crate::{
3636
event::command::CommandEvent,
3737
id_set::IdSet,
3838
operation::OverrideCriteriaFn,
39-
options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress},
39+
options::{
40+
ClientOptions,
41+
DatabaseOptions,
42+
DriverInfo,
43+
ReadPreference,
44+
SelectionCriteria,
45+
ServerAddress,
46+
},
4047
sdam::{
4148
server_selection::{self, attempt_to_select_server},
4249
SelectedServer,
@@ -380,6 +387,16 @@ impl Client {
380387
.map(|db_name| self.database(db_name))
381388
}
382389

390+
/// Append new information to the metadata of the handshake with the server.
391+
pub fn append_metadata(&self, driver_info: DriverInfo) -> Result<()> {
392+
self.inner
393+
.topology
394+
.metadata
395+
.write()
396+
.unwrap()
397+
.append(driver_info)
398+
}
399+
383400
pub(crate) fn register_async_drop(&self) -> AsyncDropToken {
384401
let (cleanup_tx, cleanup_rx) = tokio::sync::oneshot::channel::<BoxFuture<'static, ()>>();
385402
let (id_tx, id_rx) = tokio::sync::oneshot::channel::<crate::id_set::Id>();
@@ -418,7 +435,7 @@ impl Client {
418435
/// Check in a server session to the server session pool. The session will be discarded if it is
419436
/// expired or dirty.
420437
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
421-
let timeout = self.inner.topology.logical_session_timeout();
438+
let timeout = self.inner.topology.watcher().logical_session_timeout();
422439
self.inner.session_pool.check_in(session, timeout).await;
423440
}
424441

@@ -481,12 +498,12 @@ impl Client {
481498
timeout,
482499
);
483500
#[cfg(feature = "tracing-unstable")]
484-
event_emitter.emit_started_event(self.inner.topology.watch().observe_latest().description);
501+
event_emitter.emit_started_event(self.inner.topology.latest().description.clone());
485502
// We only want to emit this message once per operation at most.
486503
#[cfg(feature = "tracing-unstable")]
487504
let mut emitted_waiting_message = false;
488505

489-
let mut watcher = self.inner.topology.watch();
506+
let mut watcher = self.inner.topology.watcher().clone();
490507
loop {
491508
let state = watcher.observe_latest();
492509
let override_slot;
@@ -550,8 +567,7 @@ impl Client {
550567

551568
#[cfg(all(test, feature = "dns-resolver"))]
552569
pub(crate) fn get_hosts(&self) -> Vec<String> {
553-
let watcher = self.inner.topology.watch();
554-
let state = watcher.peek_latest();
570+
let state = self.inner.topology.latest();
555571

556572
state
557573
.servers()
@@ -562,17 +578,12 @@ impl Client {
562578

563579
#[cfg(test)]
564580
pub(crate) async fn sync_workers(&self) {
565-
self.inner.topology.sync_workers().await;
581+
self.inner.topology.updater().sync_workers().await;
566582
}
567583

568584
#[cfg(test)]
569585
pub(crate) fn topology_description(&self) -> crate::sdam::TopologyDescription {
570-
self.inner
571-
.topology
572-
.watch()
573-
.peek_latest()
574-
.description
575-
.clone()
586+
self.inner.topology.latest().description.clone()
576587
}
577588

578589
#[cfg(test)]
@@ -588,7 +599,7 @@ impl Client {
588599
.options
589600
.server_selection_timeout
590601
.unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
591-
let mut watcher = self.inner.topology.watch();
602+
let mut watcher = self.inner.topology.watcher().clone();
592603
loop {
593604
let topology = watcher.observe_latest();
594605
if let Some(desc) = topology.description.primary() {
@@ -628,7 +639,7 @@ impl Client {
628639
// The maximum number of session IDs that should be sent in a single endSessions command.
629640
const MAX_END_SESSIONS_BATCH_SIZE: usize = 10_000;
630641

631-
let mut watcher = self.inner.topology.watch();
642+
let mut watcher = self.inner.topology.watcher().clone();
632643
let selection_criteria =
633644
SelectionCriteria::from(ReadPreference::PrimaryPreferred { options: None });
634645

src/client/action/perf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ impl<'a> Action for crate::action::WarmConnectionPool<'a> {
1515
// No-op when min_pool_size is zero.
1616
return;
1717
}
18-
self.client.inner.topology.warm_pool().await;
18+
self.client.inner.topology.updater().fill_pool().await;
1919
}
2020
}

src/client/action/shutdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl Action for crate::action::Shutdown {
2828
if !self.client.inner.shutdown.executed.load(Ordering::SeqCst) {
2929
self.client.end_all_sessions().await;
3030
}
31-
self.client.inner.topology.shutdown().await;
31+
self.client.inner.topology.updater().shutdown().await;
3232
// This has to happen last to allow pending cleanup to execute commands.
3333
self.client
3434
.inner

src/client/executor.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ impl Client {
433433

434434
self.inner
435435
.topology
436+
.updater()
436437
.handle_application_error(
437438
server_addr.clone(),
438439
err.clone(),
@@ -651,7 +652,7 @@ impl Client {
651652
let stream_description = connection.stream_description()?;
652653
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
653654
let mut cmd = op.build(stream_description)?;
654-
self.inner.topology.update_command_with_read_pref(
655+
self.inner.topology.watcher().update_command_with_read_pref(
655656
connection.address(),
656657
&mut cmd,
657658
&effective_criteria,
@@ -749,7 +750,7 @@ impl Client {
749750
}
750751

751752
let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
752-
let client_cluster_time = self.inner.topology.cluster_time();
753+
let client_cluster_time = self.inner.topology.watcher().cluster_time();
753754
let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
754755
if let Some(cluster_time) = max_cluster_time {
755756
cmd.set_cluster_time(cluster_time);
@@ -883,7 +884,7 @@ impl Client {
883884
}
884885

885886
async fn select_data_bearing_server(&self, operation_name: &str) -> Result<()> {
886-
let topology_type = self.inner.topology.topology_type();
887+
let topology_type = self.inner.topology.watcher().topology_type();
887888
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
888889
let server_type = server_info.server_type();
889890
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
@@ -899,15 +900,15 @@ impl Client {
899900
/// topology supports transactions, this method will perform a server selection that will force
900901
/// that determination to be made.
901902
pub(crate) async fn transaction_support_status(&self) -> Result<TransactionSupportStatus> {
902-
let initial_status = self.inner.topology.transaction_support_status();
903+
let initial_status = self.inner.topology.watcher().transaction_support_status();
903904

904905
// Need to guarantee that we're connected to at least one server that can determine if
905906
// sessions are supported or not.
906907
match initial_status {
907908
TransactionSupportStatus::Undetermined => {
908909
self.select_data_bearing_server("Check transactions support status")
909910
.await?;
910-
Ok(self.inner.topology.transaction_support_status())
911+
Ok(self.inner.topology.watcher().transaction_support_status())
911912
}
912913
_ => Ok(initial_status),
913914
}
@@ -953,6 +954,7 @@ impl Client {
953954
if let Some(ref cluster_time) = cluster_time {
954955
self.inner
955956
.topology
957+
.updater()
956958
.advance_cluster_time(cluster_time.clone())
957959
.await;
958960
if let Some(ref mut session) = session {

src/client/options.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,9 @@ pub(crate) struct TestOptions {
656656

657657
/// Async-capable command event listener.
658658
pub(crate) async_event_listener: Option<TestEventSender>,
659+
660+
/// Callback to receive hello commands.
661+
pub(crate) hello_cb: Option<EventHandler<crate::cmap::Command>>,
659662
}
660663

661664
pub(crate) type TestEventSender = tokio::sync::mpsc::Sender<
@@ -1112,7 +1115,7 @@ impl TlsOptions {
11121115

11131116
/// Extra information to append to the driver version in the metadata of the handshake with the
11141117
/// server. This should be used by libraries wrapping the driver, e.g. ODMs.
1115-
#[derive(Clone, Debug, Deserialize, TypedBuilder, PartialEq)]
1118+
#[derive(Clone, Debug, Deserialize, TypedBuilder, Eq)]
11161119
#[builder(field_defaults(default, setter(into)))]
11171120
#[non_exhaustive]
11181121
pub struct DriverInfo {
@@ -1127,6 +1130,32 @@ pub struct DriverInfo {
11271130
pub platform: Option<String>,
11281131
}
11291132

1133+
impl DriverInfo {
1134+
pub(crate) fn spec_version(&self) -> &str {
1135+
self.version.as_deref().unwrap_or("")
1136+
}
1137+
1138+
pub(crate) fn spec_platform(&self) -> &str {
1139+
self.platform.as_deref().unwrap_or("")
1140+
}
1141+
}
1142+
1143+
impl PartialEq for DriverInfo {
1144+
fn eq(&self, other: &Self) -> bool {
1145+
self.name == other.name
1146+
&& self.spec_version() == other.spec_version()
1147+
&& self.spec_platform() == other.spec_platform()
1148+
}
1149+
}
1150+
1151+
impl Hash for DriverInfo {
1152+
fn hash<H: Hasher>(&self, state: &mut H) {
1153+
self.name.hash(state);
1154+
self.spec_version().hash(state);
1155+
self.spec_platform().hash(state);
1156+
}
1157+
}
1158+
11301159
impl ClientOptions {
11311160
/// Creates a new ClientOptions with the `original_srv_hostname` field set to the testing value
11321161
/// used in the SRV tests.

src/client/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl ClientSession {
212212
options: Option<SessionOptions>,
213213
is_implicit: bool,
214214
) -> Self {
215-
let timeout = client.inner.topology.logical_session_timeout();
215+
let timeout = client.inner.topology.watcher().logical_session_timeout();
216216
let server_session = client.inner.session_pool.check_out(timeout).await;
217217
Self {
218218
drop_token: client.register_async_drop(),

src/cmap/establish.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@ use super::{
1313
Connection,
1414
PoolGeneration,
1515
};
16+
#[cfg(test)]
17+
use crate::options::ClientOptions;
1618
use crate::{
1719
client::{
1820
auth::Credential,
19-
options::{ClientOptions, ServerAddress, TlsOptions},
21+
options::{ServerAddress, TlsOptions},
2022
},
2123
error::{Error as MongoError, ErrorKind, Result},
2224
hello::HelloReply,
2325
runtime::{self, stream::DEFAULT_CONNECT_TIMEOUT, AsyncStream, TlsConfig},
24-
sdam::HandshakePhase,
26+
sdam::{topology::TopologySpec, HandshakePhase},
2527
};
2628

2729
/// Contains the logic to establish a connection, including handshaking, authenticating, and
@@ -48,18 +50,25 @@ pub(crate) struct EstablisherOptions {
4850
pub(crate) test_patch_reply: Option<fn(&mut Result<HelloReply>)>,
4951
}
5052

51-
impl From<&ClientOptions> for EstablisherOptions {
52-
fn from(opts: &ClientOptions) -> Self {
53+
impl From<&TopologySpec> for EstablisherOptions {
54+
fn from(spec: &TopologySpec) -> Self {
5355
Self {
54-
handshake_options: HandshakerOptions::from(opts),
55-
tls_options: opts.tls_options(),
56-
connect_timeout: opts.connect_timeout,
56+
handshake_options: HandshakerOptions::from(spec),
57+
tls_options: spec.options.tls_options(),
58+
connect_timeout: spec.options.connect_timeout,
5759
#[cfg(test)]
5860
test_patch_reply: None,
5961
}
6062
}
6163
}
6264

65+
#[cfg(test)]
66+
impl From<&ClientOptions> for EstablisherOptions {
67+
fn from(options: &ClientOptions) -> Self {
68+
Self::from(&TopologySpec::try_from(options.clone()).unwrap())
69+
}
70+
}
71+
6372
impl ConnectionEstablisher {
6473
/// Creates a new ConnectionEstablisher from the given options.
6574
pub(crate) fn new(options: EstablisherOptions) -> Result<Self> {

0 commit comments

Comments
 (0)