Skip to content

Commit b843bfa

Browse files
author
Andrew Witten
authored
RUST-48 Causal consistency support (#493)
1 parent 492dbb1 commit b843bfa

File tree

21 files changed

+820
-48
lines changed

21 files changed

+820
-48
lines changed

src/client/executor.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,11 +417,31 @@ impl Client {
417417
}
418418
cmd.set_snapshot_read_concern(session);
419419
}
420+
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
421+
// Causal consistency defaults to true, unless snapshot is true.
422+
else if session
423+
.options()
424+
.and_then(|opts| opts.causal_consistency)
425+
.unwrap_or(true)
426+
&& matches!(
427+
session.transaction.state,
428+
TransactionState::None | TransactionState::Starting
429+
)
430+
&& op.supports_read_concern(stream_description)
431+
{
432+
cmd.set_after_cluster_time(session);
433+
}
434+
420435
match session.transaction.state {
421436
TransactionState::Starting => {
422437
cmd.set_start_transaction();
423438
cmd.set_autocommit();
424-
cmd.set_txn_read_concern(*session);
439+
440+
if let Some(ref options) = session.transaction.options {
441+
if let Some(ref read_concern) = options.read_concern {
442+
cmd.set_read_concern_level(read_concern.level.clone());
443+
}
444+
}
425445
if self.is_load_balanced() {
426446
session.pin_connection(connection.pin()?);
427447
} else if is_sharded {
@@ -512,6 +532,9 @@ impl Client {
512532
Ok(response) => {
513533
match T::Response::deserialize_response(&response) {
514534
Ok(r) => {
535+
if let (Some(session), Some(ts)) = (session.as_mut(), r.operation_time()) {
536+
session.advance_operation_time(ts);
537+
}
515538
self.update_cluster_time(&r, session).await;
516539
if r.is_success() {
517540
// Retrieve recovery token from successful response.
@@ -542,6 +565,11 @@ impl Client {
542565
// a generic command response without the operation's body.
543566
match response.body::<CommandResponse<Option<CommandErrorBody>>>() {
544567
Ok(error_response) => {
568+
if let (Some(session), Some(ts)) =
569+
(session.as_mut(), error_response.operation_time())
570+
{
571+
session.advance_operation_time(ts);
572+
}
545573
self.update_cluster_time(&error_response, session).await;
546574
match error_response.body {
547575
// if the response was ok: 0, return the command error.

src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,9 @@ impl Client {
242242

243243
/// Starts a new `ClientSession`.
244244
pub async fn start_session(&self, options: Option<SessionOptions>) -> Result<ClientSession> {
245+
if let Some(ref options) = options {
246+
options.validate()?;
247+
}
245248
match self.get_session_support_status().await? {
246249
SessionSupportStatus::Supported {
247250
logical_session_timeout,

src/client/options/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2400,12 +2400,32 @@ pub struct SessionOptions {
24002400
/// associated with the operations within the transaction.
24012401
pub default_transaction_options: Option<TransactionOptions>,
24022402

2403+
/// If true, all operations performed in the context of this session
2404+
/// will be [causally consistent](https://docs.mongodb.com/manual/core/causal-consistency-read-write-concerns/).
2405+
///
2406+
/// Defaults to true if [`SessionOptions::snapshot`] is unspecified.
2407+
pub causal_consistency: Option<bool>,
2408+
24032409
/// If true, all read operations performed using this client session will share the same
24042410
/// snapshot. Defaults to false.
2405-
// TODO RUST-18 enforce snapshot exclusivity with causalConsistency.
24062411
pub snapshot: Option<bool>,
24072412
}
24082413

2414+
impl SessionOptions {
2415+
pub(crate) fn validate(&self) -> Result<()> {
2416+
if let (Some(causal_consistency), Some(snapshot)) = (self.causal_consistency, self.snapshot)
2417+
{
2418+
if causal_consistency && snapshot {
2419+
return Err(ErrorKind::InvalidArgument {
2420+
message: "snapshot and causal consistency are mutually exclusive".to_string(),
2421+
}
2422+
.into());
2423+
}
2424+
}
2425+
Ok(())
2426+
}
2427+
}
2428+
24092429
/// Contains the options that can be used for a transaction.
24102430
#[skip_serializing_none]
24112431
#[derive(Debug, Default, Serialize, Deserialize, TypedBuilder, Clone)]

src/client/session/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ pub struct ClientSession {
109109
options: Option<SessionOptions>,
110110
pub(crate) transaction: Transaction,
111111
pub(crate) snapshot_time: Option<Timestamp>,
112+
pub(crate) operation_time: Option<Timestamp>,
112113
}
113114

114115
#[derive(Debug)]
@@ -214,6 +215,7 @@ impl ClientSession {
214215
options,
215216
transaction: Default::default(),
216217
snapshot_time: None,
218+
operation_time: None,
217219
}
218220
}
219221

@@ -257,6 +259,21 @@ impl ClientSession {
257259
}
258260
}
259261

262+
/// Advance operation time for this session. If the provided timestamp is earlier than this
263+
/// session's current operation time, then the operation time is unchanged.
264+
pub fn advance_operation_time(&mut self, ts: Timestamp) {
265+
self.operation_time = match self.operation_time {
266+
Some(current_op_time) if current_op_time < ts => Some(ts),
267+
None => Some(ts),
268+
_ => self.operation_time,
269+
}
270+
}
271+
272+
/// The operation time returned by the last operation executed in this session.
273+
pub fn operation_time(&self) -> Option<Timestamp> {
274+
self.operation_time
275+
}
276+
260277
/// Mark this session (and the underlying server session) as dirty.
261278
pub(crate) fn mark_dirty(&mut self) {
262279
self.server_session.dirty = true;
@@ -559,6 +576,7 @@ struct DroppedClientSession {
559576
options: Option<SessionOptions>,
560577
transaction: Transaction,
561578
snapshot_time: Option<Timestamp>,
579+
operation_time: Option<Timestamp>,
562580
}
563581

564582
impl From<DroppedClientSession> for ClientSession {
@@ -571,6 +589,7 @@ impl From<DroppedClientSession> for ClientSession {
571589
options: dropped_session.options,
572590
transaction: dropped_session.transaction,
573591
snapshot_time: dropped_session.snapshot_time,
592+
operation_time: dropped_session.operation_time,
574593
}
575594
}
576595
}
@@ -586,6 +605,7 @@ impl Drop for ClientSession {
586605
options: self.options.clone(),
587606
transaction: self.transaction.take(),
588607
snapshot_time: self.snapshot_time,
608+
operation_time: self.operation_time,
589609
};
590610
RUNTIME.execute(async move {
591611
let mut session: ClientSession = dropped_session.into();

0 commit comments

Comments
 (0)