Skip to content

Commit 3ffc906

Browse files
authored
DRIVERS-2617 Test splitting of large changestream events (#878)
1 parent 8cb9096 commit 3ffc906

File tree

1 file changed

+67
-2
lines changed

1 file changed

+67
-2
lines changed

src/test/change_stream.rs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
use bson::{doc, Bson, Document};
2-
use futures_util::StreamExt;
2+
use futures_util::{StreamExt, TryStreamExt};
33
use semver::VersionReq;
44

55
use crate::{
66
change_stream::{
77
event::{ChangeStreamEvent, OperationType},
8-
options::ChangeStreamOptions,
8+
options::{ChangeStreamOptions, FullDocumentBeforeChangeType},
99
ChangeStream,
1010
},
1111
coll::options::CollectionOptions,
1212
db::options::{ChangeStreamPreAndPostImages, CreateCollectionOptions},
1313
event::command::{CommandEvent, CommandStartedEvent, CommandSucceededEvent},
1414
options::{Acknowledgment, WriteConcern},
1515
test::{FailCommandOptions, FailPoint, FailPointMode},
16+
Client,
1617
Collection,
1718
};
1819

@@ -709,3 +710,67 @@ async fn create_coll_pre_post() -> Result<()> {
709710

710711
Ok(())
711712
}
713+
714+
// Prose test 19: large event splitting
715+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
716+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
717+
async fn split_large_event() -> Result<()> {
718+
let _guard = LOCK.run_concurrently().await;
719+
720+
let client = Client::test_builder().build().await;
721+
if !client.server_version_gte(7, 0) {
722+
log_uncaptured(format!(
723+
"skipping change stream test on unsupported version {:?}",
724+
client.server_version
725+
));
726+
return Ok(());
727+
}
728+
if !client.is_replica_set() && !client.is_sharded() {
729+
log_uncaptured("skipping change stream test on unsupported topology");
730+
return Ok(());
731+
}
732+
733+
let db = client.database("change_stream_tests");
734+
db.collection::<Document>("split_large_event")
735+
.drop(None)
736+
.await?;
737+
db.create_collection(
738+
"split_large_event",
739+
CreateCollectionOptions::builder()
740+
.change_stream_pre_and_post_images(ChangeStreamPreAndPostImages { enabled: true })
741+
.build(),
742+
)
743+
.await?;
744+
745+
let coll = db.collection::<Document>("split_large_event");
746+
coll.insert_one(doc! { "value": "q".repeat(10 * 1024 * 1024) }, None)
747+
.await?;
748+
let stream = coll
749+
.watch(
750+
[doc! { "$changeStreamSplitLargeEvent": {} }],
751+
ChangeStreamOptions::builder()
752+
.full_document_before_change(Some(FullDocumentBeforeChangeType::Required))
753+
.build(),
754+
)
755+
.await?
756+
.with_type::<Document>();
757+
coll.update_one(
758+
doc! {},
759+
doc! { "$set": { "value": "z".repeat(10 * 1024 * 1024) } },
760+
None,
761+
)
762+
.await?;
763+
764+
let events: Vec<_> = stream.take(2).try_collect().await?;
765+
assert_eq!(2, events.len());
766+
assert_eq!(
767+
events[0].get_document("splitEvent")?,
768+
&doc! { "fragment": 1, "of": 2 },
769+
);
770+
assert_eq!(
771+
events[1].get_document("splitEvent")?,
772+
&doc! { "fragment": 2, "of": 2 },
773+
);
774+
775+
Ok(())
776+
}

0 commit comments

Comments
 (0)