Skip to content

Commit e2d0f44

Browse files
authored
RUST-1608 Clean shutdown for Client (#920)
1 parent eb0fca1 commit e2d0f44

23 files changed

+699
-53
lines changed

.evergreen/check-cargo-deny.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/bin/bash
22

33
set -o errexit
4+
set -o xtrace
45

56
source ./.evergreen/env.sh
67

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ features = ["v4"]
180180
anyhow = { version = "1.0", features = ["backtrace"] }
181181
approx = "0.5.1"
182182
async_once = "0.2.6"
183+
backtrace = { version = "0.3.68" }
183184
ctrlc = "3.2.2"
184185
function_name = "0.2.1"
185186
futures = "0.3"

src/client.rs

Lines changed: 180 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,18 @@ pub mod options;
66
pub mod session;
77

88
use std::{
9-
sync::Arc,
9+
sync::{
10+
atomic::{AtomicBool, Ordering},
11+
Mutex as SyncMutex,
12+
},
1013
time::{Duration, Instant},
1114
};
1215

1316
#[cfg(feature = "in-use-encryption-unstable")]
1417
pub use self::csfle::client_builder::*;
1518
use derivative::Derivative;
19+
use futures_core::{future::BoxFuture, Future};
20+
use futures_util::{future::join_all, FutureExt};
1621

1722
#[cfg(test)]
1823
use crate::options::ServerAddress;
@@ -36,6 +41,7 @@ use crate::{
3641
db::Database,
3742
error::{Error, ErrorKind, Result},
3843
event::command::{handle_command_event, CommandEvent},
44+
id_set::IdSet,
3945
operation::{AggregateTarget, ListDatabases},
4046
options::{
4147
ClientOptions,
@@ -47,6 +53,7 @@ use crate::{
4753
},
4854
results::DatabaseSpecification,
4955
sdam::{server_selection, SelectedServer, Topology},
56+
tracking_arc::TrackingArc,
5057
ClientSession,
5158
};
5259

@@ -103,9 +110,25 @@ const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
103110
/// driver does not set ``tcp_keepalive_intvl``. See the
104111
/// [MongoDB Diagnostics FAQ keepalive section](https://www.mongodb.com/docs/manual/faq/diagnostics/#does-tcp-keepalive-time-affect-mongodb-deployments)
105112
/// for instructions on setting these values at the system level.
106-
#[derive(Clone, Debug)]
113+
///
114+
/// ## Clean shutdown
115+
/// Because Rust has no async equivalent of `Drop`, values that require server-side cleanup when
116+
/// dropped spawn a new async task to perform that cleanup. This can cause two potential issues:
117+
///
118+
/// * Drop tasks pending or in progress when the async runtime shuts down may not complete, causing
119+
/// server-side resources to not be freed.
120+
/// * Drop tasks may run at an arbitrary time even after no `Client` values exist, making it hard to
121+
/// reason about associated resources (e.g. event handlers).
122+
///
123+
/// To address these issues, we highly recommend you use [`Client::shutdown`] in the termination
124+
/// path of your application. This will ensure that outstanding resources have been cleaned up and
125+
/// terminate internal worker tasks before returning. Please note that `shutdown` will wait for
126+
/// _all_ outstanding resource handles to be dropped, so they must either have been dropped before
127+
/// calling `shutdown` or in a concurrent task; see the documentation of `shutdown` for more
128+
/// details.
129+
#[derive(Debug, Clone)]
107130
pub struct Client {
108-
inner: Arc<ClientInner>,
131+
inner: TrackingArc<ClientInner>,
109132
}
110133

111134
#[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
@@ -124,10 +147,17 @@ struct ClientInner {
124147
topology: Topology,
125148
options: ClientOptions,
126149
session_pool: ServerSessionPool,
150+
shutdown: Shutdown,
127151
#[cfg(feature = "in-use-encryption-unstable")]
128152
csfle: tokio::sync::RwLock<Option<csfle::ClientState>>,
129153
}
130154

155+
#[derive(Debug)]
156+
struct Shutdown {
157+
pending_drops: SyncMutex<IdSet<crate::runtime::AsyncJoinHandle<()>>>,
158+
executed: AtomicBool,
159+
}
160+
131161
impl Client {
132162
/// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
133163
/// MongoDB connection string.
@@ -144,12 +174,16 @@ impl Client {
144174
pub fn with_options(options: ClientOptions) -> Result<Self> {
145175
options.validate()?;
146176

147-
let inner = Arc::new(ClientInner {
177+
let inner = TrackingArc::new(ClientInner {
148178
topology: Topology::new(options.clone())?,
149179
session_pool: ServerSessionPool::new(),
180+
options,
181+
shutdown: Shutdown {
182+
pending_drops: SyncMutex::new(IdSet::new()),
183+
executed: AtomicBool::new(false),
184+
},
150185
#[cfg(feature = "in-use-encryption-unstable")]
151186
csfle: Default::default(),
152-
options,
153187
});
154188
Ok(Self { inner })
155189
}
@@ -461,6 +495,123 @@ impl Client {
461495
.await
462496
}
463497

498+
pub(crate) fn register_async_drop(&self) -> AsyncDropToken {
499+
let (cleanup_tx, cleanup_rx) = tokio::sync::oneshot::channel::<BoxFuture<'static, ()>>();
500+
let (id_tx, id_rx) = tokio::sync::oneshot::channel::<crate::id_set::Id>();
501+
let weak = self.weak();
502+
let handle = crate::runtime::spawn(async move {
503+
// Unwrap safety: the id is sent immediately after task creation, with no
504+
// await points in between.
505+
let id = id_rx.await.unwrap();
506+
// If the cleanup channel is closed, that task was dropped.
507+
let cleanup = if let Ok(f) = cleanup_rx.await {
508+
f
509+
} else {
510+
return;
511+
};
512+
cleanup.await;
513+
if let Some(client) = weak.upgrade() {
514+
client
515+
.inner
516+
.shutdown
517+
.pending_drops
518+
.lock()
519+
.unwrap()
520+
.remove(&id);
521+
}
522+
});
523+
let id = self
524+
.inner
525+
.shutdown
526+
.pending_drops
527+
.lock()
528+
.unwrap()
529+
.insert(handle);
530+
let _ = id_tx.send(id);
531+
AsyncDropToken {
532+
tx: Some(cleanup_tx),
533+
}
534+
}
535+
536+
/// Shut down this `Client`, terminating background thread workers and closing connections.
537+
/// This will wait for any live handles to server-side resources (see below) to be
538+
/// dropped and any associated server-side operations to finish.
539+
///
540+
/// IMPORTANT: Any live resource handles that are not dropped will cause this method to wait
541+
/// indefinitely. It's strongly recommended to structure your usage to avoid this, e.g. by
542+
/// only using those types in shorter-lived scopes than the `Client`. If this is not possible,
543+
/// see [`shutdown_immediate`](Client::shutdown_immediate). For example:
544+
///
545+
/// ```rust
546+
/// # use mongodb::{Client, GridFsBucket, error::Result};
547+
/// async fn upload_data(bucket: &GridFsBucket) {
548+
/// let stream = bucket.open_upload_stream("test", None);
549+
/// // .. write to the stream ..
550+
/// }
551+
///
552+
/// # async fn run() -> Result<()> {
553+
/// let client = Client::with_uri_str("mongodb://example.com").await?;
554+
/// let bucket = client.database("test").gridfs_bucket(None);
555+
/// upload_data(&bucket).await;
556+
/// client.shutdown().await;
557+
/// // Background cleanup work from `upload_data` is guaranteed to have run.
558+
/// # Ok(())
559+
/// # }
560+
/// ```
561+
///
562+
/// If the handle is used in the same scope as `shutdown`, explicit `drop` may be needed:
563+
///
564+
/// ```rust
565+
/// # use mongodb::{Client, error::Result};
566+
/// # async fn run() -> Result<()> {
567+
/// let client = Client::with_uri_str("mongodb://example.com").await?;
568+
/// let bucket = client.database("test").gridfs_bucket(None);
569+
/// let stream = bucket.open_upload_stream("test", None);
570+
/// // .. write to the stream ..
571+
/// drop(stream);
572+
/// client.shutdown().await;
573+
/// // Background cleanup work for `stream` is guaranteed to have run.
574+
/// # Ok(())
575+
/// # }
576+
/// ```
577+
///
578+
/// Calling any methods on clones of this `Client` or derived handles after this will return
579+
/// errors.
580+
///
581+
/// Handles to server-side resources are `Cursor`, `SessionCursor`, `Session`, or
582+
/// `GridFsUploadStream`.
583+
pub async fn shutdown(self) {
584+
// Subtle bug: if this is inlined into the `join_all(..)` call, Rust will extend the
585+
// lifetime of the temporary unnamed `MutexLock` until the end of the *statement*,
586+
// causing the lock to be held for the duration of the join, which deadlocks.
587+
let pending = self.inner.shutdown.pending_drops.lock().unwrap().extract();
588+
join_all(pending).await;
589+
self.shutdown_immediate().await;
590+
}
591+
592+
/// Shut down this `Client`, terminating background thread workers and closing connections.
593+
/// This does *not* wait for other pending resources to be cleaned up, which may cause both
594+
/// client-side errors and server-side resource leaks. Calling any methods on clones of this
595+
/// `Client` or derived handles after this will return errors.
596+
///
597+
/// ```rust
598+
/// # use mongodb::{Client, error::Result};
599+
/// # async fn run() -> Result<()> {
600+
/// let client = Client::with_uri_str("mongodb://example.com").await?;
601+
/// let bucket = client.database("test").gridfs_bucket(None);
602+
/// let stream = bucket.open_upload_stream("test", None);
603+
/// // .. write to the stream ..
604+
/// client.shutdown_immediate().await;
605+
/// // Background cleanup work for `stream` may or may not have run.
606+
/// # Ok(())
607+
/// # }
608+
/// ```
609+
pub async fn shutdown_immediate(self) {
610+
self.inner.topology.shutdown().await;
611+
// This has to happen last to allow pending cleanup to execute commands.
612+
self.inner.shutdown.executed.store(true, Ordering::SeqCst);
613+
}
614+
464615
/// Check in a server session to the server session pool. The session will be discarded if it is
465616
/// expired or dirty.
466617
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
@@ -630,10 +781,9 @@ impl Client {
630781
}
631782
}
632783

633-
#[cfg(feature = "in-use-encryption-unstable")]
634784
pub(crate) fn weak(&self) -> WeakClient {
635785
WeakClient {
636-
inner: Arc::downgrade(&self.inner),
786+
inner: TrackingArc::downgrade(&self.inner),
637787
}
638788
}
639789

@@ -653,16 +803,35 @@ impl Client {
653803
}
654804
}
655805

656-
#[cfg(feature = "in-use-encryption-unstable")]
657806
#[derive(Clone, Debug)]
658807
pub(crate) struct WeakClient {
659-
inner: std::sync::Weak<ClientInner>,
808+
inner: crate::tracking_arc::Weak<ClientInner>,
660809
}
661810

662-
#[cfg(feature = "in-use-encryption-unstable")]
663811
impl WeakClient {
664-
#[allow(dead_code)]
665812
pub(crate) fn upgrade(&self) -> Option<Client> {
666813
self.inner.upgrade().map(|inner| Client { inner })
667814
}
668815
}
816+
817+
#[derive(Derivative)]
818+
#[derivative(Debug)]
819+
pub(crate) struct AsyncDropToken {
820+
#[derivative(Debug = "ignore")]
821+
tx: Option<tokio::sync::oneshot::Sender<BoxFuture<'static, ()>>>,
822+
}
823+
824+
impl AsyncDropToken {
825+
pub(crate) fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
826+
if let Some(tx) = self.tx.take() {
827+
let _ = tx.send(fut.boxed());
828+
} else {
829+
#[cfg(debug_assertions)]
830+
panic!("exhausted AsyncDropToken");
831+
}
832+
}
833+
834+
pub(crate) fn take(&mut self) -> Self {
835+
Self { tx: self.tx.take() }
836+
}
837+
}

src/client/executor.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ use futures_core::future::BoxFuture;
66
use lazy_static::lazy_static;
77
use serde::de::DeserializeOwned;
88

9-
use std::{collections::HashSet, sync::Arc, time::Instant};
9+
use std::{
10+
collections::HashSet,
11+
sync::{atomic::Ordering, Arc},
12+
time::Instant,
13+
};
1014

1115
use super::{session::TransactionState, Client, ClientSession};
1216
use crate::{
@@ -53,6 +57,7 @@ use crate::{
5357
options::{ChangeStreamOptions, SelectionCriteria},
5458
sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus},
5559
selection_criteria::ReadPreference,
60+
tracking_arc::TrackingArc,
5661
ClusterTime,
5762
};
5863

@@ -99,6 +104,9 @@ impl Client {
99104
op: T,
100105
session: impl Into<Option<&mut ClientSession>>,
101106
) -> Result<ExecutionDetails<T>> {
107+
if self.inner.shutdown.executed.load(Ordering::SeqCst) {
108+
return Err(ErrorKind::Shutdown.into());
109+
}
102110
Box::pin(async {
103111
// TODO RUST-9: allow unacknowledged write concerns
104112
if !op.is_acknowledged() {
@@ -109,7 +117,7 @@ impl Client {
109117
}
110118
let session = session.into();
111119
if let Some(session) = &session {
112-
if !Arc::ptr_eq(&self.inner, &session.client().inner) {
120+
if !TrackingArc::ptr_eq(&self.inner, &session.client().inner) {
113121
return Err(ErrorKind::InvalidArgument {
114122
message: "the session provided to an operation must be created from the \
115123
same client as the collection/database"

src/client/session.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@ use crate::{
1818
error::{ErrorKind, Result},
1919
operation::{AbortTransaction, CommitTransaction, Operation},
2020
options::{SessionOptions, TransactionOptions},
21-
runtime,
2221
sdam::{ServerInfo, TransactionSupportStatus},
2322
selection_criteria::SelectionCriteria,
2423
Client,
2524
};
2625
pub use cluster_time::ClusterTime;
2726
pub(super) use pool::ServerSessionPool;
2827

29-
use super::options::ServerAddress;
28+
use super::{options::ServerAddress, AsyncDropToken};
3029

3130
lazy_static! {
3231
pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> = {
@@ -107,6 +106,7 @@ pub struct ClientSession {
107106
client: Client,
108107
is_implicit: bool,
109108
options: Option<SessionOptions>,
109+
drop_token: AsyncDropToken,
110110
pub(crate) transaction: Transaction,
111111
pub(crate) snapshot_time: Option<Timestamp>,
112112
pub(crate) operation_time: Option<Timestamp>,
@@ -212,6 +212,7 @@ impl ClientSession {
212212
let timeout = client.inner.topology.logical_session_timeout();
213213
let server_session = client.inner.session_pool.check_out(timeout).await;
214214
Self {
215+
drop_token: client.register_async_drop(),
215216
client,
216217
server_session,
217218
cluster_time: None,
@@ -694,6 +695,7 @@ impl From<DroppedClientSession> for ClientSession {
694695
Self {
695696
cluster_time: dropped_session.cluster_time,
696697
server_session: dropped_session.server_session,
698+
drop_token: dropped_session.client.register_async_drop(),
697699
client: dropped_session.client,
698700
is_implicit: dropped_session.is_implicit,
699701
options: dropped_session.options,
@@ -719,14 +721,14 @@ impl Drop for ClientSession {
719721
snapshot_time: self.snapshot_time,
720722
operation_time: self.operation_time,
721723
};
722-
runtime::execute(async move {
724+
self.drop_token.spawn(async move {
723725
let mut session: ClientSession = dropped_session.into();
724726
let _result = session.abort_transaction().await;
725727
});
726728
} else {
727729
let client = self.client.clone();
728730
let server_session = self.server_session.clone();
729-
runtime::execute(async move {
731+
self.drop_token.spawn(async move {
730732
client.check_in_server_session(server_session).await;
731733
});
732734
}

0 commit comments

Comments
 (0)