Skip to content

Commit 7248837

Browse files
authored
RUST-522 Implement resume functionality for change streams (#547)
1 parent cc7a1c3 commit 7248837

File tree

15 files changed

+468
-261
lines changed

15 files changed

+468
-261
lines changed

src/change_stream/event.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,15 +27,14 @@ pub struct ResumeToken(pub(crate) RawBson);
2727

2828
impl ResumeToken {
2929
pub(crate) fn initial(
30-
options: &Option<ChangeStreamOptions>,
30+
options: Option<&ChangeStreamOptions>,
3131
spec: &CursorSpecification,
3232
) -> Option<ResumeToken> {
3333
match &spec.post_batch_resume_token {
3434
// Token from initial response from `aggregate`
3535
Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
3636
// Token from options passed to `watch`
3737
_ => options
38-
.as_ref()
3938
.and_then(|o| o.start_after.as_ref().or_else(|| o.resume_after.as_ref()))
4039
.cloned(),
4140
}

src/change_stream/mod.rs

Lines changed: 90 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -10,8 +10,9 @@ use std::{
1010
task::{Context, Poll},
1111
};
1212

13-
use bson::Document;
14-
use futures_core::Stream;
13+
use bson::{Document, Timestamp};
14+
use derivative::Derivative;
15+
use futures_core::{future::BoxFuture, Stream};
1516
use serde::{de::DeserializeOwned, Deserialize};
1617

1718
use crate::{
@@ -20,11 +21,12 @@ use crate::{
2021
options::ChangeStreamOptions,
2122
},
2223
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture},
23-
error::Result,
24+
error::{Error, Result},
2425
operation::AggregateTarget,
2526
options::AggregateOptions,
2627
selection_criteria::{ReadPreference, SelectionCriteria},
2728
Client,
29+
ClientSession,
2830
Collection,
2931
Cursor,
3032
Database,
@@ -75,34 +77,37 @@ use crate::{
7577
///
7678
/// See the documentation [here](https://docs.mongodb.com/manual/changeStreams) for more
7779
/// details. Also see the documentation on [usage recommendations](https://docs.mongodb.com/manual/administration/change-streams-production-recommendations/).
78-
#[derive(Debug)]
80+
#[derive(Derivative)]
81+
#[derivative(Debug)]
7982
pub struct ChangeStream<T>
8083
where
8184
T: DeserializeOwned + Unpin + Send + Sync,
8285
{
8386
/// The cursor to iterate over event instances.
8487
cursor: Cursor<T>,
8588

86-
/// The information associate with this change stream.
89+
/// Arguments to `watch` that created this change stream.
90+
args: WatchArgs,
91+
92+
/// Dynamic information associated with this change stream.
8793
data: ChangeStreamData,
8894

89-
/// The cached resume token.
90-
resume_token: Option<ResumeToken>,
95+
/// A pending future for a resume.
96+
#[derivative(Debug = "ignore")]
97+
pending_resume: Option<BoxFuture<'static, Result<ChangeStream<T>>>>,
9198
}
9299

93100
impl<T> ChangeStream<T>
94101
where
95102
T: DeserializeOwned + Unpin + Send + Sync,
96103
{
97-
pub(crate) fn new(
98-
cursor: Cursor<T>,
99-
data: ChangeStreamData,
100-
resume_token: Option<ResumeToken>,
101-
) -> Self {
104+
pub(crate) fn new(cursor: Cursor<T>, args: WatchArgs, data: ChangeStreamData) -> Self {
105+
let pending_resume: Option<BoxFuture<'static, Result<ChangeStream<T>>>> = None;
102106
Self {
103107
cursor,
108+
args,
104109
data,
105-
resume_token,
110+
pending_resume,
106111
}
107112
}
108113

@@ -112,16 +117,17 @@ where
112117
/// See the documentation
113118
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
114119
/// information on change stream resume tokens.
115-
pub fn resume_token(&self) -> Option<&ResumeToken> {
116-
self.resume_token.as_ref()
120+
pub fn resume_token(&self) -> Option<ResumeToken> {
121+
self.data.resume_token.clone()
117122
}
118123

119124
/// Update the type streamed values will be parsed as.
120125
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
121126
ChangeStream {
122127
cursor: self.cursor.with_type(),
128+
args: self.args,
123129
data: self.data,
124-
resume_token: self.resume_token,
130+
pending_resume: None,
125131
}
126132
}
127133

@@ -162,45 +168,48 @@ where
162168
}
163169
}
164170

165-
#[derive(Debug)]
166-
pub(crate) struct ChangeStreamData {
171+
/// Arguments passed to a `watch` method, captured to allow resume.
172+
#[derive(Debug, Clone)]
173+
pub(crate) struct WatchArgs {
167174
/// The pipeline of stages to append to an initial `$changeStream` stage.
168-
pipeline: Vec<Document>,
169-
170-
/// The client that was used for the initial `$changeStream` aggregation, used for server
171-
/// selection during an automatic resume.
172-
client: Client,
175+
pub(crate) pipeline: Vec<Document>,
173176

174-
/// The original target of the change stream, used for re-issuing the aggregation during
175-
/// an automatic resume.
176-
target: AggregateTarget,
177+
/// The original target of the change stream.
178+
pub(crate) target: AggregateTarget,
177179

178180
/// The options provided to the initial `$changeStream` stage.
179-
options: Option<ChangeStreamOptions>,
181+
pub(crate) options: Option<ChangeStreamOptions>,
182+
}
183+
184+
/// Dynamic change stream data needed for resume.
185+
#[derive(Debug, Default)]
186+
pub(crate) struct ChangeStreamData {
187+
/// The `operationTime` returned by the initial `aggregate` command.
188+
pub(crate) initial_operation_time: Option<Timestamp>,
189+
190+
/// The cached resume token.
191+
pub(crate) resume_token: Option<ResumeToken>,
180192

181193
/// Whether or not the change stream has attempted a resume, used to attempt a resume only
182194
/// once.
183-
resume_attempted: bool,
195+
pub(crate) resume_attempted: bool,
184196

185197
/// Whether or not the change stream has returned a document, used to update resume token
186198
/// during an automatic resume.
187-
document_returned: bool,
199+
pub(crate) document_returned: bool,
200+
201+
/// The implicit session used to create the original cursor.
202+
pub(crate) implicit_session: Option<ClientSession>,
188203
}
189204

190205
impl ChangeStreamData {
191-
pub(crate) fn new(
192-
pipeline: Vec<Document>,
193-
client: Client,
194-
target: AggregateTarget,
195-
options: Option<ChangeStreamOptions>,
196-
) -> Self {
206+
fn take(&mut self) -> Self {
197207
Self {
198-
pipeline,
199-
client,
200-
target,
201-
options,
202-
resume_attempted: false,
203-
document_returned: false,
208+
initial_operation_time: self.initial_operation_time,
209+
resume_token: self.resume_token.clone(),
210+
resume_attempted: self.resume_attempted,
211+
document_returned: self.document_returned,
212+
implicit_session: self.implicit_session.take(),
204213
}
205214
}
206215
}
@@ -227,11 +236,48 @@ where
227236
T: DeserializeOwned + Unpin + Send + Sync,
228237
{
229238
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
239+
if let Some(mut pending) = self.pending_resume.take() {
240+
match Pin::new(&mut pending).poll(cx) {
241+
Poll::Pending => {
242+
self.pending_resume = Some(pending);
243+
return Poll::Pending;
244+
}
245+
Poll::Ready(Ok(new_stream)) => {
246+
// Ensure that the old cursor is killed on the server selected for the new one.
247+
self.cursor
248+
.set_drop_address(new_stream.cursor.address().clone());
249+
self.cursor = new_stream.cursor;
250+
self.args = new_stream.args;
251+
return Poll::Pending;
252+
}
253+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
254+
}
255+
}
230256
let out = self.cursor.poll_next_in_batch(cx);
231-
if let Poll::Ready(Ok(bv)) = &out {
232-
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? {
233-
self.resume_token = Some(token);
257+
match &out {
258+
Poll::Ready(Ok(bv)) => {
259+
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? {
260+
self.data.resume_token = Some(token);
261+
}
262+
if matches!(bv, BatchValue::Some { .. }) {
263+
self.data.document_returned = true;
264+
}
265+
}
266+
Poll::Ready(Err(e)) if e.is_resumable() && !self.data.resume_attempted => {
267+
self.data.resume_attempted = true;
268+
let client = self.cursor.client().clone();
269+
let args = self.args.clone();
270+
let mut data = self.data.take();
271+
data.implicit_session = self.cursor.take_implicit_session();
272+
self.pending_resume = Some(Box::pin(async move {
273+
let new_stream: Result<ChangeStream<ChangeStreamEvent<()>>> = client
274+
.execute_watch(args.pipeline, args.options, args.target, Some(data))
275+
.await;
276+
new_stream.map(|cs| cs.with_type::<T>())
277+
}));
278+
return Poll::Pending;
234279
}
280+
_ => {}
235281
}
236282
out
237283
}

0 commit comments

Comments
 (0)