Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 68 additions & 40 deletions api/parts/data/changeStreamReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class changeStreamReader {
this.waitingForAcknowledgement = false;
this.fallback = options.fallback;

if (this.fallback && !this.fallback.inteval) {
if (this.fallback && !this.fallback.interval) {
this.fallback.interval = 1000;
}

Expand All @@ -50,13 +50,16 @@ class changeStreamReader {
*/
checkState() {
if ((!this.stream || this.stream.closed) && !this.keep_closed) {
console.log("Stream is closed. Setting up again");
log.i(`[${this.name}] Stream is closed. Setting up again`);
this.setUp(this.onData);
}
else if (this.waitingForAcknowledgement && Date.now() - this.waitingForAcknowledgement > 60000) {
console.log("Waiting for acknowledgement for more than 60 seconds. Closing stream and restarting");
const waitTime = Date.now() - this.waitingForAcknowledgement;
log.w(`[${this.name}] Waiting for acknowledgement for ${waitTime}ms (>60s). Closing stream and restarting`);
this.keep_closed = false;
this.stream.close();
if (this.stream && !this.stream.closed) {
this.stream.close();
}
}
}

Expand Down Expand Up @@ -100,33 +103,54 @@ class changeStreamReader {
* @param {Object} tokenInfo - Token info object
*/
async processBadRange(options, tokenInfo) {
console.log("Processing bad range");
console.log(JSON.stringify({cd: {$gte: options.cd1, $lt: options.cd2}}));
log.i(`[${this.name}] Processing bad range for timespan: ${options.cd1} to ${options.cd2}`);
log.d(`[${this.name}] Query filter: ${JSON.stringify({cd: {$gte: options.cd1, $lt: options.cd2}})}`);

var gotTokenDoc = false;
var doc;
var cursor = this.db.collection(this.collection).find({cd: {$gte: new Date(options.cd1), $lt: new Date(options.cd2)}}).sort({cd: 1});
while (await cursor.hasNext() && !gotTokenDoc) {
doc = await cursor.next();
if (JSON.stringify(doc._id) === JSON.stringify(tokenInfo._id) || doc.cd > tokenInfo.cd) {
gotTokenDoc = true;
var cursor;
try {
cursor = this.db.collection(this.collection).find({cd: {$gte: new Date(options.cd1), $lt: new Date(options.cd2)}}).sort({cd: 1});

while (await cursor.hasNext() && !gotTokenDoc) {
doc = await cursor.next();
if (JSON.stringify(doc._id) === JSON.stringify(tokenInfo._id) || doc.cd > tokenInfo.cd) {
gotTokenDoc = true;
}
log.d(`[${this.name}] Skipping document: ${doc._id} at ${doc.cd}`);
}
console.log("SKIP:" + JSON.stringify(doc));

if (doc && doc.cd > tokenInfo.cd) {
tokenInfo.cd = doc.cd;
tokenInfo._id = doc._id;
log.d(`[${this.name}] Processing recovered document: ${doc._id} at ${doc.cd}`);
this.onData(tokenInfo, doc);
}

while (await cursor.hasNext()) {
doc = await cursor.next();
log.d(`[${this.name}] Processing document: ${doc._id} at ${doc.cd}`);
tokenInfo.cd = doc.cd;
tokenInfo._id = doc._id;
this.onData(tokenInfo, doc);
}

log.i(`[${this.name}] Bad range processing completed successfully`);
}
if (doc && doc.cd > tokenInfo.cd) {
tokenInfo.cd = doc.cd;
tokenInfo._id = doc._id;
console.log(this.name + " Process:" + JSON.stringify(doc));
this.onData(tokenInfo, doc);
catch (err) {
log.e(`[${this.name}] Error during bad range processing`, err);
throw err;
}

while (await cursor.hasNext()) {
doc = await cursor.next();
console.log(this.name + " Process:" + JSON.stringify(doc));
tokenInfo.cd = doc.cd;
tokenInfo._id = doc._id;
this.onData(tokenInfo, doc);
finally {
if (cursor) {
try {
await cursor.close();
}
catch (err) {
log.w(`[${this.name}] Error closing cursor during bad range processing`, err);
}
}
}
console.log("done");
}

/**
Expand All @@ -137,7 +161,7 @@ class changeStreamReader {
var token;
try {
if (this.stream && !this.stream.closed) {
console.log("Stream is already open. returning");
log.d(`[${this.name}] Stream is already open, skipping setup`);
return;
}
var options = JSON.parse(JSON.stringify(this.options || {}));
Expand All @@ -148,15 +172,15 @@ class changeStreamReader {
options.startAfter = token.token;
}
if (this.failedToken && JSON.stringify(this.failedToken.token) === JSON.stringify(token.token)) {
console.log("Do not use failed token");
log.w(`[${this.name}] Not using failed token, switching to time-based resume`);
tokenFailed = true;
delete options.startAfter;
var startTime = Date.now().valueOf() / 1000 - 60;
if (startTime) {
options.startAtOperationTime = new Timestamp({t: startTime, i: 1});
}
}
console.log("Stream options: " + JSON.stringify(options));
log.d(`[${this.name}] Stream options: ${JSON.stringify(options)}`);
if (this.collection) {
this.stream = await this.db.collection(this.collection).watch(this.pipeline, options);
}
Expand All @@ -167,11 +191,11 @@ class changeStreamReader {

if (tokenFailed) {
//fetch data while cd is less than failed token
console.log("Fetching data while cd is less or equal cd to failed token");
log.i(`[${this.name}] Fetching data while timestamp is less than or equal to failed token timestamp`);
var doc;
do {
doc = await this.stream.next();
console.log(JSON.stringify(doc));
log.d(`[${this.name}] Processing document during token recovery: ${doc?._id} at ${doc?.cd}`);
}
while (doc && doc.cd && doc.cd <= token.cd);
this.keep_closed = true;
Expand All @@ -180,7 +204,7 @@ class changeStreamReader {
next_token._id = doc.__id;
next_token.cd = doc.cd;
try {
this.processBadRange({name: this.name, cd1: token.cd, cd2: next_token.cd}, this.failedToken);
await this.processBadRange({name: this.name, cd1: token.cd, cd2: next_token.cd}, this.failedToken);
this.onData(next_token, doc);
this.waitingForAcknowledgement = Date.now();
this.restartStream = true;
Expand Down Expand Up @@ -214,7 +238,7 @@ class changeStreamReader {
this.failedToken = token;
}
else if (err.code === 40573) { //change stream is not supported
console.log("Change stream is not supported. Keeping streams closed");
log.w(`[${this.name}] Change streams not supported by database, switching to polling mode`);
this.keep_closed = true;
var newCD = Date.now();
if (token && token.cd) {
Expand All @@ -239,12 +263,12 @@ class changeStreamReader {
}
catch (err) {
if (err.code === 286 || err.code === 50811 || err.code === 9) { //failed because of bad token
console.log("Set Failed token", token);
log.w(`[${this.name}] Invalid token detected, marking as failed`, {token, error: err.message});
this.failedToken = token;
}
//Failed because of db does not support change streams. Run in "query mode";
else if (err.code === 40573) { //change stream is not supported
console.log("Change stream is not supported. Keeping streams closed");
log.w(`[${this.name}] Change streams not supported by database, switching to polling mode`);
this.keep_closed = true;
var newCD = Date.now();
if (token && token.cd) {
Expand All @@ -256,7 +280,7 @@ class changeStreamReader {
//Switch to query mode
}
else {
log.e("Error on change stream", err);
log.e(`[${this.name}] Unexpected error during change stream setup`, err);
}
}
}
Expand All @@ -272,9 +296,11 @@ class changeStreamReader {
await common.db.collection("plugins").updateOne({"_id": "_changeStreams"}, {$set: {[this.name]: token}}, {"upsert": true});
if (this.restartStream) {
this.waitingForAcknowledgement = false;
this.keep_closed = false;
this.restartStream = false;
this.stream.close();
this.keep_closed = false;
if (this.stream && !this.stream.closed) {
this.stream.close();
}
}
}
catch (err) {
Expand All @@ -286,15 +312,17 @@ class changeStreamReader {
* Closes stream permanently
*/
close() {
console.log("Closing permanently");
log.i(`[${this.name}] Closing change stream reader permanently`);
if (this.intervalRunner) {
clearInterval(this.intervalRunner);
this.intervalRunner = null;
}
this.keep_closed = true;
this.stream.close(true);
if (this.stream && !this.stream.closed) {
this.stream.close(true);
}
}


}

module.exports = {changeStreamReader};
Loading