Skip to content

Commit f9bc73f

Browse files
authored
RUST-796 Provide easy way to reborrow session in between cursor iterations (#341)
1 parent 993f2c9 commit f9bc73f

File tree

9 files changed

+200
-70
lines changed

9 files changed

+200
-70
lines changed

src/coll/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ where
488488
let mut cursor = self
489489
.find_with_session(filter, Some(options), session)
490490
.await?;
491-
let mut cursor = cursor.with_session(session);
491+
let mut cursor = cursor.stream(session);
492492
cursor.next().await.transpose()
493493
}
494494

src/cursor/mod.rs

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ use crate::{
2121
pub(crate) use common::{CursorInformation, CursorSpecification};
2222
use common::{GenericCursor, GetMoreProvider, GetMoreProviderResult};
2323

24-
/// A `Cursor` streams the result of a query. When a query is made, a `Cursor` will be returned with
25-
/// the first batch of results from the server; the documents will be returned as the `Cursor` is
26-
/// iterated. When the batch is exhausted and if there are more results, the `Cursor` will fetch the
27-
/// next batch of documents, and so forth until the results are exhausted. Note that because of this
28-
/// batching, additional network I/O may occur on any given call to `Cursor::next`. Because of this,
29-
/// a `Cursor` iterates over `Result<Document>` items rather than simply `Document` items.
24+
/// A [`Cursor`] streams the result of a query. When a query is made, the returned [`Cursor`] will
25+
/// contain the first batch of results from the server; the individual results will then be returned
26+
/// as the [`Cursor`] is iterated. When the batch is exhausted and if there are more results, the
27+
/// [`Cursor`] will fetch the next batch of documents, and so forth until the results are exhausted.
28+
/// Note that because of this batching, additional network I/O may occur on any given call to
29+
/// `next`. Because of this, a [`Cursor`] iterates over `Result<T>` items rather than
30+
/// simply `T` items.
3031
///
3132
/// The batch size of the `Cursor` can be configured using the options to the method that returns
3233
/// it. For example, setting the `batch_size` field of
@@ -38,45 +39,42 @@ use common::{GenericCursor, GetMoreProvider, GetMoreProviderResult};
3839
/// results from the server; both of these factors should be taken into account when choosing the
3940
/// optimal batch size.
4041
///
41-
/// A cursor can be used like any other [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html). The simplest way is just to iterate over the
42-
/// documents it yields:
42+
/// [`Cursor`] implements [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html), which means
43+
/// it can be iterated over much in the same way that an `Iterator` can be in synchronous Rust. In
44+
/// order to do so, the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) trait must
45+
/// be imported. Because a [`Cursor`] iterates over a `Result<T>`, it also has access to the
46+
/// potentially more ergonomic functionality provided by
47+
/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html), which can be
48+
/// imported instead of or in addition to
49+
/// [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html). The methods from
50+
/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) are especially useful when
51+
/// used in conjunction with the `?` operator.
4352
///
4453
/// ```rust
45-
/// # use futures::stream::StreamExt;
4654
/// # use mongodb::{bson::Document, Client, error::Result};
4755
/// #
4856
/// # async fn do_stuff() -> Result<()> {
4957
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
5058
/// # let coll = client.database("foo").collection::<Document>("bar");
51-
/// # let mut cursor = coll.find(None, None).await?;
5259
/// #
60+
/// use futures::stream::{StreamExt, TryStreamExt};
61+
///
62+
/// let mut cursor = coll.find(None, None).await?;
63+
/// // regular Stream uses next() and iterates over Option<Result<T>>
5364
/// while let Some(doc) = cursor.next().await {
5465
/// println!("{}", doc?)
5566
/// }
56-
/// #
57-
/// # Ok(())
58-
/// # }
59-
/// ```
60-
///
61-
/// Additionally, all the other methods that an [`Stream`](https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html) has are available on `Cursor` as well.
62-
/// This includes all of the functionality provided by [`StreamExt`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html), which provides similar functionality to the standard library `Iterator` trait.
63-
/// For instance, if the number of results from a query is known to be small, it might make sense
64-
/// to collect them into a vector:
67+
/// // regular Stream uses collect() and collects into a Vec<Result<T>>
68+
/// let v: Vec<Result<_>> = cursor.collect().await;
6569
///
66-
/// ```rust
67-
/// # use futures::stream::StreamExt;
68-
/// # use mongodb::{
69-
/// # bson::{doc, Document},
70-
/// # error::Result,
71-
/// # Client,
72-
/// # };
73-
/// #
74-
/// # async fn do_stuff() -> Result<()> {
75-
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
76-
/// # let coll = client.database("foo").collection("bar");
77-
/// # let cursor = coll.find(Some(doc! { "x": 1 }), None).await?;
70+
/// let mut cursor = coll.find(None, None).await?;
71+
/// // TryStream uses try_next() and iterates over Result<Option<T>>
72+
/// while let Some(doc) = cursor.try_next().await? {
73+
/// println!("{}", doc)
74+
/// }
75+
/// // TryStream uses try_collect() and collects into a Result<Vec<T>>
76+
/// let v: Vec<_> = cursor.try_collect().await?;
7877
/// #
79-
/// let results: Vec<Result<Document>> = cursor.collect().await;
8078
/// # Ok(())
8179
/// # }
8280
/// ```

src/cursor/session.rs

Lines changed: 92 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::{
55
};
66

77
use futures_core::{future::BoxFuture, Stream};
8+
use futures_util::StreamExt;
89
use serde::de::DeserializeOwned;
910

1011
use super::common::{CursorInformation, GenericCursor, GetMoreProvider, GetMoreProviderResult};
@@ -19,22 +20,29 @@ use crate::{
1920
RUNTIME,
2021
};
2122

22-
/// A `SessionCursor` is a cursor that was created with a `ClientSession` and must be iterated using
23-
/// one. To iterate, retrieve a `SessionCursorHandle` using `SessionCursor::with_session`:
23+
/// A [`SessionCursor`] is a cursor that was created with a [`ClientSession`] that must be iterated
24+
/// using one. To iterate, use [`SessionCursor::next`] or retrieve a [`SessionCursorStream`] using
25+
/// [`SessionCursor::stream`]:
2426
///
2527
/// ```rust
26-
/// # use futures::stream::StreamExt;
2728
/// # use mongodb::{bson::Document, Client, error::Result, ClientSession, SessionCursor};
2829
/// #
2930
/// # async fn do_stuff() -> Result<()> {
3031
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
3132
/// # let mut session = client.start_session(None).await?;
3233
/// # let coll = client.database("foo").collection::<Document>("bar");
33-
/// # let mut cursor = coll.find_with_session(None, None, &mut session).await?;
3434
/// #
35-
/// while let Some(doc) = cursor.with_session(&mut session).next().await {
36-
/// println!("{}", doc?)
35+
/// // iterate using next()
36+
/// let mut cursor = coll.find_with_session(None, None, &mut session).await?;
37+
/// while let Some(doc) = cursor.next(&mut session).await.transpose()? {
38+
/// println!("{}", doc)
3739
/// }
40+
///
41+
/// // iterate using `Stream`:
42+
/// use futures::stream::TryStreamExt;
43+
///
44+
/// let mut cursor = coll.find_with_session(None, None, &mut session).await?;
45+
/// let results: Vec<_> = cursor.stream(&mut session).try_collect().await?;
3846
/// #
3947
/// # Ok(())
4048
/// # }
@@ -67,12 +75,52 @@ where
6775
}
6876
}
6977

70-
/// Retrieves a `SessionCursorHandle` to iterate this cursor. The session provided must be the
78+
/// Retrieves a [`SessionCursorStream`] to iterate this cursor. The session provided must be the
7179
/// same session used to create the cursor.
72-
pub fn with_session<'session>(
80+
///
81+
/// Note that the borrow checker will not allow the session to be reused in between iterations
82+
/// of this stream. In order to do that, either use [`SessionCursor::next`] instead or drop
83+
/// the stream before using the session.
84+
///
85+
/// ```
86+
/// # use bson::{doc, Document};
87+
/// # use mongodb::{Client, error::Result};
88+
/// # fn main() {
89+
/// # async {
90+
/// # let client = Client::with_uri_str("foo").await?;
91+
/// # let coll = client.database("foo").collection::<Document>("bar");
92+
/// # let other_coll = coll.clone();
93+
/// # let mut session = client.start_session(None).await?;
94+
/// #
95+
/// use futures::stream::TryStreamExt;
96+
///
97+
/// // iterate over the results
98+
/// let mut cursor = coll.find_with_session(doc! { "x": 1 }, None, &mut session).await?;
99+
/// while let Some(doc) = cursor.stream(&mut session).try_next().await? {
100+
/// println!("{}", doc);
101+
/// }
102+
///
103+
/// // collect the results
104+
/// let mut cursor1 = coll.find_with_session(doc! { "x": 1 }, None, &mut session).await?;
105+
/// let v: Vec<Document> = cursor1.stream(&mut session).try_collect().await?;
106+
///
107+
/// // use session between iterations
108+
/// let mut cursor2 = coll.find_with_session(doc! { "x": 1 }, None, &mut session).await?;
109+
/// loop {
110+
/// let doc = match cursor2.stream(&mut session).try_next().await? {
111+
/// Some(d) => d,
112+
/// None => break,
113+
/// };
114+
/// other_coll.insert_one_with_session(doc, None, &mut session).await?;
115+
/// }
116+
/// # Ok::<(), mongodb::error::Error>(())
117+
/// # };
118+
/// # }
119+
/// ```
120+
pub fn stream<'session>(
73121
&mut self,
74122
session: &'session mut ClientSession,
75-
) -> SessionCursorHandle<'_, 'session, T> {
123+
) -> SessionCursorStream<'_, 'session, T> {
76124
let get_more_provider = ExplicitSessionGetMoreProvider::new(session);
77125

78126
// Pass the buffer into this cursor handle for iteration.
@@ -81,7 +129,7 @@ where
81129
info: self.info.clone(),
82130
initial_buffer: std::mem::take(&mut self.buffer),
83131
};
84-
SessionCursorHandle {
132+
SessionCursorStream {
85133
generic_cursor: ExplicitSessionCursor::new(
86134
self.client.clone(),
87135
spec,
@@ -90,6 +138,33 @@ where
90138
session_cursor: self,
91139
}
92140
}
141+
142+
/// Retrieve the next result from the cursor.
143+
/// The session provided must be the same session used to create the cursor.
144+
///
145+
/// Use this method when the session needs to be used again between iterations or when the added
146+
/// functionality of `Stream` is not needed.
147+
///
148+
/// ```
149+
/// # use bson::{doc, Document};
150+
/// # use mongodb::Client;
151+
/// # fn main() {
152+
/// # async {
153+
/// # let client = Client::with_uri_str("foo").await?;
154+
/// # let coll = client.database("foo").collection::<Document>("bar");
155+
/// # let other_coll = coll.clone();
156+
/// # let mut session = client.start_session(None).await?;
157+
/// let mut cursor = coll.find_with_session(doc! { "x": 1 }, None, &mut session).await?;
158+
/// while let Some(doc) = cursor.next(&mut session).await.transpose()? {
159+
/// other_coll.insert_one_with_session(doc, None, &mut session).await?;
160+
/// }
161+
/// # Ok::<(), mongodb::error::Error>(())
162+
/// # };
163+
/// # }
164+
/// ```
165+
pub async fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
166+
self.stream(session).next().await
167+
}
93168
}
94169

95170
impl<T> Drop for SessionCursor<T>
@@ -115,19 +190,20 @@ where
115190
/// This is to be used with cursors associated with explicit sessions borrowed from the user.
116191
type ExplicitSessionCursor<'session> = GenericCursor<ExplicitSessionGetMoreProvider<'session>>;
117192

118-
/// A handle that borrows a `ClientSession` temporarily for executing getMores or iterating through
119-
/// the current buffer of a `SessionCursor`.
193+
/// A type that implements [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) which can be used to
194+
/// stream the results of a [`SessionCursor`]. Returned from [`SessionCursor::stream`].
120195
///
121-
/// This updates the buffer of the parent `SessionCursor` when dropped.
122-
pub struct SessionCursorHandle<'cursor, 'session, T = Document>
196+
/// This updates the buffer of the parent [`SessionCursor`] when dropped. [`SessionCursor::next`] or
197+
/// any further streams created from [`SessionCursor::stream`] will pick up where this one left off.
198+
pub struct SessionCursorStream<'cursor, 'session, T = Document>
123199
where
124200
T: DeserializeOwned + Unpin,
125201
{
126202
session_cursor: &'cursor mut SessionCursor<T>,
127203
generic_cursor: ExplicitSessionCursor<'session>,
128204
}
129205

130-
impl<'cursor, 'session, T> Stream for SessionCursorHandle<'cursor, 'session, T>
206+
impl<'cursor, 'session, T> Stream for SessionCursorStream<'cursor, 'session, T>
131207
where
132208
T: DeserializeOwned + Unpin,
133209
{
@@ -144,7 +220,7 @@ where
144220
}
145221
}
146222

147-
impl<'cursor, 'session, T> Drop for SessionCursorHandle<'cursor, 'session, T>
223+
impl<'cursor, 'session, T> Drop for SessionCursorStream<'cursor, 'session, T>
148224
where
149225
T: DeserializeOwned + Unpin,
150226
{

src/db/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ impl Database {
278278
.await
279279
.map(|spec| SessionCursor::new(self.client().clone(), spec))?;
280280

281-
self.list_collection_names_common(cursor.with_session(session))
281+
self.list_collection_names_common(cursor.stream(session))
282282
.await
283283
}
284284

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,15 @@ define_if_single_runtime_enabled! {
136136
pub use crate::{
137137
client::Client,
138138
coll::Collection,
139-
cursor::{Cursor, session::{SessionCursor, SessionCursorHandle}},
139+
cursor::{Cursor, session::{SessionCursor, SessionCursorStream}},
140140
db::Database,
141141
};
142142

143143
#[cfg(feature = "sync")]
144144
pub(crate) use crate::{
145145
client::Client,
146146
coll::Collection,
147-
cursor::{Cursor, session::{SessionCursor, SessionCursorHandle}},
147+
cursor::{Cursor, session::{SessionCursor, SessionCursorStream}},
148148
db::Database,
149149
};
150150

0 commit comments

Comments
 (0)