Skip to content

Commit 13e0635

Browse files
authored
RUST-2030 Add more event fields: lsid, txnNumber and disambiguatedPaths (#1197)
1 parent 6502b5f commit 13e0635

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

src/change_stream/event.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ pub struct ChangeStreamEvent<T> {
7474
/// The new name for the `ns` collection. Only included for `OperationType::Rename`.
7575
pub to: Option<ChangeNamespace>,
7676

77+
/// The identifier for the session associated with the transaction.
78+
/// Only present if the operation is part of a multi-document transaction.
79+
pub lsid: Option<Document>,
80+
81+
/// Together with the lsid, a number that helps uniquely identify a transaction.
82+
/// Only present if the operation is part of a multi-document transaction.
83+
pub txn_number: Option<i64>,
84+
7785
/// A `Document` that contains the `_id` of the document created or modified by the `insert`,
7886
/// `replace`, `delete`, `update` operations (i.e. CRUD operations). For sharded collections,
7987
/// also displays the full shard key for the document. The `_id` field is not repeated if it is
@@ -126,6 +134,12 @@ pub struct UpdateDescription {
126134

127135
/// Arrays that were truncated in the `Document`.
128136
pub truncated_arrays: Option<Vec<TruncatedArray>>,
137+
138+
/// When an update event reports changes involving ambiguous fields, the disambiguatedPaths
139+
/// document provides the path key with an array listing each path component.
140+
/// Note: The disambiguatedPaths field is only available on change streams started with the
141+
/// showExpandedEvents option
142+
pub disambiguated_paths: Option<Document>,
129143
}
130144

131145
/// Describes an array that has been truncated.

src/test/change_stream.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,58 @@ async fn split_large_event() -> Result<()> {
659659
Ok(())
660660
}
661661

662+
/// Test that transaction fields are parsed correctly
663+
#[tokio::test]
664+
async fn transaction_fields() -> Result<()> {
665+
let (client, coll, mut stream) =
666+
match init_stream("chang_stream_transaction_fields", true).await? {
667+
Some(t) => t,
668+
None => return Ok(()),
669+
};
670+
if client.is_sharded() {
671+
log_uncaptured("skipping change stream test transaction_fields on unsupported topology");
672+
return Ok(());
673+
}
674+
if !VersionReq::parse(">=5.0")
675+
.unwrap()
676+
.matches(&client.server_version)
677+
{
678+
log_uncaptured(format!(
679+
"skipping change stream test transaction_fields on unsupported version {:?}",
680+
client.server_version
681+
));
682+
return Ok(());
683+
}
684+
if !client.supports_transactions() {
685+
log_uncaptured(
686+
"skipping change stream transaction_fields test due to lack of transaction support",
687+
);
688+
return Ok(());
689+
}
690+
691+
let mut session = client.start_session().await.unwrap();
692+
let session_id = session.id().get("id").cloned();
693+
assert!(session_id.is_some());
694+
session.start_transaction().await.unwrap();
695+
coll.insert_one(doc! {"_id": 1})
696+
.session(&mut session)
697+
.await?;
698+
session.commit_transaction().await.unwrap();
699+
700+
let next_event = stream.next().await.transpose()?;
701+
assert!(matches!(next_event,
702+
Some(ChangeStreamEvent {
703+
operation_type: OperationType::Insert,
704+
document_key: Some(key),
705+
lsid: Some(lsid),
706+
txn_number: Some(1),
707+
..
708+
}) if key == doc! { "_id": 1 } && lsid.get("id") == session_id.as_ref()
709+
));
710+
711+
Ok(())
712+
}
713+
662714
// Regression test: `Collection::watch` uses the type parameter. This is not flagged as a test to
663715
// run because it's just asserting that this compiles.
664716
#[allow(unreachable_code, unused_variables, clippy::diverging_sub_expression)]

0 commit comments

Comments
 (0)