Skip to content

Commit 5ecf18e

Browse files
authored
fix(ChangeStream): should resume from errors when iterating
Introduced `getCursor` method to safely provide a change stream cursor for `next`/`hasNext` across recoveries from resumable errors. NODE-2548
1 parent 7fad15a commit 5ecf18e

File tree

3 files changed

+291
-108
lines changed

3 files changed

+291
-108
lines changed

lib/change_stream.js

Lines changed: 134 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22

3+
const Denque = require('denque');
34
const EventEmitter = require('events');
45
const isResumableError = require('./error').isResumableError;
56
const MongoError = require('./core').MongoError;
@@ -9,6 +10,8 @@ const maxWireVersion = require('./core/utils').maxWireVersion;
910
const maybePromise = require('./utils').maybePromise;
1011
const AggregateOperation = require('./operations/aggregate');
1112

13+
const kResumeQueue = Symbol('resumeQueue');
14+
1215
const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
1316
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
1417
CHANGE_STREAM_OPTIONS
@@ -91,6 +94,8 @@ class ChangeStream extends EventEmitter {
9194
this.options.readPreference = parent.s.readPreference;
9295
}
9396

97+
this[kResumeQueue] = new Denque();
98+
9499
// Create contained Change Stream cursor
95100
this.cursor = createChangeStreamCursor(this, options);
96101

@@ -99,9 +104,7 @@ class ChangeStream extends EventEmitter {
99104
// Listen for any `change` listeners being added to ChangeStream
100105
this.on('newListener', eventName => {
101106
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
102-
this.cursor.on('data', change =>
103-
processNewChange({ changeStream: this, change, eventEmitter: true })
104-
);
107+
this.cursor.on('data', change => processNewChange(this, change));
105108
}
106109
});
107110

@@ -130,7 +133,12 @@ class ChangeStream extends EventEmitter {
130133
* @returns {Promise|void} returns Promise if no callback passed
131134
*/
132135
hasNext(callback) {
133-
return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb));
136+
return maybePromise(this.parent, callback, cb => {
137+
getCursor(this, (err, cursor) => {
138+
if (err) return cb(err); // failed to resume, raise an error
139+
cursor.hasNext(cb);
140+
});
141+
});
134142
}
135143

136144
/**
@@ -142,18 +150,24 @@ class ChangeStream extends EventEmitter {
142150
*/
143151
next(callback) {
144152
return maybePromise(this.parent, callback, cb => {
145-
if (this.isClosed()) {
146-
return cb(new MongoError('ChangeStream is closed'));
147-
}
148-
this.cursor.next((error, change) => {
149-
processNewChange({ changeStream: this, error, change, callback: cb });
153+
getCursor(this, (err, cursor) => {
154+
if (err) return cb(err); // failed to resume, raise an error
155+
cursor.next((error, change) => {
156+
if (error) {
157+
this[kResumeQueue].push(() => this.next(cb));
158+
processError(this, error, cb);
159+
return;
160+
}
161+
processNewChange(this, change, cb);
162+
});
150163
});
151164
});
152165
}
153166

154167
/**
155-
* Is the cursor closed
168+
* Is the change stream closed
156169
* @method ChangeStream.prototype.isClosed
170+
* @param {boolean} [checkCursor=true] also check if the underlying cursor is closed
157171
* @return {boolean}
158172
*/
159173
isClosed() {
@@ -173,6 +187,8 @@ class ChangeStream extends EventEmitter {
173187
// flag the change stream as explicitly closed
174188
this.closed = true;
175189

190+
if (!this.cursor) return cb();
191+
176192
// Tidy up the existing cursor
177193
const cursor = this.cursor;
178194

@@ -383,7 +399,7 @@ function createChangeStreamCursor(self, options) {
383399
*/
384400
if (self.listenerCount('change') > 0) {
385401
changeStreamCursor.on('data', function(change) {
386-
processNewChange({ changeStream: self, change, eventEmitter: true });
402+
processNewChange(self, change);
387403
});
388404
}
389405

@@ -415,7 +431,7 @@ function createChangeStreamCursor(self, options) {
415431
* @type {Error}
416432
*/
417433
changeStreamCursor.on('error', function(error) {
418-
processNewChange({ changeStream: self, error, eventEmitter: true });
434+
processError(self, error);
419435
});
420436

421437
if (self.pipeDestinations) {
@@ -456,73 +472,20 @@ function waitForTopologyConnected(topology, options, callback) {
456472
}, 500); // this is an arbitrary wait time to allow SDAM to transition
457473
}
458474

459-
// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
460-
function processNewChange(args) {
461-
const changeStream = args.changeStream;
462-
const error = args.error;
463-
const change = args.change;
464-
const callback = args.callback;
465-
const eventEmitter = args.eventEmitter || false;
475+
function processNewChange(changeStream, change, callback) {
466476
const cursor = changeStream.cursor;
467477

468-
// If the cursor is null or the change stream has been closed explictly, do not process a change.
469-
if (cursor == null || changeStream.closed) {
470-
// We do not error in the eventEmitter case.
471-
changeStream.closed = true;
472-
if (eventEmitter) {
473-
return;
474-
}
475-
callback(new MongoError('ChangeStream is closed'));
478+
if (changeStream.closed) {
479+
if (callback) callback(new MongoError('ChangeStream is closed'));
476480
return;
477481
}
478482

479-
const topology = changeStream.topology;
480-
const options = changeStream.cursor.options;
481-
const wireVersion = maxWireVersion(cursor.server);
482-
483-
if (error) {
484-
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
485-
changeStream.attemptingResume = true;
486-
487-
// stop listening to all events from old cursor
488-
['data', 'close', 'end', 'error'].forEach(event =>
489-
changeStream.cursor.removeAllListeners(event)
490-
);
491-
492-
// close internal cursor, ignore errors
493-
changeStream.cursor.close();
494-
495-
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
496-
if (err) {
497-
// if there's an error reconnecting, close the change stream
498-
changeStream.closed = true;
499-
if (eventEmitter) {
500-
changeStream.emit('error', err);
501-
changeStream.emit('close');
502-
return;
503-
}
504-
return callback(err);
505-
}
506-
507-
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
508-
if (eventEmitter) return;
509-
changeStream.next(callback);
510-
});
511-
return;
512-
}
513-
514-
if (eventEmitter) return changeStream.emit('error', error);
515-
return callback(error);
516-
}
517-
518-
changeStream.attemptingResume = false;
519-
520483
if (change && !change._id) {
521484
const noResumeTokenError = new Error(
522485
'A change stream document has been received that lacks a resume token (_id).'
523486
);
524487

525-
if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
488+
if (!callback) return changeStream.emit('error', noResumeTokenError);
526489
return callback(noResumeTokenError);
527490
}
528491

@@ -534,8 +497,108 @@ function processNewChange(args) {
534497
changeStream.options.startAtOperationTime = undefined;
535498

536499
// Return the change
537-
if (eventEmitter) return changeStream.emit('change', change);
538-
return callback(error, change);
500+
if (!callback) return changeStream.emit('change', change);
501+
return callback(undefined, change);
502+
}
503+
504+
function processError(changeStream, error, callback) {
505+
const topology = changeStream.topology;
506+
const cursor = changeStream.cursor;
507+
508+
// If the change stream has been closed explictly, do not process error.
509+
if (changeStream.closed) {
510+
if (callback) callback(new MongoError('ChangeStream is closed'));
511+
return;
512+
}
513+
514+
// if the resume succeeds, continue with the new cursor
515+
function resumeWithCursor(newCursor) {
516+
changeStream.cursor = newCursor;
517+
processResumeQueue(changeStream);
518+
}
519+
520+
// otherwise, raise an error and close the change stream
521+
function unresumableError(err) {
522+
if (!callback) {
523+
changeStream.emit('error', err);
524+
changeStream.emit('close');
525+
}
526+
processResumeQueue(changeStream, err);
527+
changeStream.closed = true;
528+
}
529+
530+
if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
531+
changeStream.cursor = undefined;
532+
533+
// stop listening to all events from old cursor
534+
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
535+
536+
// close internal cursor, ignore errors
537+
cursor.close();
538+
539+
waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => {
540+
// if the topology can't reconnect, close the stream
541+
if (err) return unresumableError(err);
542+
543+
// create a new cursor, preserving the old cursor's options
544+
const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
545+
546+
// attempt to continue in emitter mode
547+
if (!callback) return resumeWithCursor(newCursor);
548+
549+
// attempt to continue in iterator mode
550+
newCursor.hasNext(err => {
551+
// if there's an error immediately after resuming, close the stream
552+
if (err) return unresumableError(err);
553+
resumeWithCursor(newCursor);
554+
});
555+
});
556+
return;
557+
}
558+
559+
if (!callback) return changeStream.emit('error', error);
560+
return callback(error);
561+
}
562+
563+
/**
564+
* Safely provides a cursor across resume attempts
565+
*
566+
* @param {ChangeStream} changeStream the parent ChangeStream
567+
* @param {function} callback gets the cursor or error
568+
* @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor
569+
*/
570+
function getCursor(changeStream, callback) {
571+
if (changeStream.isClosed()) {
572+
callback(new MongoError('ChangeStream is closed.'));
573+
return;
574+
}
575+
576+
// if a cursor exists and it is open, return it
577+
if (changeStream.cursor) {
578+
callback(undefined, changeStream.cursor);
579+
return;
580+
}
581+
582+
// no cursor, queue callback until topology reconnects
583+
changeStream[kResumeQueue].push(callback);
584+
}
585+
586+
/**
587+
* Drain the resume queue when a new has become available
588+
*
589+
* @param {ChangeStream} changeStream the parent ChangeStream
590+
* @param {ChangeStreamCursor?} changeStream.cursor the new cursor
591+
* @param {Error} [err] error getting a new cursor
592+
*/
593+
function processResumeQueue(changeStream, err) {
594+
while (changeStream[kResumeQueue].length) {
595+
const request = changeStream[kResumeQueue].pop();
596+
if (changeStream.isClosed() && !err) {
597+
request(new MongoError('Change Stream is not open.'));
598+
return;
599+
}
600+
request(err, changeStream.cursor);
601+
}
539602
}
540603

541604
/**

0 commit comments

Comments
 (0)