Skip to content

Commit e4863f0

Browse files
RUST-1437 Send endSessions on client shutdown (#1216)
1 parent 850e7b2 commit e4863f0

File tree

13 files changed

+254
-44
lines changed

13 files changed

+254
-44
lines changed

src/client.rs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ pub mod options;
77
pub mod session;
88

99
use std::{
10-
sync::{atomic::AtomicBool, Mutex as SyncMutex},
10+
sync::{
11+
atomic::{AtomicBool, Ordering},
12+
Mutex as SyncMutex,
13+
},
1114
time::{Duration, Instant},
1215
};
1316

@@ -26,13 +29,18 @@ use crate::trace::{
2629
COMMAND_TRACING_EVENT_TARGET,
2730
};
2831
use crate::{
32+
bson::doc,
2933
concern::{ReadConcern, WriteConcern},
3034
db::Database,
3135
error::{Error, ErrorKind, Result},
3236
event::command::CommandEvent,
3337
id_set::IdSet,
3438
options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress},
35-
sdam::{server_selection, SelectedServer, Topology},
39+
sdam::{
40+
server_selection::{self, attempt_to_select_server},
41+
SelectedServer,
42+
Topology,
43+
},
3644
tracking_arc::TrackingArc,
3745
BoxFuture,
3846
ClientSession,
@@ -123,6 +131,7 @@ struct ClientInner {
123131
options: ClientOptions,
124132
session_pool: ServerSessionPool,
125133
shutdown: Shutdown,
134+
dropped: AtomicBool,
126135
#[cfg(feature = "in-use-encryption")]
127136
csfle: tokio::sync::RwLock<Option<csfle::ClientState>>,
128137
#[cfg(test)]
@@ -159,6 +168,7 @@ impl Client {
159168
pending_drops: SyncMutex::new(IdSet::new()),
160169
executed: AtomicBool::new(false),
161170
},
171+
dropped: AtomicBool::new(false),
162172
#[cfg(feature = "in-use-encryption")]
163173
csfle: Default::default(),
164174
#[cfg(test)]
@@ -591,6 +601,40 @@ impl Client {
591601
pub(crate) fn options(&self) -> &ClientOptions {
592602
&self.inner.options
593603
}
604+
605+
/// Ends all sessions contained in this client's session pool on the server.
606+
pub(crate) async fn end_all_sessions(&self) {
607+
// The maximum number of session IDs that should be sent in a single endSessions command.
608+
const MAX_END_SESSIONS_BATCH_SIZE: usize = 10_000;
609+
610+
let mut watcher = self.inner.topology.watch();
611+
let selection_criteria =
612+
SelectionCriteria::from(ReadPreference::PrimaryPreferred { options: None });
613+
614+
let session_ids = self.inner.session_pool.get_session_ids().await;
615+
for chunk in session_ids.chunks(MAX_END_SESSIONS_BATCH_SIZE) {
616+
let state = watcher.observe_latest();
617+
let Ok(Some(_)) = attempt_to_select_server(
618+
&selection_criteria,
619+
&state.description,
620+
&state.servers(),
621+
None,
622+
) else {
623+
// If a suitable server is not available, do not proceed with the operation to avoid
624+
// spinning for server_selection_timeout.
625+
return;
626+
};
627+
628+
let end_sessions = doc! {
629+
"endSessions": chunk,
630+
};
631+
let _ = self
632+
.database("admin")
633+
.run_command(end_sessions)
634+
.selection_criteria(selection_criteria.clone())
635+
.await;
636+
}
637+
}
594638
}
595639

596640
#[derive(Clone, Debug)]
@@ -625,3 +669,24 @@ impl AsyncDropToken {
625669
Self { tx: self.tx.take() }
626670
}
627671
}
672+
673+
impl Drop for Client {
674+
fn drop(&mut self) {
675+
if !self.inner.shutdown.executed.load(Ordering::SeqCst)
676+
&& !self.inner.dropped.load(Ordering::SeqCst)
677+
&& TrackingArc::strong_count(&self.inner) == 1
678+
{
679+
// We need an owned copy of the client to move into the spawned future. However, if this
680+
// call to drop completes before the spawned future completes, the number of strong
681+
// references to the inner client will again be 1 when the cloned client drops, and thus
682+
// end_all_sessions will be called continuously until the runtime shuts down. Storing a
683+
// flag indicating whether end_all_sessions has already been called breaks
684+
// this cycle.
685+
self.inner.dropped.store(true, Ordering::SeqCst);
686+
let client = self.clone();
687+
crate::runtime::spawn(async move {
688+
client.end_all_sessions().await;
689+
});
690+
}
691+
}
692+
}

src/client/action/shutdown.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ impl Action for crate::action::Shutdown {
2323
.extract();
2424
join_all(pending).await;
2525
}
26+
// If shutdown has already been called on a different copy of the client, don't call
27+
// end_all_sessions again.
28+
if !self.client.inner.shutdown.executed.load(Ordering::SeqCst) {
29+
self.client.end_all_sessions().await;
30+
}
2631
self.client.inner.topology.shutdown().await;
2732
// This has to happen last to allow pending cleanup to execute commands.
2833
self.client

src/client/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ impl Drop for ClientSession {
401401
#[derive(Clone, Debug)]
402402
pub(crate) struct ServerSession {
403403
/// The id of the server session to which this corresponds.
404-
id: Document,
404+
pub(crate) id: Document,
405405

406406
/// The last time an operation was executed with this session.
407407
last_use: std::time::Instant,

src/client/session/pool.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::{collections::VecDeque, time::Duration};
33
use tokio::sync::Mutex;
44

55
use super::ServerSession;
6-
#[cfg(test)]
76
use crate::bson::Document;
87

98
#[derive(Debug)]
@@ -68,4 +67,10 @@ impl ServerSessionPool {
6867
pub(crate) async fn contains(&self, id: &Document) -> bool {
6968
self.pool.lock().await.iter().any(|s| &s.id == id)
7069
}
70+
71+
/// Returns a list of the IDs of the sessions contained in the pool.
72+
pub(crate) async fn get_session_ids(&self) -> Vec<Document> {
73+
let sessions = self.pool.lock().await;
74+
sessions.iter().map(|session| session.id.clone()).collect()
75+
}
7176
}

src/gridfs/upload.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ impl GridFsUploadStream {
261261
}
262262

263263
impl Drop for GridFsUploadStream {
264-
// TODO RUST-1493: pre-create this task
265264
fn drop(&mut self) {
266265
if !matches!(self.state, State::Closed) {
267266
let chunks = self.bucket.chunks().clone();

src/test/client.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
get_client_options,
1616
log_uncaptured,
1717
util::{
18-
event_buffer::EventBuffer,
18+
event_buffer::{EventBuffer, EventStream},
1919
fail_point::{FailPoint, FailPointMode},
2020
TestClient,
2121
},
@@ -930,3 +930,55 @@ async fn warm_connection_pool() {
930930
// Validate that a command executes.
931931
client.list_database_names().await.unwrap();
932932
}
933+
934+
async fn get_end_session_event_count<'a>(event_stream: &mut EventStream<'a, Event>) -> usize {
935+
// Use collect_successful_command_execution to assert that the call to endSessions succeeded.
936+
event_stream
937+
.collect_successful_command_execution(Duration::from_millis(500), "endSessions")
938+
.await
939+
.len()
940+
}
941+
942+
#[tokio::test]
943+
async fn end_sessions_on_drop() {
944+
let client1 = Client::for_test().monitor_events().await;
945+
let client2 = client1.clone();
946+
let events = client1.events.clone();
947+
let mut event_stream = events.stream();
948+
949+
// Run an operation to populate the session pool.
950+
client1
951+
.database("db")
952+
.collection::<Document>("coll")
953+
.find(doc! {})
954+
.await
955+
.unwrap();
956+
957+
drop(client1);
958+
assert_eq!(get_end_session_event_count(&mut event_stream).await, 0);
959+
960+
drop(client2);
961+
assert_eq!(get_end_session_event_count(&mut event_stream).await, 1);
962+
}
963+
964+
#[tokio::test]
965+
async fn end_sessions_on_shutdown() {
966+
let client1 = Client::for_test().monitor_events().await;
967+
let client2 = client1.clone();
968+
let events = client1.events.clone();
969+
let mut event_stream = events.stream();
970+
971+
// Run an operation to populate the session pool.
972+
client1
973+
.database("db")
974+
.collection::<Document>("coll")
975+
.find(doc! {})
976+
.await
977+
.unwrap();
978+
979+
client1.into_client().shutdown().await;
980+
assert_eq!(get_end_session_event_count(&mut event_stream).await, 1);
981+
982+
client2.into_client().shutdown().await;
983+
assert_eq!(get_end_session_event_count(&mut event_stream).await, 0);
984+
}

src/test/spec/json/connection-monitoring-and-pooling/README.rst

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.json

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,73 @@
201201
}
202202
}
203203
},
204+
{
205+
"level": "debug",
206+
"component": "connection",
207+
"data": {
208+
"message": "Connection checkout started",
209+
"serverHost": {
210+
"$$type": "string"
211+
},
212+
"serverPort": {
213+
"$$type": [
214+
"int",
215+
"long"
216+
]
217+
}
218+
}
219+
},
220+
{
221+
"level": "debug",
222+
"component": "connection",
223+
"data": {
224+
"message": "Connection checked out",
225+
"driverConnectionId": {
226+
"$$type": [
227+
"int",
228+
"long"
229+
]
230+
},
231+
"serverHost": {
232+
"$$type": "string"
233+
},
234+
"serverPort": {
235+
"$$type": [
236+
"int",
237+
"long"
238+
]
239+
},
240+
"durationMS": {
241+
"$$type": [
242+
"double",
243+
"int",
244+
"long"
245+
]
246+
}
247+
}
248+
},
249+
{
250+
"level": "debug",
251+
"component": "connection",
252+
"data": {
253+
"message": "Connection checked in",
254+
"driverConnectionId": {
255+
"$$type": [
256+
"int",
257+
"long"
258+
]
259+
},
260+
"serverHost": {
261+
"$$type": "string"
262+
},
263+
"serverPort": {
264+
"$$type": [
265+
"int",
266+
"long"
267+
]
268+
}
269+
}
270+
},
204271
{
205272
"level": "debug",
206273
"component": "connection",

src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,31 @@ tests:
8585
serverHost: { $$type: string }
8686
serverPort: { $$type: [int, long] }
8787

88+
# The next three expected logs are for ending a session.
89+
- level: debug
90+
component: connection
91+
data:
92+
message: "Connection checkout started"
93+
serverHost: { $$type: string }
94+
serverPort: { $$type: [int, long] }
95+
96+
- level: debug
97+
component: connection
98+
data:
99+
message: "Connection checked out"
100+
driverConnectionId: { $$type: [int, long] }
101+
serverHost: { $$type: string }
102+
serverPort: { $$type: [int, long] }
103+
durationMS: { $$type: [double, int, long] }
104+
105+
- level: debug
106+
component: connection
107+
data:
108+
message: "Connection checked in"
109+
driverConnectionId: { $$type: [int, long] }
110+
serverHost: { $$type: string }
111+
serverPort: { $$type: [int, long] }
112+
88113
- level: debug
89114
component: connection
90115
data:

src/test/spec/unified_runner/operation.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2182,7 +2182,13 @@ impl TestOperation for Close {
21822182
Entity::Client(_) => {
21832183
let client = entities.get_mut(id).unwrap().as_mut_client();
21842184
let closed_client_topology_id = client.topology_id;
2185-
client.client = None;
2185+
client
2186+
.client
2187+
.take()
2188+
.unwrap()
2189+
.shutdown()
2190+
.immediate(true)
2191+
.await;
21862192

21872193
let mut entities_to_remove = vec![];
21882194
for (key, value) in entities.iter() {

0 commit comments

Comments
 (0)