Skip to content

Commit 32d67de

Browse files
authored
RUST-598 Perform handshake when establishing monitoring connections (#271)
1 parent a1f4616 commit 32d67de

File tree

6 files changed

+173
-117
lines changed

6 files changed

+173
-117
lines changed

src/cmap/establish/handshake/mod.rs

Lines changed: 125 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ use crate::{
1010
bson::{doc, Bson, Document},
1111
client::auth::{ClientFirst, FirstRound},
1212
cmap::{options::ConnectionPoolOptions, Command, Connection, StreamDescription},
13-
error::Result,
13+
error::{ErrorKind, Result},
1414
is_master::{IsMasterCommandResponse, IsMasterReply},
15-
options::{AuthMechanism, Credential},
15+
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
1616
};
1717

1818
#[cfg(feature = "tokio-runtime")]
@@ -136,95 +136,167 @@ lazy_static! {
136136

137137
/// Contains the logic needed to handshake a connection.
138138
#[derive(Debug, Clone)]
139-
pub(super) struct Handshaker {
139+
pub(crate) struct Handshaker {
140140
/// The `isMaster` command to send when handshaking. This will always be identical
141141
/// given the same pool options, so it can be created at the time the Handshaker is created.
142142
command: Command,
143+
credential: Option<Credential>,
143144
}
144145

145146
impl Handshaker {
146147
/// Creates a new Handshaker.
147-
pub(super) fn new(options: Option<&ConnectionPoolOptions>) -> Self {
148+
pub(crate) fn new(options: Option<HandshakerOptions>) -> Self {
148149
let mut metadata = BASE_CLIENT_METADATA.clone();
150+
let mut credential = None;
151+
let mut db = None;
152+
let mut server_api = None;
149153

150-
if let Some(app_name) = options.as_ref().and_then(|opts| opts.app_name.as_ref()) {
151-
metadata.application = Some(AppMetadata {
152-
name: app_name.to_string(),
153-
});
154-
}
155-
156-
if let Some(driver_info) = options.as_ref().and_then(|opts| opts.driver_info.as_ref()) {
157-
metadata.driver.name.push('|');
158-
metadata.driver.name.push_str(&driver_info.name);
154+
let mut body = doc! {
155+
"isMaster": 1,
156+
};
159157

160-
if let Some(ref version) = driver_info.version {
161-
metadata.driver.version.push('|');
162-
metadata.driver.version.push_str(version);
158+
if let Some(options) = options {
159+
if let Some(app_name) = options.app_name {
160+
metadata.application = Some(AppMetadata { name: app_name });
163161
}
164162

165-
if let Some(ref mut platform) = metadata.platform {
166-
if let Some(ref driver_info_platform) = driver_info.platform {
167-
platform.push('|');
168-
platform.push_str(driver_info_platform);
163+
if let Some(driver_info) = options.driver_info {
164+
metadata.driver.name.push('|');
165+
metadata.driver.name.push_str(&driver_info.name);
166+
167+
if let Some(ref version) = driver_info.version {
168+
metadata.driver.version.push('|');
169+
metadata.driver.version.push_str(version);
170+
}
171+
172+
if let Some(ref mut platform) = metadata.platform {
173+
if let Some(ref driver_info_platform) = driver_info.platform {
174+
platform.push('|');
175+
platform.push_str(driver_info_platform);
176+
}
169177
}
170178
}
179+
180+
if let Some(cred) = options.credential {
181+
cred.append_needed_mechanism_negotiation(&mut body);
182+
db = Some(cred.resolved_source().to_string());
183+
credential = Some(cred);
184+
}
185+
186+
server_api = options.server_api;
171187
}
172188

173-
let mut db = "admin";
189+
body.insert("client", metadata);
174190

175-
let mut body = doc! {
176-
"isMaster": 1,
177-
"client": metadata,
178-
};
191+
let mut command = Command::new_read(
192+
"isMaster".to_string(),
193+
db.unwrap_or_else(|| "admin".to_string()),
194+
None,
195+
body,
196+
);
179197

180-
if let Some(credential) = options.as_ref().and_then(|opts| opts.credential.as_ref()) {
181-
credential.append_needed_mechanism_negotiation(&mut body);
182-
db = credential.resolved_source();
198+
if let Some(server_api) = server_api {
199+
command.set_server_api(&server_api)
183200
}
184201

185-
let mut command = Command::new_read("isMaster".to_string(), db.to_string(), None, body);
186-
if let Some(server_api) = options.as_ref().and_then(|opts| opts.server_api.as_ref()) {
187-
command.set_server_api(server_api)
202+
Self {
203+
command,
204+
credential,
188205
}
189-
190-
Self { command }
191206
}
192207

193208
/// Handshakes a connection.
194-
pub(super) async fn handshake(
195-
&self,
196-
conn: &mut Connection,
197-
credential: Option<&Credential>,
198-
) -> Result<Option<FirstRound>> {
209+
pub(crate) async fn handshake(&self, conn: &mut Connection) -> Result<HandshakeResult> {
199210
let mut command = self.command.clone();
200211

201-
let client_first = set_speculative_auth_info(&mut command.body, credential)?;
202-
203-
let start_time = PreciseTime::now();
204-
let response = conn.send_command(command, None).await?;
205-
let end_time = PreciseTime::now();
212+
let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;
206213

207-
response.validate()?;
208-
let mut command_response: IsMasterCommandResponse = response.body()?;
214+
let mut is_master_reply = is_master(command, conn).await?;
215+
conn.stream_description = Some(StreamDescription::from_is_master(is_master_reply.clone()));
209216

210217
// Record the client's message and the server's response from speculative authentication if
211218
// the server did send a response.
212219
let first_round = client_first.and_then(|client_first| {
213-
command_response
220+
is_master_reply
221+
.command_response
214222
.speculative_authenticate
215223
.take()
216224
.map(|server_first| client_first.into_first_round(server_first))
217225
});
218226

219-
let is_master_reply = IsMasterReply {
220-
command_response,
221-
round_trip_time: Some(start_time.to(end_time).to_std().unwrap()),
222-
cluster_time: None,
223-
};
227+
Ok(HandshakeResult {
228+
first_round,
229+
is_master_reply,
230+
})
231+
}
232+
}
233+
234+
/// The information returned from the server as part of the handshake.
235+
///
236+
/// Also optionally includes the first round of speculative authentication
237+
/// if applicable.
238+
#[derive(Debug)]
239+
pub(crate) struct HandshakeResult {
240+
/// The response from the server.
241+
pub(crate) is_master_reply: IsMasterReply,
242+
243+
/// The first round of speculative authentication, if applicable.
244+
pub(crate) first_round: Option<FirstRound>,
245+
}
246+
247+
#[derive(Debug)]
248+
pub(crate) struct HandshakerOptions {
249+
app_name: Option<String>,
250+
credential: Option<Credential>,
251+
driver_info: Option<DriverInfo>,
252+
server_api: Option<ServerApi>,
253+
}
254+
255+
impl From<ConnectionPoolOptions> for HandshakerOptions {
256+
fn from(options: ConnectionPoolOptions) -> Self {
257+
Self {
258+
app_name: options.app_name,
259+
credential: options.credential,
260+
driver_info: options.driver_info,
261+
server_api: options.server_api,
262+
}
263+
}
264+
}
265+
266+
impl From<ClientOptions> for HandshakerOptions {
267+
fn from(options: ClientOptions) -> Self {
268+
Self {
269+
app_name: options.app_name,
270+
credential: options.credential,
271+
driver_info: options.driver_info,
272+
server_api: options.server_api,
273+
}
274+
}
275+
}
224276

225-
conn.stream_description = Some(StreamDescription::from_is_master(is_master_reply));
226-
Ok(first_round)
277+
/// Run the given isMaster command.
278+
///
279+
/// If the given command is not an isMaster, this function will return an error.
280+
pub(crate) async fn is_master(command: Command, conn: &mut Connection) -> Result<IsMasterReply> {
281+
if !command.name.eq_ignore_ascii_case("ismaster") {
282+
return Err(ErrorKind::OperationError {
283+
message: format!("invalid ismaster command: {}", command.name),
284+
}
285+
.into());
227286
}
287+
let start_time = PreciseTime::now();
288+
let response = conn.send_command(command, None).await?;
289+
let end_time = PreciseTime::now();
290+
291+
response.validate()?;
292+
let cluster_time = response.cluster_time().cloned();
293+
let command_response: IsMasterCommandResponse = response.body()?;
294+
295+
Ok(IsMasterReply {
296+
command_response,
297+
round_trip_time: Some(start_time.to(end_time).to_std().unwrap()),
298+
cluster_time,
299+
})
228300
}
229301

230302
/// Updates the handshake command document with the speculative authenitication info.

src/cmap/establish/handshake/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ fn metadata_with_options() {
3434
)
3535
.build();
3636

37-
let handshaker = Handshaker::new(Some(&options));
37+
let handshaker = Handshaker::new(Some(options.into()));
3838

3939
let metadata = handshaker.command.body.get_document("client").unwrap();
4040
assert_eq!(

src/cmap/establish/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod handshake;
1+
pub(super) mod handshake;
22
#[cfg(test)]
33
mod test;
44

@@ -24,7 +24,7 @@ pub(super) struct ConnectionEstablisher {
2424
impl ConnectionEstablisher {
2525
/// Creates a new ConnectionEstablisher from the given options.
2626
pub(super) fn new(http_client: HttpClient, options: Option<&ConnectionPoolOptions>) -> Self {
27-
let handshaker = Handshaker::new(options);
27+
let handshaker = Handshaker::new(options.cloned().map(Into::into));
2828

2929
Self {
3030
handshaker,
@@ -43,8 +43,9 @@ impl ConnectionEstablisher {
4343

4444
let first_round = self
4545
.handshaker
46-
.handshake(&mut connection, self.credential.as_ref())
47-
.await?;
46+
.handshake(&mut connection)
47+
.await?
48+
.first_round;
4849

4950
if let Some(ref credential) = self.credential {
5051
credential

src/cmap/establish/test.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,13 @@ async fn speculative_auth_test(
3939
.tls_options(CLIENT_OPTIONS.tls_options())
4040
.build();
4141

42-
let handshaker = Handshaker::new(Some(&pool_options));
42+
let handshaker = Handshaker::new(Some(pool_options.clone().into()));
4343

4444
let mut conn = Connection::new_testing(1, Default::default(), 1, Some(pool_options.into()))
4545
.await
4646
.unwrap();
4747

48-
let first_round = handshaker
49-
.handshake(&mut conn, Some(&credential))
50-
.await
51-
.unwrap();
48+
let first_round = handshaker.handshake(&mut conn).await.unwrap().first_round;
5249

5350
// We expect that the server will return a response with the `speculativeAuthenticate` field if
5451
// and only if it's new enough to support it.

src/cmap/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ use std::{sync::Arc, time::Duration};
1313
use derivative::Derivative;
1414

1515
pub use self::conn::ConnectionInfo;
16-
pub(crate) use self::conn::{Command, CommandResponse, Connection, StreamDescription};
1716
use self::options::ConnectionPoolOptions;
17+
pub(crate) use self::{
18+
conn::{Command, CommandResponse, Connection, StreamDescription},
19+
establish::handshake::{is_master, Handshaker},
20+
};
1821
use crate::{
1922
error::{ErrorKind, Result},
2023
event::cmap::{

0 commit comments

Comments
 (0)