Skip to content

Commit d626f70

Browse files
authored
RUST-1106 Make the change streams API visible (#571)
1 parent f8d2e6f commit d626f70

File tree

13 files changed

+55
-89
lines changed

13 files changed

+55
-89
lines changed

src/change_stream/event.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
//! Contains the types related to a `ChangeStream` event.
2+
#[cfg(test)]
23
use std::convert::TryInto;
34

4-
use crate::{
5-
coll::Namespace,
6-
cursor::CursorSpecification,
7-
error::Result,
8-
options::ChangeStreamOptions,
9-
};
5+
use crate::{coll::Namespace, cursor::CursorSpecification, options::ChangeStreamOptions};
106

11-
use bson::{Bson, Document, RawBson, RawDocument, RawDocumentBuf, Timestamp};
7+
#[cfg(test)]
8+
use bson::Bson;
9+
use bson::{Document, RawBson, RawDocumentBuf, Timestamp};
1210
use serde::{Deserialize, Serialize};
1311

1412
/// An opaque token used for resuming an interrupted

src/change_stream/mod.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ pub mod session;
55

66
use std::{
77
future::Future,
8-
marker::PhantomData,
98
pin::Pin,
109
task::{Context, Poll},
1110
};
1211

1312
use bson::{Document, Timestamp};
1413
use derivative::Derivative;
1514
use futures_core::{future::BoxFuture, Stream};
16-
use serde::{de::DeserializeOwned, Deserialize};
15+
use serde::de::DeserializeOwned;
16+
#[cfg(test)]
1717
use tokio::sync::oneshot;
1818

1919
use crate::{
@@ -22,15 +22,10 @@ use crate::{
2222
options::ChangeStreamOptions,
2323
},
2424
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture},
25-
error::{Error, ErrorKind, Result},
25+
error::{ErrorKind, Result},
2626
operation::AggregateTarget,
27-
options::AggregateOptions,
28-
selection_criteria::{ReadPreference, SelectionCriteria},
29-
Client,
3027
ClientSession,
31-
Collection,
3228
Cursor,
33-
Database,
3429
};
3530

3631
/// A `ChangeStream` streams the ongoing changes of its associated collection, database or
@@ -49,7 +44,7 @@ use crate::{
4944
///
5045
/// A `ChangeStream` can be iterated like any other [`Stream`]:
5146
///
52-
/// ```ignore
47+
/// ```
5348
/// # #[cfg(not(feature = "sync"))]
5449
/// # use futures::stream::StreamExt;
5550
/// # use mongodb::{Client, error::Result, bson::doc,
@@ -144,18 +139,18 @@ where
144139
/// empty. This method should be used when storing the resume token in order to ensure the
145140
/// most up to date token is received, e.g.
146141
///
147-
/// ```ignore
148-
/// # use mongodb::{Client, error::Result};
142+
/// ```
143+
/// # use mongodb::{Client, Collection, bson::Document, error::Result};
149144
/// # async fn func() -> Result<()> {
150145
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
151-
/// # let coll = client.database("foo").collection("bar");
146+
/// # let coll: Collection<Document> = client.database("foo").collection("bar");
152147
/// let mut change_stream = coll.watch(None, None).await?;
153148
/// let mut resume_token = None;
154149
/// while change_stream.is_alive() {
155-
/// if let Some(event) = change_stream.next_if_any() {
150+
/// if let Some(event) = change_stream.next_if_any().await? {
156151
/// // process event
157152
/// }
158-
/// resume_token = change_stream.resume_token().cloned();
153+
/// resume_token = change_stream.resume_token();
159154
/// }
160155
/// #
161156
/// # Ok(())
@@ -170,8 +165,6 @@ where
170165

171166
#[cfg(test)]
172167
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
173-
use tokio::sync::oneshot;
174-
175168
self.cursor.set_kill_watcher(tx);
176169
}
177170
}
@@ -307,7 +300,7 @@ where
307300
{
308301
type Item = Result<T>;
309302

310-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
303+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
311304
stream_poll_next(Pin::into_inner(self), cx)
312305
}
313306
}

src/change_stream/session.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,11 @@
11
//! Types for change streams using sessions.
2-
use std::{
3-
future::Future,
4-
marker::PhantomData,
5-
pin::Pin,
6-
sync::{Arc, Mutex},
7-
task::{Context, Poll},
8-
};
9-
10-
use bson::Document;
112
use serde::de::DeserializeOwned;
123

134
use crate::{
14-
cursor::{BatchValue, CursorStream, NextInBatchFuture},
5+
cursor::{BatchValue, NextInBatchFuture},
156
error::Result,
167
ClientSession,
178
SessionCursor,
18-
SessionCursorStream,
199
};
2010

2111
use super::{
@@ -25,10 +15,10 @@ use super::{
2515
WatchArgs,
2616
};
2717

28-
/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
18+
/// A [`SessionChangeStream`] is a change stream that was created with a ClientSession that must
2919
/// be iterated using one. To iterate, use [`SessionChangeStream::next`]:
3020
///
31-
/// ```ignore
21+
/// ```
3222
/// # use mongodb::{bson::Document, Client, error::Result};
3323
/// #
3424
/// # async fn do_stuff() -> Result<()> {
@@ -79,7 +69,7 @@ where
7969
/// Retrieve the next result from the change stream.
8070
/// The session provided must be the same session used to create the change stream.
8171
///
82-
/// ```ignore
72+
/// ```
8373
/// # use bson::{doc, Document};
8474
/// # use mongodb::Client;
8575
/// # fn main() {
@@ -120,16 +110,16 @@ where
120110
/// empty. This method should be used when storing the resume token in order to ensure the
121111
/// most up to date token is received, e.g.
122112
///
123-
/// ```ignore
124-
/// # use mongodb::{Client, error::Result};
113+
/// ```
114+
/// # use mongodb::{Client, Collection, bson::Document, error::Result};
125115
/// # async fn func() -> Result<()> {
126116
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
127-
/// # let coll = client.database("foo").collection("bar");
117+
/// # let coll: Collection<Document> = client.database("foo").collection("bar");
128118
/// # let mut session = client.start_session(None).await?;
129119
/// let mut change_stream = coll.watch_with_session(None, None, &mut session).await?;
130120
/// let mut resume_token = None;
131121
/// while change_stream.is_alive() {
132-
/// if let Some(event) = change_stream.next_if_any(&mut session) {
122+
/// if let Some(event) = change_stream.next_if_any(&mut session).await? {
133123
/// // process event
134124
/// }
135125
/// resume_token = change_stream.resume_token();

src/client/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,7 @@ impl Client {
282282
///
283283
/// If the pipeline alters the structure of the returned events, the parsed type will need to be
284284
/// changed via [`ChangeStream::with_type`].
285-
#[allow(unused)]
286-
pub(crate) async fn watch(
285+
pub async fn watch(
287286
&self,
288287
pipeline: impl IntoIterator<Item = Document>,
289288
options: impl Into<Option<ChangeStreamOptions>>,
@@ -299,8 +298,7 @@ impl Client {
299298

300299
/// Starts a new [`SessionChangeStream`] that receives events for all changes in the cluster
301300
/// using the provided [`ClientSession`]. See [`Client::watch`] for more information.
302-
#[allow(unused)]
303-
pub(crate) async fn watch_with_session(
301+
pub async fn watch_with_session(
304302
&self,
305303
pipeline: impl IntoIterator<Item = Document>,
306304
options: impl Into<Option<ChangeStreamOptions>>,

src/coll/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -814,8 +814,7 @@ impl<T> Collection<T> {
814814
///
815815
/// If the pipeline alters the structure of the returned events, the parsed type will need to be
816816
/// changed via [`ChangeStream::with_type`].
817-
#[allow(unused)]
818-
pub(crate) async fn watch(
817+
pub async fn watch(
819818
&self,
820819
pipeline: impl IntoIterator<Item = Document>,
821820
options: impl Into<Option<ChangeStreamOptions>>,
@@ -833,8 +832,7 @@ impl<T> Collection<T> {
833832

834833
/// Starts a new [`SessionChangeStream`] that receives events for all changes in this collection
835834
/// using the provided [`ClientSession`]. See [`Client::watch`] for more information.
836-
#[allow(unused)]
837-
pub(crate) async fn watch_with_session(
835+
pub async fn watch_with_session(
838836
&self,
839837
pipeline: impl IntoIterator<Item = Document>,
840838
options: impl Into<Option<ChangeStreamOptions>>,

src/db/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,7 @@ impl Database {
463463
///
464464
/// If the pipeline alters the structure of the returned events, the parsed type will need to be
465465
/// changed via [`ChangeStream::with_type`].
466-
#[allow(unused)]
467-
pub(crate) async fn watch(
466+
pub async fn watch(
468467
&self,
469468
pipeline: impl IntoIterator<Item = Document>,
470469
options: impl Into<Option<ChangeStreamOptions>>,
@@ -479,8 +478,7 @@ impl Database {
479478

480479
/// Starts a new [`SessionChangeStream`] that receives events for all changes in this database
481480
/// using the provided [`ClientSession`]. See [`Database::watch`] for more information.
482-
#[allow(unused)]
483-
pub(crate) async fn watch_with_session(
481+
pub async fn watch_with_session(
484482
&self,
485483
pipeline: impl IntoIterator<Item = Document>,
486484
options: impl Into<Option<ChangeStreamOptions>>,

src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,8 +310,7 @@ pub mod options;
310310
pub use ::bson;
311311

312312
mod bson_util;
313-
#[allow(unused)]
314-
pub(crate) mod change_stream;
313+
pub mod change_stream;
315314
mod client;
316315
mod cmap;
317316
mod coll;

src/options.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
//! ```
1717
1818
pub use crate::{
19+
change_stream::options::*,
1920
client::{auth::*, options::*},
2021
coll::options::*,
2122
collation::*,
@@ -26,9 +27,6 @@ pub use crate::{
2627
selection_criteria::*,
2728
};
2829

29-
#[allow(unused)]
30-
pub(crate) use crate::change_stream::options::*;
31-
3230
/// Updates an options struct with the read preference/read concern/write concern of a
3331
/// client/database/collection.
3432
macro_rules! resolve_options {

src/sync/change_stream.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ use super::ClientSession;
2222
/// ["resumable"](https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error)
2323
/// errors, such as transient network failures. It can also be done manually by passing
2424
/// a [`ResumeToken`] retrieved from a past event into either the
25-
/// [`resume_after`](ChangeStreamOptions::resume_after) or
26-
/// [`start_after`](ChangeStreamOptions::start_after) (4.2+) options used to create the
27-
/// `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
25+
/// [`resume_after`](crate::options::ChangeStreamOptions::resume_after) or
26+
/// [`start_after`](crate::options::ChangeStreamOptions::start_after) (4.2+) options used to create
27+
/// the `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
2828
/// explicitly opt out of resumability.
2929
///
3030
/// A `ChangeStream` can be iterated like any other [`Iterator`]:
3131
///
32-
/// ```ignore
32+
/// ```
3333
/// # use mongodb::{sync::Client, error::Result, bson::doc,
3434
/// # change_stream::event::ChangeStreamEvent};
3535
/// #
@@ -94,18 +94,18 @@ where
9494
/// empty. This method should be used when storing the resume token in order to ensure the
9595
/// most up to date token is received, e.g.
9696
///
97-
/// ```ignore
98-
/// # use mongodb::{sync::Client, error::Result};
97+
/// ```
98+
/// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
9999
/// # fn func() -> Result<()> {
100100
/// # let client = Client::with_uri_str("mongodb://example.com")?;
101-
/// # let coll = client.database("foo").collection("bar");
101+
/// # let coll: Collection<Document> = client.database("foo").collection("bar");
102102
/// let mut change_stream = coll.watch(None, None)?;
103103
/// let mut resume_token = None;
104104
/// while change_stream.is_alive() {
105-
/// if let Some(event) = change_stream.next_if_any() {
105+
/// if let Some(event) = change_stream.next_if_any()? {
106106
/// // process event
107107
/// }
108-
/// resume_token = change_stream.resume_token().cloned();
108+
/// resume_token = change_stream.resume_token();
109109
/// }
110110
/// #
111111
/// # Ok(())
@@ -130,7 +130,7 @@ where
130130
/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
131131
/// be iterated using one. To iterate, use [`SessionChangeStream::next`]:
132132
///
133-
/// ```ignore
133+
/// ```
134134
/// # use mongodb::{bson::Document, sync::Client, error::Result};
135135
/// #
136136
/// # async fn do_stuff() -> Result<()> {
@@ -181,7 +181,7 @@ where
181181
/// Retrieve the next result from the change stream.
182182
/// The session provided must be the same session used to create the change stream.
183183
///
184-
/// ```ignore
184+
/// ```
185185
/// # use bson::{doc, Document};
186186
/// # use mongodb::sync::Client;
187187
/// # fn main() {
@@ -215,16 +215,16 @@ where
215215
/// empty. This method should be used when storing the resume token in order to ensure the
216216
/// most up to date token is received, e.g.
217217
///
218-
/// ```ignore
219-
/// # use mongodb::{sync::Client, error::Result};
218+
/// ```
219+
/// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
220220
/// # async fn func() -> Result<()> {
221221
/// # let client = Client::with_uri_str("mongodb://example.com")?;
222-
/// # let coll = client.database("foo").collection("bar");
222+
/// # let coll: Collection<Document> = client.database("foo").collection("bar");
223223
/// # let mut session = client.start_session(None)?;
224224
/// let mut change_stream = coll.watch_with_session(None, None, &mut session)?;
225225
/// let mut resume_token = None;
226226
/// while change_stream.is_alive() {
227-
/// if let Some(event) = change_stream.next_if_any(&mut session) {
227+
/// if let Some(event) = change_stream.next_if_any(&mut session)? {
228228
/// // process event
229229
/// }
230230
/// resume_token = change_stream.resume_token();

src/sync/client/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,7 @@ impl Client {
177177
///
178178
/// If the pipeline alters the structure of the returned events, the parsed type will need to be
179179
/// changed via [`ChangeStream::with_type`].
180-
#[allow(unused)]
181-
pub(crate) fn watch(
180+
pub fn watch(
182181
&self,
183182
pipeline: impl IntoIterator<Item = Document>,
184183
options: impl Into<Option<ChangeStreamOptions>>,
@@ -190,8 +189,7 @@ impl Client {
190189

191190
/// Starts a new [`SessionChangeStream`] that receives events for all changes in the cluster
192191
/// using the provided [`ClientSession`]. See [`Client::watch`] for more information.
193-
#[allow(unused)]
194-
pub(crate) fn watch_with_session(
192+
pub fn watch_with_session(
195193
&self,
196194
pipeline: impl IntoIterator<Item = Document>,
197195
options: impl Into<Option<ChangeStreamOptions>>,

0 commit comments

Comments
 (0)