Skip to content

Commit fa952f4

Browse files
authored
RUST-1587 Implement server selection tracing events (#805)
1 parent 2931c6a commit fa952f4

24 files changed

+2913
-94
lines changed

src/client/executor.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ impl Client {
312312
.and_then(|s| s.transaction.pinned_mongos())
313313
.or_else(|| op.selection_criteria());
314314

315-
let server = match self.select_server(selection_criteria).await {
315+
let server = match self.select_server(selection_criteria, op.name()).await {
316316
Ok(server) => server,
317317
Err(mut err) => {
318318
retry.first_error()?;
@@ -803,14 +803,14 @@ impl Client {
803803
}
804804
}
805805

806-
async fn select_data_bearing_server(&self) -> Result<()> {
806+
async fn select_data_bearing_server(&self, operation_name: &str) -> Result<()> {
807807
let topology_type = self.inner.topology.topology_type();
808808
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
809809
let server_type = server_info.server_type();
810810
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
811811
|| server_type.is_data_bearing()
812812
}));
813-
let _: SelectedServer = self.select_server(Some(&criteria)).await?;
813+
let _: SelectedServer = self.select_server(Some(&criteria), operation_name).await?;
814814
Ok(())
815815
}
816816

@@ -824,7 +824,8 @@ impl Client {
824824
// sessions are supported or not.
825825
match initial_status {
826826
SessionSupportStatus::Undetermined => {
827-
self.select_data_bearing_server().await?;
827+
self.select_data_bearing_server(crate::client::SESSIONS_SUPPORT_OP_NAME)
828+
.await?;
828829
Ok(self.inner.topology.session_support_status())
829830
}
830831
_ => Ok(initial_status),
@@ -841,7 +842,8 @@ impl Client {
841842
// sessions are supported or not.
842843
match initial_status {
843844
TransactionSupportStatus::Undetermined => {
844-
self.select_data_bearing_server().await?;
845+
self.select_data_bearing_server("Check transactions support status")
846+
.await?;
845847
Ok(self.inner.topology.transaction_support_status())
846848
}
847849
_ => Ok(initial_status),

src/client/mod.rs

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::options::ServerAddress;
1919
#[cfg(feature = "tracing-unstable")]
2020
use crate::trace::{
2121
command::CommandTracingEventEmitter,
22+
server_selection::ServerSelectionTracingEventEmitter,
2223
trace_or_log_enabled,
2324
TracingOrLogLevel,
2425
COMMAND_TRACING_EVENT_TARGET,
@@ -33,7 +34,7 @@ use crate::{
3334
},
3435
concern::{ReadConcern, WriteConcern},
3536
db::Database,
36-
error::{ErrorKind, Result},
37+
error::{Error, ErrorKind, Result},
3738
event::command::{handle_command_event, CommandEvent},
3839
operation::{AggregateTarget, ListDatabases},
3940
options::{
@@ -55,6 +56,8 @@ pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
5556
use session::{ServerSession, ServerSessionPool};
5657

5758
const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
59+
// TODO: RUST-1585 Remove this constant.
60+
pub(crate) const SESSIONS_SUPPORT_OP_NAME: &str = "Check sessions support status";
5861

5962
/// This is the main entry point for the API. A `Client` is used to connect to a MongoDB cluster.
6063
/// By default, it will monitor the topology of the cluster, keeping track of any changes, such
@@ -480,13 +483,18 @@ impl Client {
480483
&self,
481484
criteria: Option<&SelectionCriteria>,
482485
) -> Result<ServerAddress> {
483-
let server = self.select_server(criteria).await?;
486+
let server = self.select_server(criteria, "Test select server").await?;
484487
Ok(server.address.clone())
485488
}
486489

487490
/// Select a server using the provided criteria. If none is provided, a primary read preference
488491
/// will be used instead.
489-
async fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<SelectedServer> {
492+
#[allow(unused_variables)] // we only use the operation_name for tracing.
493+
async fn select_server(
494+
&self,
495+
criteria: Option<&SelectionCriteria>,
496+
operation_name: &str,
497+
) -> Result<SelectedServer> {
490498
let criteria =
491499
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
492500

@@ -497,31 +505,70 @@ impl Client {
497505
.server_selection_timeout
498506
.unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
499507

508+
#[cfg(feature = "tracing-unstable")]
509+
let event_emitter = ServerSelectionTracingEventEmitter::new(
510+
self.inner.topology.id,
511+
criteria,
512+
operation_name,
513+
start_time,
514+
timeout,
515+
);
516+
#[cfg(feature = "tracing-unstable")]
517+
event_emitter.emit_started_event(self.inner.topology.watch().observe_latest().description);
518+
// We only want to emit this message once per operation at most.
519+
#[cfg(feature = "tracing-unstable")]
520+
let mut emitted_waiting_message = false;
521+
500522
let mut watcher = self.inner.topology.watch();
501523
loop {
502524
let state = watcher.observe_latest();
503525

504-
if let Some(server) = server_selection::attempt_to_select_server(
526+
let result = server_selection::attempt_to_select_server(
505527
criteria,
506528
&state.description,
507529
&state.servers(),
508-
)? {
509-
return Ok(server);
510-
}
530+
);
531+
match result {
532+
Err(error) => {
533+
#[cfg(feature = "tracing-unstable")]
534+
event_emitter.emit_failed_event(&state.description, &error);
511535

512-
watcher.request_immediate_check();
513-
514-
let change_occurred = start_time.elapsed() < timeout
515-
&& watcher
516-
.wait_for_update(timeout - start_time.elapsed())
517-
.await;
518-
if !change_occurred {
519-
return Err(ErrorKind::ServerSelection {
520-
message: state
521-
.description
522-
.server_selection_timeout_error_message(criteria),
536+
return Err(error);
537+
}
538+
Ok(result) => {
539+
if let Some(server) = result {
540+
#[cfg(feature = "tracing-unstable")]
541+
event_emitter.emit_succeeded_event(&state.description, &server);
542+
543+
return Ok(server);
544+
} else {
545+
#[cfg(feature = "tracing-unstable")]
546+
if !emitted_waiting_message {
547+
event_emitter.emit_waiting_event(&state.description);
548+
emitted_waiting_message = true;
549+
}
550+
551+
watcher.request_immediate_check();
552+
553+
let change_occurred = start_time.elapsed() < timeout
554+
&& watcher
555+
.wait_for_update(timeout - start_time.elapsed())
556+
.await;
557+
if !change_occurred {
558+
let error: Error = ErrorKind::ServerSelection {
559+
message: state
560+
.description
561+
.server_selection_timeout_error_message(criteria),
562+
}
563+
.into();
564+
565+
#[cfg(feature = "tracing-unstable")]
566+
event_emitter.emit_failed_event(&state.description, &error);
567+
568+
return Err(error);
569+
}
570+
}
523571
}
524-
.into());
525572
}
526573
}
527574
}

src/event/sdam/topology_description.rs

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use crate::{
1111

1212
/// A description of the most up-to-date information known about a topology. Further details can
1313
/// be found in the [Server Discovery and Monitoring specification](https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst).
14-
#[derive(Clone)]
14+
#[derive(Clone, derive_more::Display)]
15+
#[display(fmt = "{}", description)]
1516
pub struct TopologyDescription {
1617
pub(crate) description: crate::sdam::TopologyDescription,
1718
}
@@ -102,38 +103,3 @@ impl fmt::Debug for TopologyDescription {
102103
.finish()
103104
}
104105
}
105-
106-
impl fmt::Display for TopologyDescription {
107-
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
108-
write!(f, "{{ Type: {:?}", self.description.topology_type)?;
109-
110-
if let Some(ref set_name) = self.description.set_name {
111-
write!(f, ", Set Name: {}", set_name)?;
112-
}
113-
114-
if let Some(max_set_version) = self.description.max_set_version {
115-
write!(f, ", Max Set Version: {}", max_set_version)?;
116-
}
117-
118-
if let Some(max_election_id) = self.description.max_election_id {
119-
write!(f, ", Max Election ID: {}", max_election_id)?;
120-
}
121-
122-
if let Some(ref compatibility_error) = self.description.compatibility_error {
123-
write!(f, ", Compatibility Error: {}", compatibility_error)?;
124-
}
125-
126-
if !self.description.servers.is_empty() {
127-
write!(f, ", Servers: ")?;
128-
let mut iter = self.description.servers.values();
129-
if let Some(server) = iter.next() {
130-
write!(f, "{}", ServerInfo::new_borrowed(server))?;
131-
}
132-
for server in iter {
133-
write!(f, ", {}", ServerInfo::new_borrowed(server))?;
134-
}
135-
}
136-
137-
write!(f, " }}")
138-
}
139-
}

src/sdam/description/topology/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
use self::server_selection::IDLE_WRITE_PERIOD;
2626

2727
/// The possible types for a topology.
28-
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
28+
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, derive_more::Display)]
2929
#[non_exhaustive]
3030
pub enum TopologyType {
3131
/// A single mongod server.

src/sdam/description/topology/server_selection/mod.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ impl SelectedServer {
3434
server.increment_operation_count();
3535
Self { server }
3636
}
37+
38+
#[cfg(feature = "tracing-unstable")]
39+
pub(crate) fn address(&self) -> &ServerAddress {
40+
&self.server.address
41+
}
3742
}
3843

3944
impl Deref for SelectedServer {
@@ -362,12 +367,38 @@ impl TopologyDescription {
362367
}
363368

364369
impl fmt::Display for TopologyDescription {
365-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366-
write!(f, "{{ Type: {:?}, Servers: [ ", self.topology_type)?;
367-
for server_info in self.servers.values().map(ServerInfo::new_borrowed) {
368-
write!(f, "{}, ", server_info)?;
370+
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
371+
write!(f, "{{ Type: {}", self.topology_type)?;
372+
373+
if let Some(ref set_name) = self.set_name {
374+
write!(f, ", Set Name: {}", set_name)?;
375+
}
376+
377+
if let Some(max_set_version) = self.max_set_version {
378+
write!(f, ", Max Set Version: {}", max_set_version)?;
379+
}
380+
381+
if let Some(max_election_id) = self.max_election_id {
382+
write!(f, ", Max Election ID: {}", max_election_id)?;
369383
}
370-
write!(f, "] }}")
384+
385+
if let Some(ref compatibility_error) = self.compatibility_error {
386+
write!(f, ", Compatibility Error: {}", compatibility_error)?;
387+
}
388+
389+
if !self.servers.is_empty() {
390+
write!(f, ", Servers: [ ")?;
391+
let mut iter = self.servers.values();
392+
if let Some(server) = iter.next() {
393+
write!(f, "{}", ServerInfo::new_borrowed(server))?;
394+
}
395+
for server in iter {
396+
write!(f, ", {}", ServerInfo::new_borrowed(server))?;
397+
}
398+
write!(f, " ]")?;
399+
}
400+
401+
write!(f, " }}")
371402
}
372403
}
373404

src/selection_criteria.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@ use crate::{
1313
};
1414

1515
/// Describes which servers are suitable for a given operation.
16-
#[derive(Clone, Derivative)]
16+
#[derive(Clone, Derivative, derive_more::Display)]
1717
#[derivative(Debug)]
1818
#[non_exhaustive]
1919
pub enum SelectionCriteria {
2020
/// A read preference that describes the suitable servers based on the server type, max
2121
/// staleness, and server tags.
2222
///
2323
/// See the documentation [here](https://www.mongodb.com/docs/manual/core/read-preference/) for more details.
24+
#[display(fmt = "ReadPreference {}", _0)]
2425
ReadPreference(ReadPreference),
2526

2627
/// A predicate used to filter servers that are considered suitable. A `server` will be
2728
/// considered suitable by a `predicate` if `predicate(server)` returns true.
29+
#[display(fmt = "Custom predicate")]
2830
Predicate(#[derivative(Debug = "ignore")] Predicate),
2931
}
3032

@@ -129,6 +131,48 @@ pub enum ReadPreference {
129131
Nearest { options: ReadPreferenceOptions },
130132
}
131133

134+
impl std::fmt::Display for ReadPreference {
135+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136+
write!(f, "{{ Mode: ")?;
137+
let opts_ref = match self {
138+
ReadPreference::Primary => {
139+
write!(f, "Primary")?;
140+
None
141+
}
142+
ReadPreference::Secondary { options } => {
143+
write!(f, "Secondary")?;
144+
Some(options)
145+
}
146+
ReadPreference::PrimaryPreferred { options } => {
147+
write!(f, "PrimaryPreferred")?;
148+
Some(options)
149+
}
150+
ReadPreference::SecondaryPreferred { options } => {
151+
write!(f, "SecondaryPreferred")?;
152+
Some(options)
153+
}
154+
ReadPreference::Nearest { options } => {
155+
write!(f, "Nearest")?;
156+
Some(options)
157+
}
158+
};
159+
if let Some(opts) = opts_ref {
160+
if !opts.is_default() {
161+
if let Some(ref tag_sets) = opts.tag_sets {
162+
write!(f, ", Tag Sets: {:?}", tag_sets)?;
163+
}
164+
if let Some(ref max_staleness) = opts.max_staleness {
165+
write!(f, ", Max Staleness: {:?}", max_staleness)?;
166+
}
167+
if let Some(ref hedge) = opts.hedge {
168+
write!(f, ", Hedge: {}", hedge.enabled)?;
169+
}
170+
}
171+
}
172+
write!(f, " }}")
173+
}
174+
}
175+
132176
impl<'de> Deserialize<'de> for ReadPreference {
133177
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
134178
where

0 commit comments

Comments
 (0)