Skip to content

Commit d1c12e1

Browse files
RUST-1588 Implement IterateOnce unified test runner operation (#936)
1 parent 9baf48b commit d1c12e1

File tree

6 files changed

+160
-39
lines changed

6 files changed

+160
-39
lines changed

src/cursor.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,16 @@ impl<T> Cursor<T> {
208208
self.wrapped_cursor.as_mut().unwrap().advance().await
209209
}
210210

211+
#[cfg(test)]
212+
pub(crate) async fn try_advance(&mut self) -> Result<()> {
213+
self.wrapped_cursor
214+
.as_mut()
215+
.unwrap()
216+
.try_advance()
217+
.await
218+
.map(|_| ())
219+
}
220+
211221
/// Returns a reference to the current result in the cursor.
212222
///
213223
/// # Panics

src/cursor/common.rs

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ use crate::{
2424
Namespace,
2525
};
2626

27+
/// The result of one attempt to advance a cursor.
28+
pub(super) enum AdvanceResult {
29+
/// The cursor was successfully advanced and the buffer has at least one item.
30+
Advanced,
31+
/// The cursor does not have any more items and will not return any more in the future.
32+
Exhausted,
33+
/// The cursor does not currently have any items, but future calls to getMore may yield more.
34+
Waiting,
35+
}
36+
2737
/// An internal cursor that can be used in a variety of contexts depending on its `GetMoreProvider`.
2838
#[derive(Derivative)]
2939
#[derivative(Debug)]
@@ -95,33 +105,44 @@ where
95105
self.state.as_ref().unwrap()
96106
}
97107

98-
/// Advance the cursor forward to the next document.
99-
/// If there are no documents cached locally, perform getMores until
100-
/// the cursor is exhausted or a result/error has been received.
108+
/// Attempt to advance the cursor forward to the next item. If there are no items cached
109+
/// locally, perform getMores until the cursor is exhausted or the buffer has been refilled.
110+
/// Return whether or not the cursor has been advanced.
101111
pub(super) async fn advance(&mut self) -> Result<bool> {
102112
loop {
103-
self.state_mut().buffer.advance();
104-
105-
if !self.state().buffer.is_empty() {
106-
break;
113+
match self.try_advance().await? {
114+
AdvanceResult::Advanced => return Ok(true),
115+
AdvanceResult::Exhausted => return Ok(false),
116+
AdvanceResult::Waiting => continue,
107117
}
118+
}
119+
}
108120

109-
// if moving the offset puts us at the end of the buffer, perform another
110-
// getMore if the cursor is still alive.
121+
/// Attempt to advance the cursor forward to the next item. If there are no items cached
122+
/// locally, perform a single getMore to attempt to retrieve more.
123+
pub(super) async fn try_advance(&mut self) -> Result<AdvanceResult> {
124+
if self.state_mut().buffer.advance() {
125+
return Ok(AdvanceResult::Advanced);
126+
} else if self.is_exhausted() {
127+
return Ok(AdvanceResult::Exhausted);
128+
}
111129

112-
if self.state().exhausted {
113-
return Ok(false);
114-
}
130+
// If the buffer is empty but the cursor is not exhausted, perform a getMore.
131+
let client = self.client.clone();
132+
let spec = self.info.clone();
133+
let pin = self.state().pinned_connection.replicate();
115134

116-
let client = self.client.clone();
117-
let spec = self.info.clone();
118-
let pin = self.state().pinned_connection.replicate();
135+
let result = self.provider.execute(spec, client, pin).await;
136+
self.handle_get_more_result(result)?;
119137

120-
let result = self.provider.execute(spec, client, pin).await;
121-
self.handle_get_more_result(result)?;
138+
if self.is_exhausted() {
139+
Ok(AdvanceResult::Exhausted)
140+
} else {
141+
match self.state_mut().buffer.advance() {
142+
true => Ok(AdvanceResult::Advanced),
143+
false => Ok(AdvanceResult::Waiting),
144+
}
122145
}
123-
124-
Ok(true)
125146
}
126147

127148
pub(super) fn take_state(&mut self) -> CursorState {
@@ -494,21 +515,27 @@ impl CursorBuffer {
494515
self.docs.is_empty()
495516
}
496517

518+
/// Removes and returns the document in the front of the buffer.
497519
pub(crate) fn next(&mut self) -> Option<RawDocumentBuf> {
498520
self.fresh = false;
499521
self.docs.pop_front()
500522
}
501523

502-
pub(crate) fn advance(&mut self) {
503-
// if at the front of the buffer, don't move forward as the first document
504-
// hasn't been consumed yet.
524+
/// Advances the buffer to the next document. Returns whether there are any documents remaining
525+
/// in the buffer after advancing.
526+
pub(crate) fn advance(&mut self) -> bool {
527+
// If at the front of the buffer, don't move forward as the first document hasn't been
528+
// consumed yet.
505529
if self.fresh {
506530
self.fresh = false;
507-
return;
531+
} else {
532+
self.docs.pop_front();
508533
}
509-
self.next();
534+
!self.is_empty()
510535
}
511536

537+
/// Returns the item at the front of the buffer, if there is one. This method does not change
538+
/// the state of the buffer.
512539
pub(crate) fn current(&self) -> Option<&RawDocument> {
513540
self.docs.front().map(|d| d.as_ref())
514541
}
@@ -519,3 +546,24 @@ impl AsRef<VecDeque<RawDocumentBuf>> for CursorBuffer {
519546
&self.docs
520547
}
521548
}
549+
550+
#[test]
551+
fn test_buffer() {
552+
use bson::rawdoc;
553+
554+
let queue: VecDeque<RawDocumentBuf> =
555+
[rawdoc! { "x": 1 }, rawdoc! { "x": 2 }, rawdoc! { "x": 3 }].into();
556+
let mut buffer = CursorBuffer::new(queue);
557+
558+
assert!(buffer.advance());
559+
assert_eq!(buffer.current(), Some(rawdoc! { "x": 1 }.as_ref()));
560+
561+
assert!(buffer.advance());
562+
assert_eq!(buffer.current(), Some(rawdoc! { "x": 2 }.as_ref()));
563+
564+
assert!(buffer.advance());
565+
assert_eq!(buffer.current(), Some(rawdoc! { "x": 3 }.as_ref()));
566+
567+
assert!(!buffer.advance());
568+
assert_eq!(buffer.current(), None);
569+
}

src/cursor/session.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,15 @@ impl<T> SessionCursor<T> {
245245
self.make_stream(session).generic_cursor.advance().await
246246
}
247247

248+
#[cfg(test)]
249+
pub(crate) async fn try_advance(&mut self, session: &mut ClientSession) -> Result<()> {
250+
self.make_stream(session)
251+
.generic_cursor
252+
.try_advance()
253+
.await
254+
.map(|_| ())
255+
}
256+
248257
/// Returns a reference to the current result in the cursor.
249258
///
250259
/// # Panics

src/test/spec/run_command.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ async fn run_unified() {
66
let _guard = LOCK.run_exclusively().await;
77
run_unified_tests(&["run-command", "unified"])
88
.skip_tests(&[
9-
// TODO re: RUST-1649: fix withTransaction for new test runner
9+
// TODO RUST-1649: unskip when withTransaction is implemented
1010
"attaches transaction fields to given command",
11-
// TODO fix in a follow up PR because need to add IterateOnce
12-
"does not close the cursor when receiving an empty batch",
1311
])
1412
.await;
1513
}

src/test/spec/unified_runner/operation.rs

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ impl<'de> Deserialize<'de> for Operation {
382382
"getKeys" => deserialize_op::<GetKeys>(definition.arguments),
383383
#[cfg(feature = "in-use-encryption-unstable")]
384384
"removeKeyAltName" => deserialize_op::<RemoveKeyAltName>(definition.arguments),
385+
"iterateOnce" => deserialize_op::<IterateOnce>(definition.arguments),
385386
s => Ok(Box::new(UnimplementedOperation {
386387
_name: s.to_string(),
387388
}) as Box<dyn TestOperation>),
@@ -2188,13 +2189,7 @@ impl TestOperation for IterateUntilDocumentOrError {
21882189
async move {
21892190
// A `SessionCursor` also requires a `&mut Session`, which would cause conflicting
21902191
// borrows, so take the cursor from the map and return it after execution instead.
2191-
let mut cursor = test_runner
2192-
.entities
2193-
.write()
2194-
.await
2195-
.remove(id)
2196-
.unwrap()
2197-
.into_cursor();
2192+
let mut cursor = test_runner.take_cursor(id).await;
21982193
let next = match &mut cursor {
21992194
TestCursor::Normal(cursor) => {
22002195
let mut cursor = cursor.lock().await;
@@ -2224,11 +2219,7 @@ impl TestOperation for IterateUntilDocumentOrError {
22242219
}
22252220
TestCursor::Closed => None,
22262221
};
2227-
test_runner
2228-
.entities
2229-
.write()
2230-
.await
2231-
.insert(id.to_string(), Entity::Cursor(cursor));
2222+
test_runner.return_cursor(id, cursor).await;
22322223
next.transpose()
22332224
.map(|opt| opt.map(|doc| Entity::Bson(Bson::Document(doc))))
22342225
}
@@ -2965,6 +2956,49 @@ impl TestOperation for Upload {
29652956
}
29662957
}
29672958

2959+
#[derive(Debug, Deserialize)]
2960+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
2961+
pub(super) struct IterateOnce {}
2962+
2963+
impl TestOperation for IterateOnce {
2964+
fn execute_entity_operation<'a>(
2965+
&'a self,
2966+
id: &'a str,
2967+
test_runner: &'a TestRunner,
2968+
) -> BoxFuture<'a, Result<Option<Entity>>> {
2969+
async move {
2970+
let mut cursor = test_runner.take_cursor(id).await;
2971+
match &mut cursor {
2972+
TestCursor::Normal(cursor) => {
2973+
let mut cursor = cursor.lock().await;
2974+
cursor.try_advance().await?;
2975+
}
2976+
TestCursor::Session { cursor, session_id } => {
2977+
cursor
2978+
.try_advance(
2979+
test_runner
2980+
.entities
2981+
.write()
2982+
.await
2983+
.get_mut(session_id)
2984+
.unwrap()
2985+
.as_mut_session_entity(),
2986+
)
2987+
.await?;
2988+
}
2989+
TestCursor::ChangeStream(change_stream) => {
2990+
let mut change_stream = change_stream.lock().await;
2991+
change_stream.next_if_any().await?;
2992+
}
2993+
TestCursor::Closed => panic!("Attempted to call IterateOnce on a closed cursor"),
2994+
}
2995+
test_runner.return_cursor(id, cursor).await;
2996+
Ok(None)
2997+
}
2998+
.boxed()
2999+
}
3000+
}
3001+
29683002
#[derive(Debug, Deserialize)]
29693003
pub(super) struct UnimplementedOperation {
29703004
_name: String,

src/test/spec/unified_runner/test_runner.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use super::{
5050
CollectionData,
5151
Entity,
5252
SessionEntity,
53+
TestCursor,
5354
TestFileEntity,
5455
};
5556

@@ -698,6 +699,27 @@ impl TestRunner {
698699
.as_client_encryption()
699700
.clone()
700701
}
702+
703+
/// Removes the cursor with the given ID from the entity map. This method passes ownership of
704+
/// the cursor to the caller so that a mutable reference to a ClientSession can be accessed from
705+
/// the entity map simultaneously. Once the caller is finished with the cursor, it MUST be
706+
/// returned to the test runner via the return_cursor method below.
707+
pub(crate) async fn take_cursor(&self, id: impl AsRef<str>) -> TestCursor {
708+
self.entities
709+
.write()
710+
.await
711+
.remove(id.as_ref())
712+
.unwrap()
713+
.into_cursor()
714+
}
715+
716+
/// Returns the given cursor to the entity map. This method must be called after take_cursor.
717+
pub(crate) async fn return_cursor(&self, id: impl AsRef<str>, cursor: TestCursor) {
718+
self.entities
719+
.write()
720+
.await
721+
.insert(id.as_ref().into(), Entity::Cursor(cursor));
722+
}
701723
}
702724

703725
#[cfg(feature = "in-use-encryption-unstable")]

0 commit comments

Comments
 (0)