Skip to content

Commit 5478575

Browse files
authored
RUST-1191 Ensure change stream is in proper state in mid_batch_resume_token test (#689)
1 parent 09e93e2 commit 5478575

File tree

4 files changed

+51
-4
lines changed

4 files changed

+51
-4
lines changed

src/change_stream/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@ pub mod event;
33
pub(crate) mod options;
44
pub mod session;
55

6+
#[cfg(test)]
7+
use std::collections::VecDeque;
68
use std::{
79
future::Future,
810
pin::Pin,
911
task::{Context, Poll},
1012
};
1113

14+
#[cfg(test)]
15+
use bson::RawDocumentBuf;
1216
use bson::{Document, Timestamp};
1317
use derivative::Derivative;
1418
use futures_core::{future::BoxFuture, Stream};
@@ -167,6 +171,11 @@ where
167171
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
168172
self.cursor.set_kill_watcher(tx);
169173
}
174+
175+
#[cfg(test)]
176+
pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
177+
self.cursor.current_batch()
178+
}
170179
}
171180

172181
/// Arguments passed to a `watch` method, captured to allow resume.

src/cursor/common.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ where
8787
self.state().buffer.current()
8888
}
8989

90+
#[cfg(test)]
91+
pub(super) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
92+
self.state().buffer.as_ref()
93+
}
94+
9095
fn state_mut(&mut self) -> &mut CursorState {
9196
self.state.as_mut().unwrap()
9297
}
@@ -531,3 +536,9 @@ impl CursorBuffer {
531536
self.docs.front().map(|d| d.as_ref())
532537
}
533538
}
539+
540+
impl AsRef<VecDeque<RawDocumentBuf>> for CursorBuffer {
541+
fn as_ref(&self) -> &VecDeque<RawDocumentBuf> {
542+
&self.docs
543+
}
544+
}

src/cursor/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
mod common;
22
pub(crate) mod session;
33

4+
#[cfg(test)]
5+
use std::collections::VecDeque;
46
use std::{
57
pin::Pin,
68
task::{Context, Poll},
79
};
810

911
use bson::RawDocument;
12+
13+
#[cfg(test)]
14+
use bson::RawDocumentBuf;
1015
use futures_core::{future::BoxFuture, Stream};
1116
use serde::{de::DeserializeOwned, Deserialize};
1217
#[cfg(test)]
@@ -269,6 +274,11 @@ impl<T> Cursor<T> {
269274
);
270275
self.kill_watcher = Some(tx);
271276
}
277+
278+
#[cfg(test)]
279+
pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
280+
self.wrapped_cursor.as_ref().unwrap().current_batch()
281+
}
272282
}
273283

274284
impl<T> CursorStream for Cursor<T>

src/test/change_stream.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -455,11 +455,28 @@ async fn batch_mid_resume_token() -> Result<()> {
455455
None => return Ok(()),
456456
};
457457

458-
coll.insert_many((0..2).map(|i| doc! { "_id": i as i32 }), None)
459-
.await?;
458+
// This loop gets the stream to a point where it has been iterated up to but not including
459+
// the last event in its batch.
460+
let mut event_id = None;
461+
loop {
462+
match stream.next_if_any().await? {
463+
Some(event) => {
464+
event_id = Some(event.id);
465+
}
466+
// If we're out of events, make some more.
467+
None => {
468+
coll.insert_many((0..3).map(|_| doc! {}), None).await?;
469+
}
470+
};
471+
472+
// if after iterating the stream last time there's one document left,
473+
// then we're done here and can continue to the assertions.
474+
if stream.current_batch().len() == 1 {
475+
break;
476+
}
477+
}
460478

461-
let mid_id = stream.next().await.transpose()?.unwrap().id;
462-
assert_eq!(stream.resume_token(), Some(mid_id));
479+
assert_eq!(stream.resume_token().unwrap(), event_id.unwrap());
463480

464481
Ok(())
465482
}

0 commit comments

Comments
 (0)