Skip to content

Commit b69e2e1

Browse files
authored
RUST-289 Implement implicit sessions (#168)
1 parent ef3c50a commit b69e2e1

File tree

45 files changed

+2160
-435
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2160
-435
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ sha-1 = "0.8.1"
3838
sha2 = "0.8.0"
3939
stringprep = "0.1.2"
4040
strsim = "0.10.0"
41+
take_mut = "0.2.2"
4142
time = "0.1.42"
4243
trust-dns-proto = "0.19.4"
4344
trust-dns-resolver = "0.19.0"
@@ -70,6 +71,10 @@ features = ["io-util", "sync"]
7071
version = "0.13.0"
7172
features = ["dangerous_configuration"]
7273

74+
[dependencies.uuid]
75+
version = "0.8.1"
76+
features = ["v4"]
77+
7378
[dev-dependencies]
7479
approx = "0.3.2"
7580
derive_more = "0.15.0"

src/client/auth/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ impl Credential {
252252
let stream_description = conn.stream_description()?;
253253

254254
// Verify server can authenticate.
255-
if !stream_description.server_type.can_auth() {
255+
if !stream_description.initial_server_type.can_auth() {
256256
return Ok(());
257257
};
258258

src/client/executor.rs

Lines changed: 152 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
use super::Client;
1+
use super::{Client, ClientSession};
22

3-
use std::collections::HashSet;
3+
use std::{collections::HashSet, sync::Arc};
44

55
use bson::Document;
66
use lazy_static::lazy_static;
77
use time::PreciseTime;
88

99
use crate::{
1010
cmap::Connection,
11-
error::Result,
11+
error::{ErrorKind, Result},
1212
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
1313
operation::Operation,
14+
options::SelectionCriteria,
15+
sdam::{Server, SessionSupportStatus},
1416
};
1517

1618
lazy_static! {
@@ -30,68 +32,84 @@ lazy_static! {
3032
}
3133

3234
impl Client {
33-
/// Executes an operation and returns the connection used to do so along with the result of the
34-
/// operation. This will be used primarily for the opening of exhaust cursors.
35-
#[allow(dead_code)]
36-
pub(crate) async fn execute_exhaust_operation<T: Operation>(
35+
/// Execute the given operation.
36+
///
37+
/// Server selection will performed using the criteria specified on the operation, if any, and
38+
/// an implicit session will be created if the operation and write concern are compatible with
39+
/// sessions.
40+
pub(crate) async fn execute_operation<T: Operation>(&self, op: T) -> Result<T::O> {
41+
let mut implicit_session = self.start_implicit_session(&op).await?;
42+
self.select_server_and_execute_operation(op, implicit_session.as_mut())
43+
.await
44+
}
45+
46+
/// Execute the given operation, returning the implicit session created for it if one was.
47+
///
48+
/// Server selection be will performed using the criteria specified on the operation, if any.
49+
pub(crate) async fn execute_cursor_operation<T: Operation>(
3750
&self,
38-
op: &T,
39-
) -> Result<(T::O, Connection)> {
40-
let server = self.select_server(op.selection_criteria()).await?;
41-
let mut conn = server.checkout_connection().await?;
42-
self.execute_operation_on_connection(op, &mut conn)
51+
op: T,
52+
) -> Result<(T::O, Option<ClientSession>)> {
53+
let mut implicit_session = self.start_implicit_session(&op).await?;
54+
self.select_server_and_execute_operation(op, implicit_session.as_mut())
4355
.await
44-
.map(|r| (r, conn))
56+
.map(|result| (result, implicit_session))
4557
}
4658

47-
pub(crate) async fn execute_operation_owned<T: Operation>(self, op: T) -> Result<T::O> {
48-
self.execute_operation(&op, None).await
59+
/// Execute the given operation with the given session.
60+
/// Server selection will performed using the criteria specified on the operation, if any.
61+
pub(crate) async fn execute_operation_with_session<T: Operation>(
62+
&self,
63+
op: T,
64+
session: &mut ClientSession,
65+
) -> Result<T::O> {
66+
self.select_server_and_execute_operation(op, Some(session))
67+
.await
4968
}
5069

51-
/// Execute the given operation, optionally specifying a connection used to do so.
52-
/// If no connection is provided, server selection will performed using the criteria specified
53-
/// on the operation, if any.
54-
pub(crate) async fn execute_operation<T: Operation>(
70+
/// Selects a server and executes the given operation on it, optionally using a provided
71+
/// session.
72+
///
73+
/// TODO: RUST-128: replace this with `execute_operation_with_retry` when implemented.
74+
async fn select_server_and_execute_operation<T: Operation>(
5575
&self,
56-
op: &T,
57-
connection: Option<&mut Connection>,
76+
op: T,
77+
session: Option<&mut ClientSession>,
5878
) -> Result<T::O> {
59-
// if no connection provided, select one.
60-
match connection {
61-
Some(conn) => self.execute_operation_on_connection(op, conn).await,
62-
None => {
63-
let server = self.select_server(op.selection_criteria()).await?;
64-
65-
let mut conn = match server.checkout_connection().await {
66-
Ok(conn) => conn,
67-
Err(err) => {
68-
self.inner
69-
.topology
70-
.handle_pre_handshake_error(err.clone(), server.address.clone())
71-
.await;
72-
return Err(err);
73-
}
74-
};
75-
76-
match self.execute_operation_on_connection(op, &mut conn).await {
77-
Ok(result) => Ok(result),
78-
Err(err) => {
79-
self.inner
80-
.topology
81-
.handle_post_handshake_error(err.clone(), conn, server)
82-
.await;
83-
Err(err)
84-
}
85-
}
79+
let server = self.select_server(op.selection_criteria()).await?;
80+
81+
let mut conn = match server.checkout_connection().await {
82+
Ok(conn) => conn,
83+
Err(err) => {
84+
self.inner
85+
.topology
86+
.handle_pre_handshake_error(err.clone(), server.address.clone())
87+
.await;
88+
return Err(err);
89+
}
90+
};
91+
92+
match self
93+
.execute_operation_on_connection(op, &mut conn, session)
94+
.await
95+
{
96+
Ok(result) => Ok(result),
97+
Err(err) => {
98+
self.inner
99+
.topology
100+
.handle_post_handshake_error(err.clone(), conn, server)
101+
.await;
102+
Err(err)
86103
}
87104
}
88105
}
89106

90-
/// Executes an operation on a given connection.
107+
/// Executes an operation on a given connection, optionally using a provided session.
91108
async fn execute_operation_on_connection<T: Operation>(
92109
&self,
93-
op: &T,
110+
op: T,
94111
connection: &mut Connection,
112+
mut session: Option<&mut ClientSession>,
95113
) -> Result<T::O> {
96114
if let Some(wc) = op.write_concern() {
97115
wc.validate()?;
@@ -103,6 +121,34 @@ impl Client {
103121
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
104122
.await;
105123

124+
match session {
125+
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
126+
cmd.set_session(session);
127+
session.update_last_use();
128+
}
129+
Some(ref session) if !op.supports_sessions() && !session.is_implicit() => {
130+
return Err(ErrorKind::ArgumentError {
131+
message: format!("{} does not support sessions", cmd.name),
132+
}
133+
.into());
134+
}
135+
Some(ref session) if !op.is_acknowledged() && !session.is_implicit() => {
136+
return Err(ErrorKind::ArgumentError {
137+
message: "Cannot use ClientSessions with unacknowledged write concern"
138+
.to_string(),
139+
}
140+
.into());
141+
}
142+
_ => {}
143+
}
144+
145+
let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
146+
let client_cluster_time = self.inner.topology.cluster_time().await;
147+
let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
148+
if let Some(cluster_time) = max_cluster_time {
149+
cmd.set_cluster_time(cluster_time);
150+
}
151+
106152
let connection_info = connection.info();
107153
let request_id = crate::cmap::conn::next_request_id();
108154

@@ -127,15 +173,23 @@ impl Client {
127173

128174
let start_time = PreciseTime::now();
129175

130-
let response_result = connection
131-
.send_command(cmd.clone(), request_id)
132-
.await
133-
.and_then(|response| {
176+
let response_result = match connection.send_command(cmd.clone(), request_id).await {
177+
Ok(response) => {
178+
if let Some(cluster_time) = response.cluster_time() {
179+
self.inner.topology.advance_cluster_time(cluster_time).await;
180+
if let Some(ref mut session) = session {
181+
session.advance_cluster_time(cluster_time)
182+
}
183+
}
184+
134185
if !op.handles_command_errors() {
135-
response.validate()?;
186+
response.validate().map(|_| response)
187+
} else {
188+
Ok(response)
136189
}
137-
Ok(response)
138-
});
190+
}
191+
err => err,
192+
};
139193

140194
let end_time = PreciseTime::now();
141195
let duration = start_time.to(end_time).to_std()?;
@@ -153,6 +207,13 @@ impl Client {
153207

154208
handler.handle_command_failed_event(command_failed_event);
155209
});
210+
211+
if let Some(session) = session {
212+
if error.is_network_error() {
213+
session.mark_dirty();
214+
}
215+
}
216+
156217
Err(error)
157218
}
158219
Ok(response) => {
@@ -174,8 +235,42 @@ impl Client {
174235
};
175236
handler.handle_command_succeeded_event(command_succeeded_event);
176237
});
238+
177239
op.handle_response(response)
178240
}
179241
}
180242
}
243+
244+
/// Start an implicit session if the operation and write concern are compatible with sessions.
245+
async fn start_implicit_session<T: Operation>(&self, op: &T) -> Result<Option<ClientSession>> {
246+
match self.get_session_support_status().await? {
247+
SessionSupportStatus::Supported {
248+
logical_session_timeout,
249+
} if op.supports_sessions() && op.is_acknowledged() => Ok(Some(
250+
self.start_implicit_session_with_timeout(logical_session_timeout)
251+
.await,
252+
)),
253+
_ => Ok(None),
254+
}
255+
}
256+
257+
/// Gets whether the topology supports sessions, and if so, returns the topology's logical
258+
/// session timeout. If it has yet to be determined if the topology supports sessions, this
259+
/// method will perform a server selection that will force that determination to be made.
260+
async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
261+
let initial_status = self.inner.topology.session_support_status().await;
262+
263+
// Need to guarantee that we're connected to at least one server that can determine if
264+
// sessions are supported or not.
265+
match initial_status {
266+
SessionSupportStatus::Undetermined => {
267+
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
268+
server_info.server_type().is_data_bearing()
269+
}));
270+
let _: Arc<Server> = self.select_server(Some(&criteria)).await?;
271+
Ok(self.inner.topology.session_support_status().await)
272+
}
273+
_ => Ok(initial_status),
274+
}
275+
}
181276
}

0 commit comments

Comments
 (0)