Skip to content

Commit d33f38d

Browse files
authored
RUST-951 Enable sessions on load balancer connections (#421)
1 parent 2b7175e commit d33f38d

File tree

6 files changed

+34
-12
lines changed

6 files changed

+34
-12
lines changed

src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl Client {
255255
/// available, a new one will be created.
256256
pub(crate) async fn start_session_with_timeout(
257257
&self,
258-
logical_session_timeout: Duration,
258+
logical_session_timeout: Option<Duration>,
259259
options: Option<SessionOptions>,
260260
is_implicit: bool,
261261
) -> ClientSession {

src/client/session/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -600,8 +600,12 @@ impl ServerSession {
600600
}
601601

602602
/// Determines if this server session is about to expire in a short amount of time (1 minute).
603-
fn is_about_to_expire(&self, logical_session_timeout: Duration) -> bool {
604-
let expiration_date = self.last_use + logical_session_timeout;
603+
fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
604+
let timeout = match logical_session_timeout {
605+
Some(t) => t,
606+
None => return false,
607+
};
608+
let expiration_date = self.last_use + timeout;
605609
expiration_date < Instant::now() + Duration::from_secs(60)
606610
}
607611
}

src/client/session/pool.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ impl ServerSessionPool {
2121
/// Checks out a server session from the pool. Before doing so, it first clears out all the
2222
/// expired sessions. If there are no sessions left in the pool after clearing expired ones
2323
/// out, a new session will be created.
24-
pub(crate) async fn check_out(&self, logical_session_timeout: Duration) -> ServerSession {
24+
pub(crate) async fn check_out(
25+
&self,
26+
logical_session_timeout: Option<Duration>,
27+
) -> ServerSession {
2528
let mut pool = self.pool.lock().await;
2629
while let Some(session) = pool.pop_front() {
2730
// If a session is about to expire within the next minute, remove it from pool.
@@ -37,7 +40,11 @@ impl ServerSessionPool {
3740
/// discarded.
3841
///
3942
/// This method will also clear out any expired session from the pool before checking in.
40-
pub(crate) async fn check_in(&self, session: ServerSession, logical_session_timeout: Duration) {
43+
pub(crate) async fn check_in(
44+
&self,
45+
session: ServerSession,
46+
logical_session_timeout: Option<Duration>,
47+
) {
4148
let mut pool = self.pool.lock().await;
4249
while let Some(pooled_session) = pool.pop_back() {
4350
if session.is_about_to_expire(logical_session_timeout) {

src/sdam/description/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ impl ServerType {
4444
| ServerType::RsPrimary
4545
| ServerType::RsSecondary
4646
| ServerType::Mongos
47+
| ServerType::LoadBalancer
4748
)
4849
}
4950
}

src/sdam/description/topology/mod.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,22 @@ impl TopologyDescription {
124124
})
125125
.collect();
126126

127+
let session_support_status = if topology_type == TopologyType::LoadBalanced {
128+
SessionSupportStatus::Supported {
129+
logical_session_timeout: None,
130+
}
131+
} else {
132+
SessionSupportStatus::Undetermined
133+
};
134+
127135
Ok(Self {
128136
single_seed: servers.len() == 1,
129137
topology_type,
130138
set_name: options.repl_set_name,
131139
max_set_version: None,
132140
max_election_id: None,
133141
compatibility_error: None,
134-
session_support_status: SessionSupportStatus::Undetermined,
142+
session_support_status,
135143
transaction_support_status: TransactionSupportStatus::Undetermined,
136144
cluster_time: None,
137145
local_threshold: options.local_threshold,
@@ -279,12 +287,12 @@ impl TopologyDescription {
279287
logical_session_timeout: topology_timeout,
280288
} => {
281289
self.session_support_status = SessionSupportStatus::Supported {
282-
logical_session_timeout: std::cmp::min(timeout, topology_timeout),
290+
logical_session_timeout: std::cmp::min(Some(timeout), topology_timeout),
283291
};
284292
}
285293
SessionSupportStatus::Undetermined => {
286294
self.session_support_status = SessionSupportStatus::Supported {
287-
logical_session_timeout: timeout,
295+
logical_session_timeout: Some(timeout),
288296
}
289297
}
290298
SessionSupportStatus::Unsupported { .. } => {
@@ -301,7 +309,7 @@ impl TopologyDescription {
301309
match min_timeout {
302310
Some(timeout) => {
303311
self.session_support_status = SessionSupportStatus::Supported {
304-
logical_session_timeout: timeout,
312+
logical_session_timeout: Some(timeout),
305313
}
306314
}
307315
None => {
@@ -721,7 +729,9 @@ pub(crate) enum SessionSupportStatus {
721729

722730
/// Sessions are supported by this topology. This is the minimum timeout of all data-bearing
723731
/// servers in the deployment.
724-
Supported { logical_session_timeout: Duration },
732+
Supported {
733+
logical_session_timeout: Option<Duration>,
734+
},
725735
}
726736

727737
impl Default for SessionSupportStatus {
@@ -740,7 +750,7 @@ impl SessionSupportStatus {
740750
} => *logical_session_timeout,
741751
Self::Supported {
742752
logical_session_timeout,
743-
} => Some(*logical_session_timeout),
753+
} => *logical_session_timeout,
744754
}
745755
}
746756
}

src/test/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl TestClient {
7373
// To avoid populating the session pool with leftover implicit sessions, we check out a
7474
// session here and immediately mark it as dirty, then use it with any operations we need.
7575
let mut session = client
76-
.start_session_with_timeout(Duration::from_secs(60 * 60), None, true)
76+
.start_session_with_timeout(Some(Duration::from_secs(60 * 60)), None, true)
7777
.await;
7878
session.mark_dirty();
7979

0 commit comments

Comments
 (0)