Skip to content

Commit e619a0c

Browse files
authored
RUST-1510 Implement connection pool tracing messages (#766)
1 parent 6e3d43d commit e619a0c

File tree

102 files changed

+2320
-527
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+2320
-527
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ bitflags = "1.1.0"
8282
bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" }
8383
chrono = { version = "0.4.7", default-features = false, features = ["clock", "std"] }
8484
derivative = "2.1.1"
85+
derive_more = "0.99.17"
8586
flate2 = { version = "1.0", optional = true }
8687
futures-io = "0.3.21"
8788
futures-core = "0.3.14"
@@ -169,7 +170,6 @@ anyhow = { version = "1.0", features = ["backtrace"] }
169170
approx = "0.5.1"
170171
async_once = "0.2.6"
171172
ctrlc = "3.2.2"
172-
derive_more = "0.99.13"
173173
function_name = "0.2.1"
174174
futures = "0.3"
175175
hex = "0.4"

src/change_stream/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ where
180180
pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
181181
self.cursor.current_batch()
182182
}
183+
184+
#[cfg(test)]
185+
pub(crate) fn client(&self) -> &crate::Client {
186+
self.cursor.client()
187+
}
183188
}
184189

185190
/// Arguments passed to a `watch` method, captured to allow resume.

src/client/options/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub use resolver_config::ResolverConfig;
4646
#[cfg(feature = "csfle")]
4747
pub use crate::client::csfle::options::AutoEncryptionOptions;
4848

49-
const DEFAULT_PORT: u16 = 27017;
49+
pub(crate) const DEFAULT_PORT: u16 = 27017;
5050

5151
const URI_OPTIONS: &[&str] = &[
5252
"appname",

src/cmap/conn/mod.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
compression::Compressor,
2323
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
2424
event::cmap::{
25-
CmapEventHandler,
25+
CmapEventEmitter,
2626
ConnectionCheckedInEvent,
2727
ConnectionCheckedOutEvent,
2828
ConnectionClosedEvent,
@@ -81,10 +81,10 @@ pub(crate) struct Connection {
8181
/// been read.
8282
command_executing: bool,
8383

84-
/// Whether or not this connection has experienced a network error while reading or writing.
85-
/// Once the connection has received an error, it should not be used again or checked back
86-
/// into a pool.
87-
error: bool,
84+
/// Stores a network error encountered while reading or writing. Once the connection has
85+
/// received an error, it should not be used again and will be closed upon check-in to the
86+
/// pool.
87+
error: Option<Error>,
8888

8989
/// Whether the most recently received message included the moreToCome flag, indicating the
9090
/// server may send more responses without any additional requests. Attempting to send new
@@ -106,8 +106,10 @@ pub(crate) struct Connection {
106106
/// connection to the pin holder.
107107
pinned_sender: Option<mpsc::Sender<Connection>>,
108108

109+
/// Type responsible for emitting events related to this connection. This is None for
110+
/// monitoring connections as we do not emit events for those.
109111
#[derivative(Debug = "ignore")]
110-
handler: Option<Arc<dyn CmapEventHandler>>,
112+
event_emitter: Option<CmapEventEmitter>,
111113
}
112114

113115
impl Connection {
@@ -126,9 +128,9 @@ impl Connection {
126128
ready_and_available_time: None,
127129
stream: BufStream::new(stream),
128130
address,
129-
handler: None,
131+
event_emitter: None,
130132
stream_description: None,
131-
error: false,
133+
error: None,
132134
pinned_sender: None,
133135
compressor: None,
134136
more_to_come: false,
@@ -149,7 +151,7 @@ impl Connection {
149151
pending_connection.id,
150152
generation,
151153
);
152-
conn.handler = pending_connection.event_handler;
154+
conn.event_emitter = Some(pending_connection.event_emitter);
153155
conn
154156
}
155157

@@ -211,7 +213,7 @@ impl Connection {
211213

212214
/// Checks if the connection experienced a network error and should be closed.
213215
pub(super) fn has_errored(&self) -> bool {
214-
self.error
216+
self.error.is_some()
215217
}
216218

217219
/// Helper to create a `ConnectionCheckedOutEvent` for the connection.
@@ -244,6 +246,8 @@ impl Connection {
244246
address: self.address.clone(),
245247
connection_id: self.id,
246248
reason,
249+
#[cfg(feature = "tracing-unstable")]
250+
error: self.error.clone(),
247251
}
248252
}
249253

@@ -272,7 +276,9 @@ impl Connection {
272276
_ => message.write_to(&mut self.stream).await,
273277
};
274278

275-
self.error = write_result.is_err();
279+
if let Err(ref err) = write_result {
280+
self.error = Some(err.clone());
281+
}
276282
write_result?;
277283

278284
let response_message_result = Message::read_from(
@@ -283,7 +289,9 @@ impl Connection {
283289
)
284290
.await;
285291
self.command_executing = false;
286-
self.error = response_message_result.is_err();
292+
if let Err(ref err) = response_message_result {
293+
self.error = Some(err.clone());
294+
}
287295

288296
let response_message = response_message_result?;
289297
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);
@@ -342,7 +350,9 @@ impl Connection {
342350
)
343351
.await;
344352
self.command_executing = false;
345-
self.error = response_message_result.is_err();
353+
if let Err(ref err) = response_message_result {
354+
self.error = Some(err.clone());
355+
}
346356

347357
let response_message = response_message_result?;
348358
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);
@@ -390,8 +400,8 @@ impl Connection {
390400
/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
391401
fn close(&mut self, reason: ConnectionClosedReason) {
392402
self.pool_manager.take();
393-
if let Some(ref handler) = self.handler {
394-
handler.handle_connection_closed_event(self.closed_event(reason));
403+
if let Some(ref event_emitter) = self.event_emitter {
404+
event_emitter.emit_event(|| self.closed_event(reason).into());
395405
}
396406
}
397407

@@ -404,10 +414,10 @@ impl Connection {
404414
address: self.address.clone(),
405415
generation: self.generation,
406416
stream: std::mem::replace(&mut self.stream, BufStream::new(AsyncStream::Null)),
407-
handler: self.handler.take(),
417+
event_emitter: self.event_emitter.take(),
408418
stream_description: self.stream_description.take(),
409419
command_executing: self.command_executing,
410-
error: self.error,
420+
error: self.error.take(),
411421
pool_manager: None,
412422
ready_and_available_time: None,
413423
pinned_sender: self.pinned_sender.clone(),
@@ -571,7 +581,7 @@ pub(crate) struct PendingConnection {
571581
pub(crate) id: u32,
572582
pub(crate) address: ServerAddress,
573583
pub(crate) generation: PoolGeneration,
574-
pub(crate) event_handler: Option<Arc<dyn CmapEventHandler>>,
584+
pub(crate) event_emitter: CmapEventEmitter,
575585
}
576586

577587
impl PendingConnection {

src/cmap/mod.rs

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ pub(crate) mod options;
99
mod status;
1010
mod worker;
1111

12-
use std::sync::Arc;
13-
1412
use derivative::Derivative;
1513
#[cfg(test)]
1614
use tokio::sync::oneshot;
@@ -30,7 +28,8 @@ use crate::{
3028
bson::oid::ObjectId,
3129
error::{Error, Result},
3230
event::cmap::{
33-
CmapEventHandler,
31+
CmapEvent,
32+
CmapEventEmitter,
3433
ConnectionCheckoutFailedEvent,
3534
ConnectionCheckoutFailedReason,
3635
ConnectionCheckoutStartedEvent,
@@ -60,40 +59,44 @@ pub(crate) struct ConnectionPool {
6059
generation_subscriber: PoolGenerationSubscriber,
6160

6261
#[derivative(Debug = "ignore")]
63-
event_handler: Option<Arc<dyn CmapEventHandler>>,
62+
event_emitter: CmapEventEmitter,
6463
}
6564

6665
impl ConnectionPool {
6766
pub(crate) fn new(
6867
address: ServerAddress,
6968
connection_establisher: ConnectionEstablisher,
7069
server_updater: TopologyUpdater,
70+
topology_id: ObjectId,
7171
options: Option<ConnectionPoolOptions>,
7272
) -> Self {
73+
let event_handler = options
74+
.as_ref()
75+
.and_then(|opts| opts.cmap_event_handler.clone());
76+
77+
let event_emitter = CmapEventEmitter::new(event_handler, topology_id);
78+
7379
let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start(
7480
address.clone(),
7581
connection_establisher,
7682
server_updater,
83+
event_emitter.clone(),
7784
options.clone(),
7885
);
7986

80-
let event_handler = options
81-
.as_ref()
82-
.and_then(|opts| opts.cmap_event_handler.clone());
83-
84-
if let Some(ref handler) = event_handler {
85-
handler.handle_pool_created_event(PoolCreatedEvent {
87+
event_emitter.emit_event(|| {
88+
CmapEvent::PoolCreated(PoolCreatedEvent {
8689
address: address.clone(),
8790
options: options.map(|o| o.to_event_options()),
88-
});
89-
};
91+
})
92+
});
9093

9194
Self {
9295
address,
9396
manager,
9497
connection_requester,
9598
generation_subscriber,
96-
event_handler,
99+
event_emitter,
97100
}
98101
}
99102

@@ -109,29 +112,19 @@ impl ConnectionPool {
109112
manager,
110113
connection_requester,
111114
generation_subscriber,
112-
event_handler: None,
113-
}
114-
}
115-
116-
fn emit_event<F>(&self, emit: F)
117-
where
118-
F: FnOnce(&Arc<dyn CmapEventHandler>),
119-
{
120-
if let Some(ref handler) = self.event_handler {
121-
emit(handler);
115+
event_emitter: CmapEventEmitter::new(None, ObjectId::new()),
122116
}
123117
}
124118

125119
/// Checks out a connection from the pool. This method will yield until this thread is at the
126120
/// front of the wait queue, and then will block again if no available connections are in the
127121
/// pool and the total number of connections is not less than the max pool size.
128122
pub(crate) async fn check_out(&self) -> Result<Connection> {
129-
self.emit_event(|handler| {
130-
let event = ConnectionCheckoutStartedEvent {
123+
self.event_emitter.emit_event(|| {
124+
ConnectionCheckoutStartedEvent {
131125
address: self.address.clone(),
132-
};
133-
134-
handler.handle_connection_checkout_started_event(event);
126+
}
127+
.into()
135128
});
136129

137130
let response = self.connection_requester.request().await;
@@ -146,16 +139,28 @@ impl ConnectionPool {
146139

147140
match conn {
148141
Ok(ref conn) => {
149-
self.emit_event(|handler| {
150-
handler.handle_connection_checked_out_event(conn.checked_out_event());
142+
self.event_emitter
143+
.emit_event(|| conn.checked_out_event().into());
144+
}
145+
#[cfg(feature = "tracing-unstable")]
146+
Err(ref err) => {
147+
self.event_emitter.emit_event(|| {
148+
ConnectionCheckoutFailedEvent {
149+
address: self.address.clone(),
150+
reason: ConnectionCheckoutFailedReason::ConnectionError,
151+
error: Some(err.clone()),
152+
}
153+
.into()
151154
});
152155
}
156+
#[cfg(not(feature = "tracing-unstable"))]
153157
Err(_) => {
154-
self.emit_event(|handler| {
155-
handler.handle_connection_checkout_failed_event(ConnectionCheckoutFailedEvent {
158+
self.event_emitter.emit_event(|| {
159+
ConnectionCheckoutFailedEvent {
156160
address: self.address.clone(),
157161
reason: ConnectionCheckoutFailedReason::ConnectionError,
158-
})
162+
}
163+
.into()
159164
});
160165
}
161166
}

0 commit comments

Comments
 (0)