Skip to content

Commit 637cb4d

Browse files
authored
RUST-1149 Prose tests for change streams. (#561)
1 parent 577345c commit 637cb4d

File tree

11 files changed

+733
-58
lines changed

11 files changed

+733
-58
lines changed

src/change_stream/event.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize};
2222
/// See the documentation
2323
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
2424
/// information on resume tokens.
25-
#[derive(Clone, Debug, Deserialize, Serialize)]
25+
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
2626
pub struct ResumeToken(pub(crate) RawBson);
2727

2828
impl ResumeToken {
@@ -43,11 +43,16 @@ impl ResumeToken {
4343
pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
4444
doc.map(|doc| ResumeToken(RawBson::Document(doc)))
4545
}
46+
47+
#[cfg(test)]
48+
pub fn parsed(self) -> std::result::Result<Bson, bson::raw::Error> {
49+
self.0.try_into()
50+
}
4651
}
4752

4853
/// A `ChangeStreamEvent` represents a
4954
/// [change event](https://docs.mongodb.com/manual/reference/change-events/) in the associated change stream.
50-
#[derive(Debug, Deserialize)]
55+
#[derive(Debug, Deserialize, PartialEq)]
5156
#[serde(rename_all = "camelCase")]
5257
#[non_exhaustive]
5358
pub struct ChangeStreamEvent<T> {
@@ -99,7 +104,7 @@ pub struct ChangeStreamEvent<T> {
99104
}
100105

101106
/// Describes which fields have been updated or removed from a document.
102-
#[derive(Debug, Deserialize)]
107+
#[derive(Debug, Deserialize, PartialEq)]
103108
#[serde(rename_all = "camelCase")]
104109
#[non_exhaustive]
105110
pub struct UpdateDescription {
@@ -112,7 +117,7 @@ pub struct UpdateDescription {
112117
}
113118

114119
/// The operation type represented in a given change notification.
115-
#[derive(Debug, Deserialize, Clone, PartialEq)]
120+
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
116121
#[serde(rename_all = "camelCase")]
117122
#[non_exhaustive]
118123
pub enum OperationType {
@@ -142,13 +147,12 @@ pub enum OperationType {
142147
}
143148

144149
/// Identifies the collection or database on which an event occurred.
145-
#[derive(Deserialize, Debug)]
146-
#[serde(untagged)]
150+
#[derive(Deserialize, Debug, PartialEq, Eq)]
147151
#[non_exhaustive]
148-
pub enum ChangeStreamEventSource {
149-
/// The [`Namespace`] containing the database and collection in which the change occurred.
150-
Namespace(Namespace),
152+
pub struct ChangeStreamEventSource {
153+
/// The name of the database in which the change occurred.
154+
pub db: String,
151155

152-
/// Contains the name of the database in which the change happened.
153-
Database(String),
156+
/// The name of the collection in which the change occurred.
157+
pub coll: Option<String>,
154158
}

src/change_stream/mod.rs

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::{
2121
options::ChangeStreamOptions,
2222
},
2323
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture},
24-
error::{Error, Result},
24+
error::{Error, ErrorKind, Result},
2525
operation::AggregateTarget,
2626
options::AggregateOptions,
2727
selection_criteria::{ReadPreference, SelectionCriteria},
@@ -220,10 +220,14 @@ fn get_resume_token(
220220
) -> Result<Option<ResumeToken>> {
221221
Ok(match batch_value {
222222
BatchValue::Some { doc, is_last } => {
223+
let doc_token = match doc.get("_id")? {
224+
Some(val) => ResumeToken(val.to_raw_bson()),
225+
None => return Err(ErrorKind::MissingResumeToken.into()),
226+
};
223227
if *is_last && batch_token.is_some() {
224228
batch_token.cloned()
225229
} else {
226-
doc.get("_id")?.map(|val| ResumeToken(val.to_raw_bson()))
230+
Some(doc_token)
227231
}
228232
}
229233
BatchValue::Empty => batch_token.cloned(),
@@ -236,50 +240,56 @@ where
236240
T: DeserializeOwned + Unpin + Send + Sync,
237241
{
238242
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
239-
if let Some(mut pending) = self.pending_resume.take() {
240-
match Pin::new(&mut pending).poll(cx) {
241-
Poll::Pending => {
242-
self.pending_resume = Some(pending);
243-
return Poll::Pending;
243+
loop {
244+
if let Some(mut pending) = self.pending_resume.take() {
245+
match Pin::new(&mut pending).poll(cx) {
246+
Poll::Pending => {
247+
self.pending_resume = Some(pending);
248+
return Poll::Pending;
249+
}
250+
Poll::Ready(Ok(new_stream)) => {
251+
// Ensure that the old cursor is killed on the server selected for the new
252+
// one.
253+
self.cursor
254+
.set_drop_address(new_stream.cursor.address().clone());
255+
self.cursor = new_stream.cursor;
256+
self.args = new_stream.args;
257+
continue;
258+
}
259+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
244260
}
245-
Poll::Ready(Ok(new_stream)) => {
246-
// Ensure that the old cursor is killed on the server selected for the new one.
247-
self.cursor
248-
.set_drop_address(new_stream.cursor.address().clone());
249-
self.cursor = new_stream.cursor;
250-
self.args = new_stream.args;
251-
return Poll::Pending;
252-
}
253-
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
254261
}
255-
}
256-
let out = self.cursor.poll_next_in_batch(cx);
257-
match &out {
258-
Poll::Ready(Ok(bv)) => {
259-
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? {
260-
self.data.resume_token = Some(token);
262+
let out = self.cursor.poll_next_in_batch(cx);
263+
match &out {
264+
Poll::Ready(Ok(bv)) => {
265+
if let Some(token) =
266+
get_resume_token(bv, self.cursor.post_batch_resume_token())?
267+
{
268+
self.data.resume_token = Some(token);
269+
}
270+
if matches!(bv, BatchValue::Some { .. }) {
271+
self.data.document_returned = true;
272+
}
261273
}
262-
if matches!(bv, BatchValue::Some { .. }) {
263-
self.data.document_returned = true;
274+
Poll::Ready(Err(e)) if e.is_resumable() && !self.data.resume_attempted => {
275+
self.data.resume_attempted = true;
276+
let client = self.cursor.client().clone();
277+
let args = self.args.clone();
278+
let mut data = self.data.take();
279+
data.implicit_session = self.cursor.take_implicit_session();
280+
self.pending_resume = Some(Box::pin(async move {
281+
let new_stream: Result<ChangeStream<ChangeStreamEvent<()>>> = client
282+
.execute_watch(args.pipeline, args.options, args.target, Some(data))
283+
.await;
284+
new_stream.map(|cs| cs.with_type::<T>())
285+
}));
286+
// Iterate the loop so the new future gets polled and can register wakers.
287+
continue;
264288
}
289+
_ => {}
265290
}
266-
Poll::Ready(Err(e)) if e.is_resumable() && !self.data.resume_attempted => {
267-
self.data.resume_attempted = true;
268-
let client = self.cursor.client().clone();
269-
let args = self.args.clone();
270-
let mut data = self.data.take();
271-
data.implicit_session = self.cursor.take_implicit_session();
272-
self.pending_resume = Some(Box::pin(async move {
273-
let new_stream: Result<ChangeStream<ChangeStreamEvent<()>>> = client
274-
.execute_watch(args.pipeline, args.options, args.target, Some(data))
275-
.await;
276-
new_stream.map(|cs| cs.with_type::<T>())
277-
}));
278-
return Poll::Pending;
279-
}
280-
_ => {}
291+
return out;
281292
}
282-
out
283293
}
284294
}
285295

src/cmap/test/integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async fn connection_error_during_establishment() {
180180
client_options.repl_set_name = None;
181181

182182
let client = TestClient::with_options(Some(client_options.clone())).await;
183-
if !client.supports_fail_command().await {
183+
if !client.supports_fail_command() {
184184
println!(
185185
"skipping {} due to failCommand not being supported",
186186
function_name!()
@@ -235,7 +235,7 @@ async fn connection_error_during_operation() {
235235
options.max_pool_size = Some(1);
236236

237237
let client = TestClient::with_options(options.into()).await;
238-
if !client.supports_fail_command().await {
238+
if !client.supports_fail_command() {
239239
println!(
240240
"skipping {} due to failCommand not being supported",
241241
function_name!()

src/coll/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1354,7 +1354,7 @@ where
13541354
}
13551355

13561356
/// A struct modeling the canonical name for a collection in MongoDB.
1357-
#[derive(Debug, Clone)]
1357+
#[derive(Debug, Clone, PartialEq, Eq)]
13581358
pub struct Namespace {
13591359
/// The name of the database associated with this namespace.
13601360
pub db: String,

src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,10 @@ pub enum ErrorKind {
470470
#[error("The server does not support a database operation: {message}")]
471471
#[non_exhaustive]
472472
IncompatibleServer { message: String },
473+
474+
/// No resume token was present in a change stream document.
475+
#[error("Cannot provide resume functionality when the resume token is missing")]
476+
MissingResumeToken,
473477
}
474478

475479
impl ErrorKind {

src/sdam/description/topology/test/sdam.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ async fn heartbeat_events() {
615615
.await
616616
.expect("should see server heartbeat succeeded event");
617617

618-
if !client.supports_fail_command().await {
618+
if !client.supports_fail_command() {
619619
return;
620620
}
621621

src/sdam/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ async fn min_heartbeat_frequency() {
4242

4343
let setup_client = TestClient::with_options(Some(setup_client_options.clone())).await;
4444

45-
if !setup_client.supports_fail_command().await {
45+
if !setup_client.supports_fail_command() {
4646
println!("skipping min_heartbeat_frequency test due to server not supporting fail points");
4747
return;
4848
}

0 commit comments

Comments
 (0)