Skip to content

Commit ad22c49

Browse files
authored
RUST-1046 Fix iteration of cursors when batchSize doesn't divide result size (#484)
1 parent 648ebe3 commit ad22c49

File tree

2 files changed

+60
-6
lines changed

2 files changed

+60
-6
lines changed

src/cursor/session.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ pub struct SessionCursor<T>
5252
where
5353
T: DeserializeOwned + Unpin,
5454
{
55-
exhausted: bool,
5655
client: Client,
5756
info: CursorInformation,
5857
buffer: VecDeque<T>,
@@ -63,10 +62,7 @@ where
6362
T: DeserializeOwned + Unpin + Send + Sync,
6463
{
6564
pub(crate) fn new(client: Client, spec: CursorSpecification<T>) -> Self {
66-
let exhausted = spec.id() == 0;
67-
6865
Self {
69-
exhausted,
7066
client,
7167
info: spec.info,
7268
buffer: spec.initial_buffer,
@@ -165,12 +161,25 @@ where
165161
}
166162
}
167163

164+
impl<T> SessionCursor<T>
165+
where
166+
T: DeserializeOwned + Unpin,
167+
{
168+
fn mark_exhausted(&mut self) {
169+
self.info.id = 0;
170+
}
171+
172+
fn is_exhausted(&self) -> bool {
173+
self.info.id == 0
174+
}
175+
}
176+
168177
impl<T> Drop for SessionCursor<T>
169178
where
170179
T: DeserializeOwned + Unpin,
171180
{
172181
fn drop(&mut self) {
173-
if self.exhausted {
182+
if self.is_exhausted() {
174183
return;
175184
}
176185

@@ -220,7 +229,9 @@ where
220229
fn drop(&mut self) {
221230
// Update the parent cursor's state based on any iteration performed on this handle.
222231
self.session_cursor.buffer = self.generic_cursor.take_buffer();
223-
self.session_cursor.exhausted = self.generic_cursor.is_exhausted();
232+
if self.generic_cursor.is_exhausted() {
233+
self.session_cursor.mark_exhausted();
234+
}
224235
}
225236
}
226237

src/test/coll.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,3 +1049,46 @@ async fn collection_generic_bounds() {
10491049
.collection(function_name!());
10501050
let _result = coll.insert_one(Bar {}, None).await;
10511051
}
1052+
1053+
/// Verify that a cursor with multiple batches whose last batch isn't full
1054+
/// iterates without errors.
1055+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
1056+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
1057+
async fn cursor_batch_size() {
1058+
let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;
1059+
1060+
let client = TestClient::new().await;
1061+
let coll = client
1062+
.init_db_and_coll("cursor_batch_size", "cursor_batch_size")
1063+
.await;
1064+
1065+
let doc = Document::new();
1066+
coll.insert_many(vec![&doc; 10], None).await.unwrap();
1067+
1068+
let opts = FindOptions::builder().batch_size(3).build();
1069+
let cursor_no_session = coll.find(doc! {}, opts.clone()).await.unwrap();
1070+
let docs: Vec<_> = cursor_no_session.try_collect().await.unwrap();
1071+
assert_eq!(docs.len(), 10);
1072+
1073+
// test session cursors
1074+
if client.is_standalone() {
1075+
return;
1076+
}
1077+
let mut session = client.start_session(None).await.unwrap();
1078+
let mut cursor = coll
1079+
.find_with_session(doc! {}, opts.clone(), &mut session)
1080+
.await
1081+
.unwrap();
1082+
let mut docs = Vec::new();
1083+
while let Some(doc) = cursor.next(&mut session).await {
1084+
docs.push(doc.unwrap());
1085+
}
1086+
assert_eq!(docs.len(), 10);
1087+
1088+
let mut cursor = coll
1089+
.find_with_session(doc! {}, opts, &mut session)
1090+
.await
1091+
.unwrap();
1092+
let docs: Vec<_> = cursor.stream(&mut session).try_collect().await.unwrap();
1093+
assert_eq!(docs.len(), 10);
1094+
}

0 commit comments

Comments
 (0)