Skip to content

Commit d6689bb

Browse files
RUST-1405 Expose full server response in Error (#1474)
1 parent 3f43723 commit d6689bb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+231
-103
lines changed

src/bson_compat.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub(crate) trait RawDocumentBufExt: Sized {
3636
key: impl AsRef<CStr>,
3737
value: impl Into<crate::bson::raw::RawBsonRef<'a>> + 'a,
3838
);
39+
40+
fn to_document(&self) -> crate::error::Result<crate::bson::Document>;
3941
}
4042

4143
#[cfg(feature = "bson-3")]
@@ -47,6 +49,10 @@ impl RawDocumentBufExt for crate::bson::RawDocumentBuf {
4749
) {
4850
self.append(key, value);
4951
}
52+
53+
fn to_document(&self) -> crate::error::Result<crate::bson::Document> {
54+
self.try_into().map_err(Into::into)
55+
}
5056
}
5157

5258
#[cfg(feature = "bson-3")]

src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ impl Client {
496496
operation_name,
497497
start_time,
498498
timeout,
499+
self.options().tracing_max_document_length_bytes,
499500
);
500501
#[cfg(feature = "tracing-unstable")]
501502
event_emitter.emit_started_event(self.inner.topology.latest().description.clone());

src/client/csfle/state_machine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tokio::{
1717
use crate::{
1818
client::{csfle::options::KmsProvidersTlsOptions, options::ServerAddress, WeakClient},
1919
error::{Error, Result},
20-
operation::{run_command::RunCommand, RawOutput},
20+
operation::{raw_output::RawOutput, run_command::RunCommand},
2121
options::ReadConcern,
2222
runtime::{process::Process, AsyncStream, TlsConfig},
2323
Client,

src/client/executor.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -532,9 +532,13 @@ impl Client {
532532
let start_time = Instant::now();
533533
let command_result = match connection.send_message(message).await {
534534
Ok(response) => {
535-
let is_sharded =
536-
connection.stream_description()?.initial_server_type == ServerType::Mongos;
537-
self.parse_response(op, session, is_sharded, response).await
535+
match self
536+
.parse_response(op, session, connection.is_sharded()?, &response)
537+
.await
538+
{
539+
Ok(()) => Ok(response),
540+
Err(error) => Err(error.with_server_response(&response)),
541+
}
538542
}
539543
Err(err) => Err(err),
540544
};
@@ -612,15 +616,15 @@ impl Client {
612616
effective_criteria: effective_criteria.clone(),
613617
};
614618

615-
match op.handle_response(response, context).await {
619+
match op.handle_response(&response, context).await {
616620
Ok(response) => Ok(response),
617621
Err(mut err) => {
618622
err.add_labels_and_update_pin(
619623
Some(connection.stream_description()?),
620624
session,
621625
Some(retryability),
622626
);
623-
Err(err)
627+
Err(err.with_server_response(&response))
624628
}
625629
}
626630
}
@@ -820,8 +824,8 @@ impl Client {
820824
op: &T,
821825
session: &mut Option<&mut ClientSession>,
822826
is_sharded: bool,
823-
response: RawCommandResponse,
824-
) -> Result<RawCommandResponse> {
827+
response: &RawCommandResponse,
828+
) -> Result<()> {
825829
let raw_doc = RawDocument::from_bytes(response.as_bytes())?;
826830

827831
let ok = match raw_doc.get("ok")? {
@@ -870,7 +874,7 @@ impl Client {
870874
}
871875
}
872876

873-
Ok(response)
877+
Ok(())
874878
} else {
875879
Err(response
876880
.body::<CommandErrorBody>()

src/cmap.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,23 @@ impl ConnectionPool {
6969
address: ServerAddress,
7070
connection_establisher: ConnectionEstablisher,
7171
server_updater: TopologyUpdater,
72-
topology_id: ObjectId,
7372
options: Option<ConnectionPoolOptions>,
73+
#[cfg(feature = "tracing-unstable")] topology_id: ObjectId,
7474
) -> Self {
7575
let event_handler = options
7676
.as_ref()
7777
.and_then(|opts| opts.cmap_event_handler.clone());
7878

79-
let event_emitter = CmapEventEmitter::new(event_handler, topology_id);
79+
#[cfg(feature = "tracing-unstable")]
80+
let event_emitter = CmapEventEmitter::new(
81+
event_handler,
82+
topology_id,
83+
options
84+
.as_ref()
85+
.and_then(|options| options.max_document_length_bytes),
86+
);
87+
#[cfg(not(feature = "tracing-unstable"))]
88+
let event_emitter = CmapEventEmitter::new(event_handler);
8089

8190
let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start(
8291
address.clone(),
@@ -114,7 +123,10 @@ impl ConnectionPool {
114123
manager,
115124
connection_requester,
116125
generation_subscriber,
117-
event_emitter: CmapEventEmitter::new(None, ObjectId::new()),
126+
#[cfg(feature = "tracing-unstable")]
127+
event_emitter: CmapEventEmitter::new(None, ObjectId::new(), None),
128+
#[cfg(not(feature = "tracing-unstable"))]
129+
event_emitter: CmapEventEmitter::new(None),
118130
}
119131
}
120132

src/cmap/conn/pooled.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::{
3030
ConnectionReadyEvent,
3131
},
3232
runtime::AsyncStream,
33+
ServerType,
3334
};
3435

3536
/// A wrapper around the [`Connection`] type that represents a connection within a connection pool.
@@ -205,6 +206,11 @@ impl PooledConnection {
205206
Instant::now().duration_since(available_time) >= max_idle_time
206207
}
207208

209+
/// Whether this connection is to a mongos.
210+
pub(crate) fn is_sharded(&self) -> Result<bool> {
211+
Ok(self.stream_description()?.initial_server_type == ServerType::Mongos)
212+
}
213+
208214
/// Nullifies the internal state of this connection and returns it in a new [PooledConnection]
209215
/// with the given state.
210216
fn take(&mut self, new_state: PooledConnectionState) -> Self {

src/cmap/options.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ pub(crate) struct ConnectionPoolOptions {
7070
///
7171
/// The default is 2.
7272
pub(crate) max_connecting: Option<u32>,
73+
74+
/// The maximum length for documents in tracing events.
75+
#[cfg(feature = "tracing-unstable")]
76+
pub(crate) max_document_length_bytes: Option<usize>,
7377
}
7478

7579
impl ConnectionPoolOptions {
@@ -86,6 +90,8 @@ impl ConnectionPoolOptions {
8690
load_balanced: options.load_balanced,
8791
credential: options.credential.clone(),
8892
max_connecting: options.max_connecting,
93+
#[cfg(feature = "tracing-unstable")]
94+
max_document_length_bytes: options.tracing_max_document_length_bytes,
8995
}
9096
}
9197

src/cmap/test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ impl Executor {
165165
ConnectionEstablisher::new(EstablisherOptions::from(get_client_options().await))
166166
.unwrap(),
167167
updater,
168-
crate::bson::oid::ObjectId::new(),
169168
Some(self.pool_options),
169+
#[cfg(feature = "tracing-unstable")]
170+
crate::bson::oid::ObjectId::new(),
170171
);
171172

172173
// Mock a monitoring task responding to errors reported by the pool.

src/cmap/test/integration.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ async fn acquire_connection_and_send_command() {
4949
client_options.hosts[0].clone(),
5050
ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(),
5151
TopologyUpdater::channel().0,
52-
crate::bson::oid::ObjectId::new(),
5352
Some(pool_options),
53+
#[cfg(feature = "tracing-unstable")]
54+
crate::bson::oid::ObjectId::new(),
5455
);
5556
let mut connection = pool.check_out().await.unwrap();
5657

@@ -116,8 +117,9 @@ async fn concurrent_connections() {
116117
get_client_options().await.hosts[0].clone(),
117118
ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(),
118119
TopologyUpdater::channel().0,
119-
crate::bson::oid::ObjectId::new(),
120120
Some(options),
121+
#[cfg(feature = "tracing-unstable")]
122+
crate::bson::oid::ObjectId::new(),
121123
);
122124

123125
let tasks = (0..2).map(|_| {
@@ -191,8 +193,9 @@ async fn connection_error_during_establishment() {
191193
client_options.hosts[0].clone(),
192194
ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(),
193195
TopologyUpdater::channel().0,
194-
crate::bson::oid::ObjectId::new(),
195196
Some(options),
197+
#[cfg(feature = "tracing-unstable")]
198+
crate::bson::oid::ObjectId::new(),
196199
);
197200

198201
pool.check_out().await.expect_err("check out should fail");

src/error.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use serde::{Deserialize, Serialize};
1313
use thiserror::Error;
1414

1515
use crate::{
16-
bson::{Bson, Document},
16+
bson::{doc, rawdoc, Bson, Document, RawDocumentBuf},
17+
cmap::RawCommandResponse,
1718
options::ServerAddress,
1819
sdam::{ServerType, TopologyVersion},
1920
};
@@ -52,20 +53,32 @@ pub type Result<T> = std::result::Result<T, Error>;
5253
#[derive(Clone, Debug, Error)]
5354
#[cfg_attr(
5455
test,
55-
error("Kind: {kind}, labels: {labels:?}, source: {source:?}, backtrace: {bt}")
56+
error(
57+
"Kind: {kind}, labels: {labels:?}, source: {source:?}, backtrace: {bt}, server response: \
58+
{server_response:?}"
59+
)
5660
)]
5761
#[cfg_attr(
5862
not(test),
59-
error("Kind: {kind}, labels: {labels:?}, source: {source:?}")
63+
error(
64+
"Kind: {kind}, labels: {labels:?}, source: {source:?}, server response: \
65+
{server_response:?}"
66+
)
6067
)]
6168
#[non_exhaustive]
6269
pub struct Error {
6370
/// The type of error that occurred.
6471
pub kind: Box<ErrorKind>,
72+
6573
labels: HashSet<String>,
74+
6675
pub(crate) wire_version: Option<i32>,
76+
6777
#[source]
6878
pub(crate) source: Option<Box<Error>>,
79+
80+
pub(crate) server_response: Option<Box<RawDocumentBuf>>,
81+
6982
#[cfg(test)]
7083
bt: Arc<std::backtrace::Backtrace>,
7184
}
@@ -99,6 +112,7 @@ impl Error {
99112
labels,
100113
wire_version: None,
101114
source: None,
115+
server_response: None,
102116
#[cfg(test)]
103117
bt: Arc::new(std::backtrace::Backtrace::capture()),
104118
}
@@ -288,6 +302,20 @@ impl Error {
288302
self.labels.insert(label);
289303
}
290304

305+
/// The full response returned from the server. This can be used to inspect error fields that
306+
/// are not represented in the `Error` type.
307+
pub fn server_response(&self) -> Option<&RawDocumentBuf> {
308+
self.server_response.as_deref()
309+
}
310+
311+
/// Adds the server's response to this error if it is not already present.
312+
pub(crate) fn with_server_response(mut self, response: &RawCommandResponse) -> Self {
313+
if self.server_response.is_none() {
314+
self.server_response = Some(Box::new(response.raw_body().to_owned()));
315+
}
316+
self
317+
}
318+
291319
#[cfg(feature = "dns-resolver")]
292320
pub(crate) fn from_resolve_error(error: hickory_resolver::error::ResolveError) -> Self {
293321
ErrorKind::DnsResolve {
@@ -496,6 +524,10 @@ impl Error {
496524
source.redact();
497525
}
498526

527+
if self.server_response.is_some() {
528+
self.server_response = Some(Box::new(rawdoc! { "redacted": true }));
529+
}
530+
499531
// This is intentionally written without a catch-all branch so that if new error
500532
// kinds are added we remember to reason about whether they need to be redacted.
501533
match *self.kind {

0 commit comments

Comments
 (0)