Skip to content

Commit 1c75a3f

Browse files
RUST-232 SDAM Monitoring (#404)
1 parent 8781988 commit 1c75a3f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1781
-336
lines changed

src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ struct ClientInner {
9090

9191
impl Drop for ClientInner {
9292
fn drop(&mut self) {
93-
self.topology.mark_closed()
93+
self.topology.close()
9494
}
9595
}
9696

src/client/options/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{
4343
client::auth::{AuthMechanism, Credential},
4444
concern::{Acknowledgment, ReadConcern, WriteConcern},
4545
error::{ErrorKind, Result},
46-
event::{cmap::CmapEventHandler, command::CommandEventHandler},
46+
event::{cmap::CmapEventHandler, command::CommandEventHandler, sdam::SdamEventHandler},
4747
options::ReadConcernLevel,
4848
sdam::MIN_HEARTBEAT_FREQUENCY,
4949
selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
@@ -492,6 +492,13 @@ pub struct ClientOptions {
492492
#[builder(default)]
493493
pub retry_writes: Option<bool>,
494494

495+
/// The handler that should process all Server Discovery and Monitoring events. See the
496+
/// [`SdamEventHandler`] type documentation for more details.
497+
#[derivative(Debug = "ignore", PartialEq = "ignore")]
498+
#[builder(default)]
499+
#[serde(skip)]
500+
pub sdam_event_handler: Option<Arc<dyn SdamEventHandler>>,
501+
495502
/// The default selection criteria for operations performed on the Client. See the
496503
/// SelectionCriteria type documentation for more details.
497504
#[builder(default)]
@@ -934,6 +941,7 @@ impl From<ClientOptionsParser> for ClientOptions {
934941
heartbeat_freq_test: None,
935942
allow_load_balanced: false,
936943
load_balanced: parser.load_balanced,
944+
sdam_event_handler: None,
937945
}
938946
}
939947
}

src/cmap/conn/command.rs

Lines changed: 27 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
use std::time::Duration;
2+
13
use serde::{de::DeserializeOwned, Serialize};
24

35
use super::wire::Message;
46
use crate::{
57
bson::Document,
68
client::{options::ServerApi, ClusterTime},
79
error::{Error, ErrorKind, Result},
8-
operation::{CommandErrorBody, CommandResponse},
10+
is_master::{IsMasterCommandResponse, IsMasterReply},
11+
operation::{CommandErrorBody, CommandResponse, Response},
912
options::{ReadConcern, ServerAddress},
1013
selection_criteria::ReadPreference,
1114
ClientSession,
@@ -168,53 +171,33 @@ impl RawCommandResponse {
168171
.map_err(|_| Error::invalid_authentication_response(mechanism_name))
169172
}
170173

171-
/// Deserialize the raw bytes into a response backed by a `Document` for further processing.
172-
pub(crate) fn into_document_response(self) -> Result<DocumentCommandResponse> {
173-
let response: CommandResponse<Document> = self.body()?;
174-
Ok(DocumentCommandResponse { response })
174+
pub(crate) fn to_is_master_response(&self, round_trip_time: Duration) -> Result<IsMasterReply> {
175+
match self.body::<CommandResponse<IsMasterCommandResponse>>() {
176+
Ok(response) if response.is_success() => {
177+
let server_address = self.source_address().clone();
178+
let cluster_time = response.cluster_time().cloned();
179+
Ok(IsMasterReply {
180+
server_address,
181+
command_response: response.body,
182+
round_trip_time,
183+
cluster_time,
184+
})
185+
}
186+
_ => match self.body::<CommandResponse<CommandErrorBody>>() {
187+
Ok(command_error_body) => Err(Error::new(
188+
ErrorKind::Command(command_error_body.body.command_error),
189+
command_error_body.body.error_labels,
190+
)),
191+
Err(_) => Err(ErrorKind::InvalidResponse {
192+
message: "invalid server response".into(),
193+
}
194+
.into()),
195+
},
196+
}
175197
}
176198

177199
/// The address of the server that sent this response.
178200
pub(crate) fn source_address(&self) -> &ServerAddress {
179201
&self.source
180202
}
181203
}
182-
183-
/// A command response backed by a `Document` rather than raw bytes.
184-
/// Use this for simple command responses where deserialization performance is not a high priority.
185-
pub(crate) struct DocumentCommandResponse {
186-
response: CommandResponse<Document>,
187-
}
188-
189-
impl DocumentCommandResponse {
190-
/// Returns a result indicating whether this response corresponds to a command failure.
191-
pub(crate) fn validate(&self) -> Result<()> {
192-
if !self.response.is_success() {
193-
let error_response: CommandErrorBody = bson::from_document(self.response.body.clone())
194-
.map_err(|_| ErrorKind::InvalidResponse {
195-
message: "invalid server response".to_string(),
196-
})?;
197-
Err(Error::new(
198-
ErrorKind::Command(error_response.command_error),
199-
error_response.error_labels,
200-
))
201-
} else {
202-
Ok(())
203-
}
204-
}
205-
206-
/// Deserialize the body of the response.
207-
pub(crate) fn body<T: DeserializeOwned>(self) -> Result<T> {
208-
match bson::from_document(self.response.body) {
209-
Ok(body) => Ok(body),
210-
Err(e) => Err(ErrorKind::InvalidResponse {
211-
message: format!("{}", e),
212-
}
213-
.into()),
214-
}
215-
}
216-
217-
pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
218-
self.response.cluster_time.as_ref()
219-
}
220-
}

src/cmap/conn/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub struct ConnectionInfo {
4949
#[derivative(Debug)]
5050
pub(crate) struct Connection {
5151
pub(super) id: u32,
52-
pub(super) address: ServerAddress,
52+
pub(crate) address: ServerAddress,
5353
pub(crate) generation: ConnectionGeneration,
5454

5555
/// The cached StreamDescription from the connection's handshake.

src/cmap/establish/handshake/mod.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#[cfg(test)]
22
mod test;
33

4+
use std::sync::Arc;
5+
46
use lazy_static::lazy_static;
57
use os_info::{Type, Version};
68

@@ -9,8 +11,10 @@ use crate::{
911
client::auth::{ClientFirst, FirstRound},
1012
cmap::{options::ConnectionPoolOptions, Command, Connection, StreamDescription},
1113
error::{ErrorKind, Result},
14+
event::sdam::SdamEventHandler,
1215
is_master::{is_master_command, run_is_master, IsMasterReply},
1316
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
17+
sdam::WeakTopology,
1418
};
1519

1620
#[cfg(feature = "tokio-runtime")]
@@ -133,7 +137,7 @@ lazy_static! {
133137
}
134138

135139
/// Contains the logic needed to handshake a connection.
136-
#[derive(Debug, Clone)]
140+
#[derive(Clone, Debug)]
137141
pub(crate) struct Handshaker {
138142
/// The `isMaster` command to send when handshaking. This will always be identical
139143
/// given the same pool options, so it can be created at the time the Handshaker is created.
@@ -192,12 +196,17 @@ impl Handshaker {
192196
}
193197

194198
/// Handshakes a connection.
195-
pub(crate) async fn handshake(&self, conn: &mut Connection) -> Result<HandshakeResult> {
199+
pub(crate) async fn handshake(
200+
&self,
201+
conn: &mut Connection,
202+
topology: Option<&WeakTopology>,
203+
handler: &Option<Arc<dyn SdamEventHandler>>,
204+
) -> Result<HandshakeResult> {
196205
let mut command = self.command.clone();
197206

198207
let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;
199208

200-
let mut is_master_reply = run_is_master(command, conn).await?;
209+
let mut is_master_reply = run_is_master(conn, command, topology, handler).await?;
201210
if self.command.body.contains_key("loadBalanced")
202211
&& is_master_reply.command_response.service_id.is_none()
203212
{

src/cmap/establish/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl ConnectionEstablisher {
5252

5353
let handshake = self
5454
.handshaker
55-
.handshake(&mut connection)
55+
.handshake(&mut connection, None, &None)
5656
.await
5757
.map_err(|e| EstablishError::pre_hello(e, pool_gen.clone()))?;
5858
let service_id = handshake.is_master_reply.command_response.service_id;

src/cmap/establish/test.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use tokio::sync::RwLockWriteGuard;
22

33
use crate::{
4-
bson::{doc, Bson},
4+
bson::{doc, Bson, Document},
55
cmap::{establish::Handshaker, Command, Connection, ConnectionPoolOptions},
6+
operation::CommandResponse,
67
options::{AuthMechanism, ClientOptions, Credential, ReadPreference},
78
test::{TestClient, CLIENT_OPTIONS, LOCK},
89
};
@@ -47,7 +48,11 @@ async fn speculative_auth_test(
4748
.await
4849
.unwrap();
4950

50-
let first_round = handshaker.handshake(&mut conn).await.unwrap().first_round;
51+
let first_round = handshaker
52+
.handshake(&mut conn, None, &None)
53+
.await
54+
.unwrap()
55+
.first_round;
5156

5257
// We expect that the server will return a response with the `speculativeAuthenticate` field if
5358
// and only if it's new enough to support it.
@@ -70,11 +75,9 @@ async fn speculative_auth_test(
7075
});
7176

7277
let response = conn.send_command(command, None).await.unwrap();
73-
let doc_response = response.into_document_response().unwrap();
78+
let doc_response: CommandResponse<Document> = response.body().unwrap();
7479

75-
doc_response
76-
.validate()
77-
.expect("response should be successful");
80+
assert!(doc_response.is_success());
7881
}
7982

8083
#[cfg_attr(feature = "tokio-runtime", tokio::test)]

src/cmap/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ impl ConnectionPool {
7272
options.clone(),
7373
);
7474

75-
let event_handler = options.as_ref().and_then(|opts| opts.event_handler.clone());
75+
let event_handler = options
76+
.as_ref()
77+
.and_then(|opts| opts.cmap_event_handler.clone());
7678

7779
if let Some(ref handler) = event_handler {
7880
handler.handle_pool_created_event(PoolCreatedEvent {

src/cmap/options.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub(crate) struct ConnectionPoolOptions {
4141
/// Processes all events generated by the pool.
4242
#[derivative(Debug = "ignore", PartialEq = "ignore")]
4343
#[serde(skip)]
44-
pub(crate) event_handler: Option<Arc<dyn CmapEventHandler>>,
44+
pub(crate) cmap_event_handler: Option<Arc<dyn CmapEventHandler>>,
4545

4646
/// Interval between background thread maintenance runs (e.g. ensure minPoolSize).
4747
#[cfg(test)]
@@ -103,7 +103,7 @@ impl ConnectionPoolOptions {
103103
server_api: options.server_api.clone(),
104104
tls_options: options.tls_options(),
105105
credential: options.credential.clone(),
106-
event_handler: options.cmap_event_handler.clone(),
106+
cmap_event_handler: options.cmap_event_handler.clone(),
107107
#[cfg(test)]
108108
background_thread_interval: None,
109109
#[cfg(test)]
@@ -163,7 +163,7 @@ impl From<ConnectionPoolOptions> for ConnectionOptions {
163163
Self {
164164
connect_timeout: pool_options.connect_timeout,
165165
tls_options: pool_options.tls_options,
166-
event_handler: pool_options.event_handler,
166+
event_handler: pool_options.cmap_event_handler,
167167
}
168168
}
169169
}

src/cmap/test/integration.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ use super::{
66
EVENT_TIMEOUT,
77
};
88
use crate::{
9-
bson::doc,
9+
bson::{doc, Document},
1010
cmap::{options::ConnectionPoolOptions, Command, ConnectionPool},
1111
event::cmap::{CmapEventHandler, ConnectionClosedReason},
12+
operation::CommandResponse,
1213
sdam::ServerUpdateSender,
1314
selection_criteria::ReadPreference,
1415
test::{FailCommandOptions, FailPoint, FailPointMode, TestClient, CLIENT_OPTIONS, LOCK},
@@ -55,13 +56,11 @@ async fn acquire_connection_and_send_command() {
5556
}
5657

5758
let response = connection.send_command(cmd, None).await.unwrap();
58-
let doc_response = response.into_document_response().unwrap();
59+
let doc_response: CommandResponse<Document> = response.body().unwrap();
5960

60-
doc_response
61-
.validate()
62-
.expect("response should be successful");
61+
assert!(doc_response.is_success());
6362

64-
let response: ListDatabasesResponse = doc_response.body().unwrap();
63+
let response: ListDatabasesResponse = bson::from_document(doc_response.body).unwrap();
6564

6665
let names: Vec<_> = response
6766
.databases
@@ -107,7 +106,7 @@ async fn concurrent_connections() {
107106
let handler = Arc::new(EventHandler::new());
108107
let client_options = CLIENT_OPTIONS.clone();
109108
let mut options = ConnectionPoolOptions::from_client_options(&client_options);
110-
options.event_handler = Some(handler.clone() as Arc<dyn crate::cmap::CmapEventHandler>);
109+
options.cmap_event_handler = Some(handler.clone() as Arc<dyn crate::cmap::CmapEventHandler>);
111110
options.ready = Some(true);
112111

113112
let pool = ConnectionPool::new(
@@ -191,7 +190,7 @@ async fn connection_error_during_establishment() {
191190

192191
let mut options = ConnectionPoolOptions::from_client_options(&client_options);
193192
options.ready = Some(true);
194-
options.event_handler = Some(handler.clone() as Arc<dyn crate::cmap::CmapEventHandler>);
193+
options.cmap_event_handler = Some(handler.clone() as Arc<dyn crate::cmap::CmapEventHandler>);
195194
let pool = ConnectionPool::new(
196195
client_options.hosts[0].clone(),
197196
Default::default(),

0 commit comments

Comments
 (0)