Skip to content

Commit d66a2dc

Browse files
shsaskinpatrickfreed
authored andcommitted
RUST-185: Implement command monitoring (#57)
1 parent ed3d87a commit d66a2dc

File tree

44 files changed

+859
-283
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+859
-283
lines changed

src/bson_util.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::time::Duration;
22

33
use bson::{bson, doc, oid::ObjectId, Bson, Document};
4-
use serde::{ser, Serialize, Serializer};
4+
use serde::{ser, Deserialize, Deserializer, Serialize, Serializer};
55

66
use crate::error::{ErrorKind, Result};
77

@@ -69,6 +69,16 @@ pub(crate) fn serialize_duration_as_i64_millis<S: Serializer>(
6969
}
7070
}
7171

72+
pub(crate) fn deserialize_duration_from_u64_millis<'de, D>(
73+
deserializer: D,
74+
) -> std::result::Result<Option<Duration>, D::Error>
75+
where
76+
D: Deserializer<'de>,
77+
{
78+
let millis = Option::<u64>::deserialize(deserializer)?;
79+
Ok(millis.map(Duration::from_millis))
80+
}
81+
7282
#[allow(clippy::trivially_copy_pass_by_ref)]
7383
pub(crate) fn serialize_u32_as_i32<S: Serializer>(
7484
val: &Option<u32>,

src/client/auth/scram.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl ScramVersion {
9898
client_first.to_command(self),
9999
);
100100

101-
let server_first_response = conn.send_command(command)?;
101+
let server_first_response = conn.send_command(command, None)?;
102102
let server_first = ServerFirst::parse(server_first_response.raw_response)?;
103103
server_first.validate(nonce.as_str())?;
104104

@@ -135,7 +135,7 @@ impl ScramVersion {
135135
client_final.to_command(),
136136
);
137137

138-
let server_final_response = conn.send_command(command)?;
138+
let server_final_response = conn.send_command(command, None)?;
139139
let server_final = ServerFinal::parse(server_final_response.raw_response)?;
140140
server_final.validate(salted_password.as_slice(), &client_final, self)?;
141141

@@ -149,7 +149,7 @@ impl ScramVersion {
149149
};
150150
let command = Command::new("saslContinue".into(), source.into(), noop);
151151

152-
let server_noop_response = conn.send_command(command)?;
152+
let server_noop_response = conn.send_command(command, None)?;
153153

154154
if server_noop_response
155155
.raw_response

src/client/executor.rs

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,35 @@
11
use super::Client;
2+
3+
use std::collections::HashSet;
4+
5+
use bson::Document;
6+
use lazy_static::lazy_static;
7+
use time::PreciseTime;
8+
29
use crate::{
310
cmap::Connection,
411
error::Result,
12+
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
513
operation::Operation,
614
sdam::{update_topology, ServerDescription},
715
};
816

17+
lazy_static! {
18+
static ref REDACTED_COMMANDS: HashSet<&'static str> = {
19+
let mut hash_set = HashSet::new();
20+
hash_set.insert("authenticate");
21+
hash_set.insert("saslstart");
22+
hash_set.insert("saslcontinue");
23+
hash_set.insert("getnonce");
24+
hash_set.insert("createuser");
25+
hash_set.insert("updateuser");
26+
hash_set.insert("copydbgetnonce");
27+
hash_set.insert("copydbsaslstart");
28+
hash_set.insert("copydb");
29+
hash_set
30+
};
31+
}
32+
933
impl Client {
1034
/// Executes an operation and returns the connection used to do so along with the result of the
1135
/// operation. This will be used primarily for the opening of exhaust cursors.
@@ -83,13 +107,75 @@ impl Client {
83107
connection: &mut Connection,
84108
) -> Result<T::O> {
85109
let mut cmd = op.build(connection.stream_description()?)?;
86-
87110
self.topology()
88111
.read()
89112
.unwrap()
90113
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria());
91114

92-
let response = connection.send_command(cmd)?;
93-
op.handle_response(response)
115+
let connection_info = connection.info();
116+
let request_id = crate::cmap::conn::next_request_id();
117+
118+
let should_redact = REDACTED_COMMANDS.contains(cmd.name.to_lowercase().as_str());
119+
120+
let command_body = if should_redact {
121+
Document::new()
122+
} else {
123+
cmd.body.clone()
124+
};
125+
let command_started_event = CommandStartedEvent {
126+
command: command_body,
127+
db: cmd.target_db.clone(),
128+
command_name: cmd.name.clone(),
129+
request_id,
130+
connection: connection_info.clone(),
131+
};
132+
133+
self.send_command_started_event(command_started_event);
134+
135+
let start_time = PreciseTime::now();
136+
137+
let response_result =
138+
connection
139+
.send_command(cmd.clone(), request_id)
140+
.and_then(|response| {
141+
if !op.handles_command_errors() {
142+
response.validate()?;
143+
}
144+
Ok(response)
145+
});
146+
147+
let end_time = PreciseTime::now();
148+
let duration = start_time.to(end_time).to_std()?;
149+
150+
match response_result {
151+
Err(error) => {
152+
let command_failed_event = CommandFailedEvent {
153+
duration,
154+
command_name: cmd.name.clone(),
155+
failure: error.clone(),
156+
request_id,
157+
connection: connection_info,
158+
};
159+
self.send_command_failed_event(command_failed_event);
160+
Err(error)
161+
}
162+
Ok(response) => {
163+
let reply = if should_redact {
164+
Document::new()
165+
} else {
166+
response.raw_response.clone()
167+
};
168+
169+
let command_succeeded_event = CommandSucceededEvent {
170+
duration,
171+
reply,
172+
command_name: cmd.name.clone(),
173+
request_id,
174+
connection: connection_info,
175+
};
176+
self.send_command_succeeded_event(command_succeeded_event);
177+
op.handle_response(response)
178+
}
179+
}
94180
}
95181
}

src/cmap/conn/command.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,7 @@ impl CommandResponse {
104104
}
105105

106106
/// Deserialize the body of the response.
107-
/// If this response corresponds to a command failure, an appropriate CommandError result will
108-
/// be returned.
109107
pub(crate) fn body<T: DeserializeOwned>(&self) -> Result<T> {
110-
self.validate()?;
111108
match bson::from_bson(Bson::Document(self.raw_response.clone())) {
112109
Ok(body) => Ok(body),
113110
Err(e) => Err(ErrorKind::ResponseError {

src/cmap/conn/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::{
2727
};
2828
pub(crate) use command::{Command, CommandResponse};
2929
pub(crate) use stream_description::StreamDescription;
30+
pub(crate) use wire::next_request_id;
3031

3132
/// User-facing information about a connection to the database.
3233
#[derive(Clone, Debug)]
@@ -88,6 +89,13 @@ impl Connection {
8889
Ok(conn)
8990
}
9091

92+
pub(crate) fn info(&self) -> ConnectionInfo {
93+
ConnectionInfo {
94+
id: self.id,
95+
address: self.address.clone(),
96+
}
97+
}
98+
9199
pub(crate) fn address(&self) -> &StreamAddress {
92100
&self.address
93101
}
@@ -189,8 +197,12 @@ impl Connection {
189197
/// An `Ok(...)` result simply means the server received the command and that the driver
190198
/// driver received the response; it does not imply anything about the success of the command
191199
/// itself.
192-
pub(crate) fn send_command(&mut self, command: Command) -> Result<CommandResponse> {
193-
let message = Message::from_command(command);
200+
pub(crate) fn send_command(
201+
&mut self,
202+
command: Command,
203+
request_id: impl Into<Option<i32>>,
204+
) -> Result<CommandResponse> {
205+
let message = Message::with_command(command, request_id.into());
194206
message.write_to(&mut self.stream)?;
195207

196208
let response_message = Message::read_from(&mut self.stream)?;

src/cmap/conn/wire/message.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ pub(crate) struct Message {
2020
pub(crate) flags: MessageFlags,
2121
pub(crate) sections: Vec<MessageSection>,
2222
pub(crate) checksum: Option<u32>,
23+
pub(crate) request_id: Option<i32>,
2324
}
2425

2526
impl Message {
2627
/// Creates a `Message` from a given `Command`.
2728
///
2829
/// Note that `response_to` will need to be set manually.
29-
pub(crate) fn from_command(mut command: Command) -> Self {
30+
pub(crate) fn with_command(mut command: Command, request_id: Option<i32>) -> Self {
3031
command.body.insert("$db", command.target_db);
3132

3233
if let Some(read_pref) = command.read_pref {
@@ -40,6 +41,7 @@ impl Message {
4041
flags: MessageFlags::empty(),
4142
sections: vec![MessageSection::Document(command.body)],
4243
checksum: None,
44+
request_id,
4345
}
4446
}
4547

@@ -110,6 +112,7 @@ impl Message {
110112
flags,
111113
sections,
112114
checksum,
115+
request_id: None,
113116
})
114117
}
115118

@@ -132,7 +135,7 @@ impl Message {
132135

133136
let header = Header {
134137
length: total_length as i32,
135-
request_id: super::util::next_request_id(),
138+
request_id: self.request_id.unwrap_or_else(super::util::next_request_id),
136139
response_to: self.response_to,
137140
op_code: OpCode::Message,
138141
};

src/cmap/conn/wire/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ mod message;
44
mod test;
55
mod util;
66

7-
pub(super) use message::Message;
7+
pub(crate) use self::{message::Message, util::next_request_id};

src/cmap/conn/wire/test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ fn basic() {
2323
doc! { "isMaster": 1, "$db": "admin" },
2424
)],
2525
checksum: None,
26+
request_id: None,
2627
};
2728

2829
let StreamAddress { ref hostname, port } = CLIENT_OPTIONS.hosts[0];

src/cmap/conn/wire/util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::error::Result;
88
use lazy_static::lazy_static;
99

1010
/// Closure to obtain a new, unique request ID.
11-
pub(super) fn next_request_id() -> i32 {
11+
pub(crate) fn next_request_id() -> i32 {
1212
lazy_static! {
1313
static ref REQUEST_ID: AtomicI32 = AtomicI32::new(0);
1414
}

src/cmap/establish/handshake.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl Handshaker {
8383

8484
/// Handshakes a connection.
8585
pub(super) fn handshake(&self, conn: &mut Connection) -> Result<()> {
86-
let response = conn.send_command(self.command.clone())?;
86+
let response = conn.send_command(self.command.clone(), None)?;
8787
let command_response = response.body()?;
8888

8989
// TODO RUST-192: Calculate round trip time.

0 commit comments

Comments
 (0)